Line data Source code
1 : #include "fd_grpc_client.h"
2 : #include "fd_grpc_client_private.h"
3 : #include "../../ballet/nanopb/pb_encode.h" /* pb_msgdesc_t */
4 : #include <sys/socket.h>
5 : #include "../h2/fd_h2_rbuf_sock.h"
6 : #include "fd_grpc_codec.h"
7 : #if FD_HAS_OPENSSL
8 : #include "../openssl/fd_openssl.h"
9 : #include <openssl/ssl.h>
10 : #include <openssl/err.h>
11 : #include "../h2/fd_h2_rbuf_ossl.h"
12 : #endif
13 :
14 : ulong
15 669 : fd_grpc_client_align( void ) {
16 669 : return fd_ulong_max( alignof(fd_grpc_client_t), fd_grpc_h2_stream_pool_align() );
17 669 : }
18 :
19 : ulong
20 168 : fd_grpc_client_footprint( ulong buf_max ) {
21 168 : ulong l = FD_LAYOUT_INIT;
22 168 : l = FD_LAYOUT_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
23 168 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
24 168 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_scratch */
25 168 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
26 168 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
27 168 : l = FD_LAYOUT_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
28 168 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
29 168 : return FD_LAYOUT_FINI( l, fd_grpc_client_align() );
30 168 : }
31 :
32 : static void
33 102 : fd_grpc_h2_stream_reset( fd_grpc_h2_stream_t * stream ) {
34 102 : memset( &stream->s, 0, sizeof(fd_h2_stream_t) );
35 102 : stream->request_ctx = 0UL;
36 102 : memset( &stream->hdrs, 0, sizeof(fd_grpc_resp_hdrs_t) );
37 102 : stream->hdrs.grpc_status = FD_GRPC_STATUS_UNKNOWN;
38 102 : stream->hdrs_received = 0;
39 102 : stream->msg_buf_used = 0UL;
40 102 : stream->msg_sz = 0UL;
41 102 : stream->has_header_deadline = 0;
42 102 : stream->has_rx_end_deadline = 0;
43 102 : }
44 :
45 : fd_grpc_client_t *
46 : fd_grpc_client_new( void * mem,
47 : fd_grpc_client_callbacks_t const * callbacks,
48 : fd_grpc_client_metrics_t * metrics,
49 : void * app_ctx,
50 : ulong buf_max,
51 84 : ulong rng_seed ) {
52 84 : if( FD_UNLIKELY( !mem ) ) {
53 0 : FD_LOG_WARNING(( "NULL mem" ));
54 0 : return NULL;
55 0 : }
56 84 : if( FD_UNLIKELY( buf_max<4096UL ) ) {
57 0 : FD_LOG_WARNING(( "undersz buf_max" ));
58 0 : return NULL;
59 0 : }
60 84 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_grpc_client_align() ) ) ) {
61 0 : FD_LOG_WARNING(( "unaligned mem" ));
62 0 : return NULL;
63 0 : }
64 :
65 84 : FD_SCRATCH_ALLOC_INIT( l, mem );
66 84 : void * client_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
67 84 : void * nanopb_tx = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
68 84 : void * frame_scratch = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_scratch */
69 84 : void * frame_rx_buf = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
70 84 : void * frame_tx_buf = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
71 84 : void * stream_pool_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
72 84 : void * stream_buf_mem = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
73 84 : ulong end = FD_SCRATCH_ALLOC_FINI( l, fd_grpc_client_align() );
74 84 : FD_TEST( end-(ulong)mem == fd_grpc_client_footprint( buf_max ) );
75 :
76 84 : fd_grpc_client_t * client = client_mem;
77 :
78 84 : fd_grpc_h2_stream_t * stream_pool =
79 84 : fd_grpc_h2_stream_pool_join( fd_grpc_h2_stream_pool_new( stream_pool_mem, FD_GRPC_CLIENT_MAX_STREAMS ) );
80 84 : if( FD_UNLIKELY( !stream_pool ) ) FD_LOG_CRIT(( "Failed to create stream pool" )); /* unreachable */
81 :
82 84 : *client = (fd_grpc_client_t){
83 84 : .callbacks = callbacks,
84 84 : .ctx = app_ctx,
85 84 : .stream_pool = stream_pool,
86 84 : .stream_bufs = stream_buf_mem,
87 84 : .nanopb_tx = nanopb_tx,
88 84 : .nanopb_tx_max = buf_max,
89 84 : .frame_scratch = frame_scratch,
90 84 : .frame_scratch_max = buf_max,
91 84 : .frame_rx_buf = frame_rx_buf,
92 84 : .frame_rx_buf_max = buf_max,
93 84 : .frame_tx_buf = frame_tx_buf,
94 84 : .frame_tx_buf_max = buf_max,
95 84 : .metrics = metrics
96 84 : };
97 :
98 : /* FIXME for performance, cache this? */
99 84 : fd_h2_hdr_matcher_init( client->matcher, rng_seed );
100 84 : fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_STATUS, "grpc-status" );
101 84 : fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_MESSAGE, "grpc-message" );
102 :
103 84 : client->version_len = 5;
104 84 : memcpy( client->version, "0.0.0", 5 );
105 :
106 756 : for( ulong i=0UL; i<FD_GRPC_CLIENT_MAX_STREAMS; i++ ) {
107 672 : fd_grpc_h2_stream_t * stream = &client->stream_pool[ i ];
108 672 : stream->msg_buf = (uchar *)stream_buf_mem + (i*buf_max);
109 672 : stream->msg_buf_max = buf_max;
110 672 : FD_TEST( (ulong)( stream->msg_buf + stream->msg_buf_max )<=end );
111 672 : }
112 84 : fd_grpc_client_reset( client );
113 :
114 84 : return client;
115 84 : }
116 :
117 : void *
118 3 : fd_grpc_client_delete( fd_grpc_client_t * client ) {
119 3 : return client;
120 3 : }
121 :
122 : void
123 : fd_grpc_client_set_version( fd_grpc_client_t * client,
124 : char const * version,
125 0 : ulong version_len ) {
126 0 : if( FD_UNLIKELY( version_len > FD_GRPC_CLIENT_VERSION_LEN_MAX ) ) {
127 0 : FD_LOG_WARNING(( "Version string too long (%lu chars), ignoring", version_len ));
128 0 : return;
129 0 : }
130 0 : client->version_len = (uchar)version_len;
131 0 : memcpy( client->version, version, version_len );
132 0 : }
133 :
134 : void
135 : fd_grpc_client_set_authority( fd_grpc_client_t * client,
136 : char const * host,
137 : ulong host_len,
138 0 : ushort port ) {
139 0 : host_len = fd_ulong_min( host_len, sizeof(client->host)-1 );
140 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->host ), host, host_len ) );
141 0 : client->host_len = (uchar)host_len;
142 0 : client->port = (ushort)port;
143 0 : }
144 :
145 : int
146 114 : fd_grpc_client_stream_acquire_is_safe( fd_grpc_client_t * client ) {
147 : /* Sufficient quota to start a stream? */
148 114 : if( FD_UNLIKELY( client->conn->stream_active_cnt[1]+1 > client->conn->peer_settings.max_concurrent_streams ) ) {
149 18 : return 0;
150 18 : }
151 :
152 : /* Free stream object available? */
153 96 : if( FD_UNLIKELY( !fd_grpc_h2_stream_pool_free( client->stream_pool ) ) ) {
154 0 : return 0;
155 0 : }
156 96 : if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
157 0 : return 0;
158 0 : }
159 :
160 96 : return 1;
161 96 : }
162 :
163 : fd_grpc_h2_stream_t *
164 : fd_grpc_client_stream_acquire( fd_grpc_client_t * client,
165 102 : ulong request_ctx ) {
166 102 : if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
167 0 : FD_LOG_CRIT(( "stream pool exhausted" ));
168 0 : }
169 :
170 102 : fd_h2_conn_t * conn = client->conn;
171 102 : uint const stream_id = client->conn->tx_stream_next;
172 102 : conn->tx_stream_next += 2U;
173 :
174 102 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_pool_ele_acquire( client->stream_pool );
175 102 : fd_grpc_h2_stream_reset( stream );
176 102 : stream->request_ctx = request_ctx;
177 :
178 102 : fd_h2_stream_open( fd_h2_stream_init( &stream->s ), conn, stream_id );
179 102 : client->request_stream = stream;
180 102 : client->stream_ids[ client->stream_cnt ] = stream_id;
181 102 : client->streams [ client->stream_cnt ] = stream;
182 102 : client->stream_cnt++;
183 :
184 102 : return stream;
185 102 : }
186 :
187 : void
188 : fd_grpc_client_stream_release( fd_grpc_client_t * client,
189 21 : fd_grpc_h2_stream_t * stream ) {
190 21 : if( FD_UNLIKELY( !client->stream_cnt ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
191 :
192 : /* Deallocate tx_op */
193 21 : if( FD_UNLIKELY( stream == client->request_stream ) ) {
194 15 : client->request_stream = NULL;
195 15 : *client->request_tx_op = (fd_h2_tx_op_t){0};
196 15 : }
197 :
198 : /* Remove stream from map */
199 21 : int map_idx = -1;
200 69 : for( uint i=0UL; i<(client->stream_cnt); i++ ) {
201 48 : if( client->stream_ids[ i ] == stream->s.stream_id ) {
202 21 : map_idx = (int)i;
203 21 : }
204 48 : }
205 21 : if( FD_UNLIKELY( map_idx<0 ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
206 21 : if( (ulong)map_idx+1 < client->stream_cnt ) {
207 0 : client->stream_ids[ map_idx ] = client->stream_ids[ client->stream_cnt-1 ];
208 0 : client->streams [ map_idx ] = client->streams [ client->stream_cnt-1 ];
209 0 : }
210 21 : client->stream_cnt--;
211 :
212 21 : fd_grpc_h2_stream_pool_ele_release( client->stream_pool, stream );
213 21 : }
214 :
215 : void
216 87 : fd_grpc_client_reset( fd_grpc_client_t * client ) {
217 87 : fd_h2_rbuf_init( client->frame_rx, client->frame_rx_buf, client->frame_rx_buf_max );
218 87 : fd_h2_rbuf_init( client->frame_tx, client->frame_tx_buf, client->frame_tx_buf_max );
219 87 : fd_h2_conn_init_client( client->conn );
220 87 : client->conn->ctx = client;
221 87 : client->h2_hs_done = 0;
222 87 : client->ssl_hs_done = 0;
223 87 : client->request_stream = NULL;
224 87 : *client->request_tx_op = (fd_h2_tx_op_t){0};
225 :
226 : /* Disable RX flow control */
227 87 : client->conn->self_settings.initial_window_size = (1U<<31)-1U;
228 87 : client->conn->rx_wnd_max = (1U<<31)-1U;
229 87 : client->conn->rx_wnd_wmark = client->conn->rx_wnd_max - (1U<<20);
230 :
231 : /* Free all stream objects */
232 93 : while( client->stream_cnt ) {
233 6 : fd_grpc_h2_stream_t * stream = client->streams[ client->stream_cnt-1 ];
234 6 : fd_grpc_client_stream_release( client, stream );
235 6 : }
236 87 : }
237 :
238 : /* fd_grpc_client_send_stream_quota writes a WINDOW_UPDATE frame, which
239 : eventually allows the peer to send more data bytes. */
240 :
241 : static void
242 : fd_grpc_client_send_stream_quota( fd_h2_rbuf_t * rbuf_tx,
243 : fd_grpc_h2_stream_t * stream,
244 0 : uint bump ) {
245 0 : fd_h2_window_update_t window_update = {
246 0 : .hdr = {
247 0 : .typlen = fd_h2_frame_typlen( FD_H2_FRAME_TYPE_WINDOW_UPDATE, 4UL ),
248 0 : .r_stream_id = fd_uint_bswap( stream->s.stream_id )
249 0 : },
250 0 : .increment = fd_uint_bswap( bump )
251 0 : };
252 0 : fd_h2_rbuf_push( rbuf_tx, &window_update, sizeof(fd_h2_window_update_t) );
253 0 : stream->s.rx_wnd += bump;
254 0 : }
255 :
256 : /* fd_grpc_client_send_timeout is called when a stream timeout triggers.
257 : Calls back to the user, writes a RST_STREAM frame, and frees the
258 : stream object. */
259 :
260 : static void
261 : fd_grpc_client_send_timeout( fd_h2_rbuf_t * rbuf_tx,
262 : fd_grpc_client_t * client,
263 : fd_grpc_h2_stream_t * stream,
264 6 : int deadline_kind ) {
265 6 : client->callbacks->rx_timeout( client->ctx, stream->request_ctx, deadline_kind );
266 6 : fd_h2_tx_rst_stream( rbuf_tx, stream->s.stream_id, FD_H2_ERR_CANCEL );
267 6 : fd_grpc_client_stream_release( client, stream );
268 6 : }
269 :
270 : void
271 : fd_grpc_client_service_streams( fd_grpc_client_t * client,
272 33 : long ts_nanos ) {
273 33 : ulong const meta_frame_max =
274 33 : fd_ulong_max( sizeof(fd_h2_window_update_t), sizeof(fd_h2_rst_stream_t) );
275 33 : fd_h2_conn_t * conn = client->conn;
276 33 : fd_h2_rbuf_t * rbuf_tx = client->frame_tx;
277 33 : if( FD_UNLIKELY( conn->flags ) ) return;
278 33 : uint const wnd_max = conn->self_settings.initial_window_size;
279 33 : uint const wnd_thres = wnd_max / 2;
280 105 : for( ulong i=0UL; i<(client->stream_cnt); i++ ) {
281 72 : if( FD_UNLIKELY( fd_h2_rbuf_free_sz( rbuf_tx )<meta_frame_max ) ) break;
282 72 : fd_grpc_h2_stream_t * stream = client->streams[ i ];
283 :
284 72 : if( FD_UNLIKELY( ( stream->has_header_deadline ) &
285 72 : ( stream->header_deadline_nanos - ts_nanos <= 0L ) ) ) {
286 3 : fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_HEADER );
287 3 : i--; /* stream removed */
288 3 : continue;
289 3 : }
290 :
291 69 : if( FD_UNLIKELY( ( stream->has_rx_end_deadline ) &
292 69 : ( stream->rx_end_deadline_nanos - ts_nanos <= 0L ) ) ) {
293 3 : fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_RX_END );
294 3 : i--; /* stream removed */
295 3 : continue;
296 3 : }
297 :
298 66 : if( FD_UNLIKELY( stream->s.rx_wnd < wnd_thres ) ) {
299 0 : uint const bump = wnd_max - stream->s.rx_wnd;
300 0 : fd_grpc_client_send_stream_quota( rbuf_tx, stream, bump );
301 0 : }
302 66 : }
303 33 : }
304 :
305 : #if FD_HAS_OPENSSL
306 :
307 : static int
308 : fd_ossl_log_error( char const * str,
309 : ulong len,
310 0 : void * ctx ) {
311 0 : (void)ctx;
312 0 : if( len>0 && str[ len-1 ]=='\n' ) len--;
313 0 : FD_LOG_INFO(( "%.*s", (int)len, str ));
314 0 : return 0;
315 0 : }
316 :
317 : int
318 : fd_grpc_client_rxtx_ossl( fd_grpc_client_t * client,
319 : SSL * ssl,
320 0 : int * charge_busy ) {
321 0 : if( FD_UNLIKELY( !client->ssl_hs_done ) ) {
322 0 : int res = SSL_do_handshake( ssl );
323 0 : if( res<=0 ) {
324 0 : int error = SSL_get_error( ssl, res );
325 0 : if( FD_LIKELY( error==SSL_ERROR_WANT_READ || error==SSL_ERROR_WANT_WRITE ) ) return 0;
326 0 : FD_LOG_INFO(( "SSL_do_handshake failed (%i-%s)", error, fd_openssl_ssl_strerror( error ) ));
327 0 : long verify_result = SSL_get_verify_result( ssl );
328 0 : if( error == SSL_ERROR_SSL && verify_result != X509_V_OK ) {
329 0 : FD_LOG_WARNING(( "Certificate verification failed: %s", X509_verify_cert_error_string( verify_result ) ));
330 0 : }
331 0 : ERR_print_errors_cb( fd_ossl_log_error, NULL );
332 0 : return -1;
333 0 : } else {
334 0 : client->ssl_hs_done = 1;
335 0 : }
336 0 : }
337 :
338 0 : fd_h2_conn_t * conn = client->conn;
339 0 : int ssl_err = 0;
340 0 : ulong read_sz = fd_h2_rbuf_ssl_read( client->frame_rx, ssl, &ssl_err );
341 0 : if( FD_UNLIKELY( ssl_err && ssl_err!=SSL_ERROR_WANT_READ ) ) {
342 0 : if( ssl_err==SSL_ERROR_ZERO_RETURN ) {
343 0 : FD_LOG_WARNING(( "gRPC server closed connection" ));
344 0 : return -1;
345 0 : }
346 0 : FD_LOG_WARNING(( "SSL_read_ex failed (%i-%s)", ssl_err, fd_openssl_ssl_strerror( ssl_err ) ));
347 0 : ERR_print_errors_cb( fd_ossl_log_error, NULL );
348 0 : return -1;
349 0 : }
350 0 : if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
351 0 : fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
352 0 : fd_grpc_client_service_streams( client, fd_log_wallclock() );
353 0 : ulong write_sz = fd_h2_rbuf_ssl_write( client->frame_tx, ssl );
354 0 : client->metrics->stream_chunks_rx_bytes += read_sz;
355 0 : client->metrics->stream_chunks_tx_bytes += write_sz;
356 :
357 0 : if( read_sz!=0 || write_sz!=0 ) *charge_busy = 1;
358 0 : return 0;
359 0 : }
360 :
361 : #endif /* FD_HAS_OPENSSL */
362 :
363 : #if FD_H2_HAS_SOCKETS
364 :
365 : int
366 : fd_grpc_client_rxtx_socket( fd_grpc_client_t * client,
367 : int sock_fd,
368 27 : int * charge_busy ) {
369 27 : fd_h2_conn_t * conn = client->conn;
370 27 : ulong const frame_rx_lo_0 = client->frame_rx->lo_off;
371 27 : ulong const frame_rx_hi_0 = client->frame_rx->hi_off;
372 27 : ulong const frame_tx_lo_1 = client->frame_tx->lo_off;
373 27 : ulong const frame_tx_hi_1 = client->frame_tx->hi_off;
374 :
375 27 : int rx_err = fd_h2_rbuf_recvmsg( client->frame_rx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
376 27 : if( FD_UNLIKELY( rx_err ) ) {
377 0 : FD_LOG_INFO(( "Disconnected: recvmsg error (%i-%s)", rx_err, fd_io_strerror( rx_err ) ));
378 0 : errno = rx_err;
379 0 : return -1;
380 0 : }
381 :
382 27 : if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
383 27 : fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
384 27 : fd_grpc_client_service_streams( client, fd_log_wallclock() );
385 :
386 27 : int tx_err = fd_h2_rbuf_sendmsg( client->frame_tx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
387 27 : if( FD_LIKELY( tx_err && tx_err==EAGAIN ) ) return 1;
388 27 : if( FD_UNLIKELY( tx_err ) ) {
389 0 : FD_LOG_WARNING(( "fd_h2_rbuf_sendmsg failed (%i-%s)", tx_err, fd_io_strerror( tx_err ) ));
390 0 : errno = tx_err;
391 0 : return -1;
392 0 : }
393 :
394 27 : ulong const frame_rx_lo_1 = client->frame_rx->lo_off;
395 27 : ulong const frame_rx_hi_1 = client->frame_rx->hi_off;
396 27 : ulong const frame_tx_lo_0 = client->frame_tx->lo_off;
397 27 : ulong const frame_tx_hi_0 = client->frame_tx->hi_off;
398 :
399 27 : client->metrics->stream_chunks_rx_bytes += frame_rx_hi_1 - frame_rx_hi_0;
400 27 : client->metrics->stream_chunks_tx_bytes += frame_tx_lo_0 - frame_tx_lo_1;
401 :
402 27 : if( frame_rx_lo_0!=frame_rx_lo_1 || frame_rx_hi_0!=frame_rx_hi_1 ||
403 27 : frame_tx_lo_0!=frame_tx_lo_1 || frame_tx_hi_0!=frame_tx_hi_1 ) {
404 3 : *charge_busy = 1;
405 3 : }
406 :
407 27 : return 0;
408 27 : }
409 :
410 : #endif /* FD_H2_HAS_SOCKETS */
411 :
412 : /* fd_grpc_client_request continue attempts to write a request data
413 : frame. */
414 :
415 : static int
416 9 : fd_grpc_client_request_continue1( fd_grpc_client_t * client ) {
417 9 : fd_grpc_h2_stream_t * stream = client->request_stream;
418 9 : fd_h2_stream_t * h2_stream = &stream->s;
419 9 : fd_h2_tx_op_copy( client->conn, h2_stream, client->frame_tx, client->request_tx_op );
420 9 : if( FD_UNLIKELY( client->request_tx_op->chunk_sz ) ) return 0;
421 9 : if( FD_UNLIKELY( h2_stream->state != FD_H2_STREAM_STATE_CLOSING_TX ) ) return 0;
422 9 : client->metrics->stream_chunks_tx_cnt++;
423 : /* Request finished */
424 9 : client->request_stream = NULL;
425 9 : client->callbacks->tx_complete( client->ctx, stream->request_ctx );
426 9 : return 1;
427 9 : }
428 :
429 : static int
430 0 : fd_grpc_client_request_continue( fd_grpc_client_t * client ) {
431 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
432 0 : if( FD_UNLIKELY( !client->request_stream ) ) return 0;
433 0 : if( FD_UNLIKELY( !client->request_tx_op->chunk_sz ) ) return 0;
434 0 : return fd_grpc_client_request_continue1( client );
435 0 : }
436 :
437 : int
438 114 : fd_grpc_client_is_connected( fd_grpc_client_t * client ) {
439 114 : if( FD_UNLIKELY( !client ) ) return 0;
440 114 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
441 114 : if( FD_UNLIKELY( !client->h2_hs_done ) ) return 0;
442 111 : return 1;
443 114 : }
444 :
445 : int
446 114 : fd_grpc_client_request_is_blocked( fd_grpc_client_t * client ) {
447 114 : if( FD_UNLIKELY( !client ) ) return 1;
448 114 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 1;
449 114 : if( FD_UNLIKELY( !client->h2_hs_done ) ) return 1;
450 114 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 1;
451 114 : if( FD_UNLIKELY( client->request_tx_op->chunk_sz > 0UL ) ) return 1;
452 114 : if( FD_UNLIKELY( !fd_grpc_client_stream_acquire_is_safe( client ) ) ) return 1;
453 96 : return 0;
454 114 : }
455 :
456 : int
457 0 : fd_grpc_client_request_stream_busy( fd_grpc_client_t * client ) {
458 0 : return client->request_stream != NULL;
459 0 : }
460 :
461 : fd_grpc_h2_stream_t *
462 : fd_grpc_client_request_start(
463 : fd_grpc_client_t * client,
464 : char const * path,
465 : ulong path_len,
466 : ulong request_ctx,
467 : pb_msgdesc_t const * fields,
468 : void const * message,
469 : char const * auth_token,
470 : ulong auth_token_sz,
471 : int is_streaming
472 9 : ) {
473 9 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
474 :
475 : /* Encode message */
476 9 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
477 9 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
478 9 : pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
479 9 : if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
480 0 : FD_LOG_WARNING(( "Failed to encode Protobuf message (%.*s). This is a bug (insufficient buffer space?)", (int)path_len, path ));
481 0 : return NULL;
482 0 : }
483 9 : ulong const serialized_sz = ostream.bytes_written;
484 :
485 : /* Create gRPC length prefix */
486 9 : fd_grpc_hdr_t hdr = {
487 9 : .compressed=0,
488 9 : .msg_sz=fd_uint_bswap( (uint)serialized_sz )
489 9 : };
490 9 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
491 9 : ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
492 :
493 : /* Allocate stream descriptor */
494 9 : fd_grpc_h2_stream_t * stream = fd_grpc_client_stream_acquire( client, request_ctx );
495 9 : uint const stream_id = stream->s.stream_id;
496 :
497 : /* Write HTTP/2 request headers */
498 9 : fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
499 9 : fd_grpc_req_hdrs_t req_meta = {
500 9 : .host = client->host,
501 9 : .host_len = client->host_len,
502 9 : .port = client->port,
503 9 : .path = path,
504 9 : .path_len = path_len,
505 9 : .https = 1, /* grpc_client assumes TLS encryption for now */
506 :
507 9 : .bearer_auth = auth_token,
508 9 : .bearer_auth_len = auth_token_sz
509 9 : };
510 9 : if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
511 9 : &req_meta,
512 9 : client->frame_tx,
513 9 : client->version,
514 9 : client->version_len
515 9 : ) ) ) {
516 0 : FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
517 0 : fd_grpc_client_stream_release( client, stream );
518 0 : return NULL;
519 0 : }
520 9 : fd_h2_tx_commit( client->conn, client->frame_tx );
521 :
522 : /* Queue request payload for send
523 : (Protobuf message might have to be fragmented into multiple HTTP/2
524 : DATA frames if the client gets blocked)
525 : For streaming requests, don't set END_STREAM flag yet */
526 9 : uint flags = is_streaming ? 0U : FD_H2_FLAG_END_STREAM;
527 9 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, flags );
528 9 : fd_grpc_client_request_continue1( client );
529 9 : client->metrics->requests_sent++;
530 9 : client->metrics->streams_active++;
531 :
532 9 : FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu streaming=%d", (int)path_len, path, serialized_sz, is_streaming ));
533 :
534 9 : return stream;
535 9 : }
536 :
537 : fd_grpc_h2_stream_t *
538 : fd_grpc_client_request_start1(
539 : fd_grpc_client_t * client,
540 : char const * path,
541 : ulong path_len,
542 : ulong request_ctx,
543 : uchar const * protobuf,
544 : ulong protobuf_sz,
545 : char const * auth_token,
546 : ulong auth_token_sz,
547 : int is_streaming
548 0 : ) {
549 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
550 :
551 0 : int const headers_only = (protobuf==NULL);
552 0 : if( FD_UNLIKELY( headers_only && !is_streaming ) ) {
553 0 : FD_LOG_WARNING(( "headers-only request requires is_streaming (path %.*s). This is a bug", (int)path_len, path ));
554 0 : return NULL;
555 0 : }
556 :
557 : /* Validate protobuf size */
558 0 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
559 0 : ulong const max_proto_sz = client->nanopb_tx_max - sizeof(fd_grpc_hdr_t);
560 0 : if( FD_UNLIKELY( protobuf_sz > max_proto_sz ) ) {
561 0 : FD_LOG_WARNING(( "Protobuf message too large (%lu bytes) for path (%.*s). Max size is %lu bytes", protobuf_sz, (int)path_len, path, max_proto_sz ));
562 0 : return NULL;
563 0 : }
564 :
565 0 : if( FD_LIKELY( !headers_only ) ) {
566 : /* Copy protobuf to buffer after gRPC header */
567 0 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
568 0 : memcpy( proto_buf, protobuf, protobuf_sz );
569 :
570 : /* Create gRPC length prefix */
571 0 : fd_grpc_hdr_t hdr = {
572 0 : .compressed=0,
573 0 : .msg_sz=fd_uint_bswap( (uint)protobuf_sz )
574 0 : };
575 0 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
576 0 : }
577 0 : ulong const payload_sz = protobuf_sz + sizeof(fd_grpc_hdr_t);
578 :
579 : /* Allocate stream descriptor */
580 0 : fd_grpc_h2_stream_t * stream = fd_grpc_client_stream_acquire( client, request_ctx );
581 0 : uint const stream_id = stream->s.stream_id;
582 :
583 : /* Write HTTP/2 request headers */
584 0 : fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
585 0 : fd_grpc_req_hdrs_t req_meta = {
586 0 : .host = client->host,
587 0 : .host_len = client->host_len,
588 0 : .port = client->port,
589 0 : .path = path,
590 0 : .path_len = path_len,
591 0 : .https = 1, /* grpc_client assumes TLS encryption for now */
592 :
593 0 : .bearer_auth = auth_token,
594 0 : .bearer_auth_len = auth_token_sz
595 0 : };
596 0 : if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
597 0 : &req_meta,
598 0 : client->frame_tx,
599 0 : client->version,
600 0 : client->version_len
601 0 : ) ) ) {
602 0 : FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
603 0 : fd_grpc_client_stream_release( client, stream );
604 0 : return NULL;
605 0 : }
606 0 : fd_h2_tx_commit( client->conn, client->frame_tx );
607 :
608 0 : client->metrics->streams_active++;
609 :
610 0 : if( FD_UNLIKELY( headers_only ) ) {
611 0 : client->request_stream = NULL;
612 0 : return stream;
613 0 : }
614 :
615 : /* Queue request payload for send
616 : (Protobuf message might have to be fragmented into multiple HTTP/2
617 : DATA frames if the client gets blocked)
618 : For streaming requests, don't set END_STREAM flag yet */
619 0 : uint flags = is_streaming ? 0U : FD_H2_FLAG_END_STREAM;
620 0 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, flags );
621 0 : fd_grpc_client_request_continue1( client );
622 0 : client->metrics->requests_sent++;
623 :
624 0 : FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu streaming=%d", (int)path_len, path, protobuf_sz, is_streaming ));
625 :
626 0 : return stream;
627 0 : }
628 :
629 : int
630 : fd_grpc_client_stream_send_msg(
631 : fd_grpc_client_t * client,
632 : fd_grpc_h2_stream_t * stream,
633 : pb_msgdesc_t const * fields,
634 : void const * message
635 0 : ) {
636 0 : if( FD_UNLIKELY( !client || !stream ) ) return 0;
637 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
638 0 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
639 0 : if( FD_UNLIKELY( client->request_tx_op->chunk_sz > 0UL ) ) return 0;
640 0 : if( FD_UNLIKELY( client->request_stream != NULL && client->request_stream != stream ) ) return 0; /* Another stream has a request in progress */
641 :
642 : /* Encode message */
643 0 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
644 0 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
645 0 : pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
646 0 : if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
647 0 : FD_LOG_WARNING(( "Failed to encode Protobuf message for stream_id=%u. This is a bug (insufficient buffer space?)", stream->s.stream_id ));
648 0 : return 0;
649 0 : }
650 0 : ulong const serialized_sz = ostream.bytes_written;
651 :
652 : /* Create gRPC length prefix */
653 0 : fd_grpc_hdr_t hdr = {
654 0 : .compressed=0,
655 0 : .msg_sz=fd_uint_bswap( (uint)serialized_sz )
656 0 : };
657 0 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
658 0 : ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
659 :
660 : /* Queue message payload for send (without END_STREAM flag) */
661 0 : client->request_stream = stream;
662 0 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, 0U );
663 0 : fd_grpc_client_request_continue1( client );
664 :
665 0 : FD_LOG_DEBUG(( "gRPC stream_send_msg stream_id=%u sz=%lu", stream->s.stream_id, serialized_sz ));
666 :
667 0 : return 1;
668 0 : }
669 :
670 : int
671 : fd_grpc_client_stream_send_msg1(
672 : fd_grpc_client_t * client,
673 : fd_grpc_h2_stream_t * stream,
674 : uchar const * protobuf,
675 : ulong protobuf_sz
676 0 : ) {
677 0 : if( FD_UNLIKELY( !client || !stream ) ) return 0;
678 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
679 0 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
680 0 : if( FD_UNLIKELY( client->request_tx_op->chunk_sz > 0UL ) ) return 0;
681 0 : if( FD_UNLIKELY( client->request_stream != NULL && client->request_stream != stream ) ) return 0; /* Another stream has a request in progress */
682 :
683 : /* Validate protobuf size */
684 0 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
685 0 : ulong const max_proto_sz = client->nanopb_tx_max - sizeof(fd_grpc_hdr_t);
686 0 : if( FD_UNLIKELY( protobuf_sz > max_proto_sz ) ) {
687 0 : FD_LOG_WARNING(( "Protobuf message too large (%lu bytes) for stream_id=%u. Max size is %lu bytes", protobuf_sz, stream->s.stream_id, max_proto_sz ));
688 0 : return 0;
689 0 : }
690 :
691 : /* Copy protobuf to buffer after gRPC header */
692 0 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
693 0 : memcpy( proto_buf, protobuf, protobuf_sz );
694 :
695 : /* Create gRPC length prefix */
696 0 : fd_grpc_hdr_t hdr = {
697 0 : .compressed=0,
698 0 : .msg_sz=fd_uint_bswap( (uint)protobuf_sz )
699 0 : };
700 0 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
701 0 : ulong const payload_sz = protobuf_sz + sizeof(fd_grpc_hdr_t);
702 :
703 : /* Queue message payload for send (without END_STREAM flag) */
704 0 : client->request_stream = stream;
705 0 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, 0U );
706 0 : fd_grpc_client_request_continue1( client );
707 :
708 0 : FD_LOG_DEBUG(( "gRPC stream_send_msg stream_id=%u sz=%lu", stream->s.stream_id, protobuf_sz ));
709 :
710 0 : return 1;
711 0 : }
712 :
713 : int
714 : fd_grpc_client_stream_close(
715 : fd_grpc_client_t * client,
716 : fd_grpc_h2_stream_t * stream
717 0 : ) {
718 0 : if( FD_UNLIKELY( !client || !stream ) ) return 0;
719 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
720 0 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
721 0 : if( FD_UNLIKELY( client->request_stream != NULL ) ) return 0; /* Another request in progress */
722 :
723 : /* Send empty DATA frame with END_STREAM flag to close the stream */
724 0 : fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_DATA, FD_H2_FLAG_END_STREAM, stream->s.stream_id );
725 0 : fd_h2_tx_commit( client->conn, client->frame_tx );
726 :
727 0 : FD_LOG_DEBUG(( "gRPC stream_close stream_id=%u", stream->s.stream_id ));
728 :
729 0 : return 1;
730 0 : }
731 :
732 : void
733 : fd_grpc_client_deadline_set( fd_grpc_h2_stream_t * stream,
734 : int deadline_kind,
735 9 : long ts_nanos ) {
736 9 : switch( deadline_kind ) {
737 6 : case FD_GRPC_DEADLINE_HEADER:
738 6 : stream->header_deadline_nanos = ts_nanos;
739 6 : stream->has_header_deadline = 1;
740 6 : break;
741 3 : case FD_GRPC_DEADLINE_RX_END:
742 3 : stream->rx_end_deadline_nanos = ts_nanos;
743 3 : stream->has_rx_end_deadline = 1;
744 3 : break;
745 9 : }
746 9 : }
747 :
748 : /* Lookup stream by ID */
749 :
750 : static fd_h2_stream_t *
751 : fd_grpc_h2_stream_query( fd_h2_conn_t * conn,
752 6 : uint stream_id ) {
753 6 : fd_grpc_client_t * client = conn->ctx;
754 6 : for( ulong i=0UL; i<client->stream_cnt; i++ ) {
755 3 : if( client->stream_ids[ i ] == stream_id ) {
756 3 : return &client->streams[ i ]->s;
757 3 : }
758 3 : }
759 3 : return NULL;
760 6 : }
761 :
762 : static void
763 0 : fd_grpc_h2_conn_established( fd_h2_conn_t * conn ) {
764 0 : fd_grpc_client_t * client = conn->ctx;
765 0 : client->h2_hs_done = 1;
766 0 : client->callbacks->conn_established( client->ctx );
767 0 : }
768 :
769 : static void
770 : fd_grpc_h2_conn_final( fd_h2_conn_t * conn,
771 : uint h2_err,
772 0 : int closed_by ) {
773 0 : fd_grpc_client_t * client = conn->ctx;
774 0 : client->callbacks->conn_dead( client->ctx, h2_err, closed_by );
775 0 : }
776 :
777 : /* React to response data */
778 :
779 : void
780 : fd_grpc_h2_cb_headers(
781 : fd_h2_conn_t * conn,
782 : fd_h2_stream_t * h2_stream,
783 : void const * data,
784 : ulong data_sz,
785 : ulong flags
786 0 : ) {
787 0 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
788 0 : fd_grpc_client_t * client = conn->ctx;
789 :
790 0 : int h2_status = fd_grpc_h2_read_response_hdrs( &stream->hdrs, client->matcher, data, data_sz );
791 0 : if( FD_UNLIKELY( h2_status!=FD_H2_SUCCESS ) ) {
792 : /* Failed to parse HTTP/2 headers */
793 0 : fd_h2_stream_error( h2_stream, conn, client->frame_tx, FD_H2_ERR_PROTOCOL );
794 0 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
795 0 : fd_grpc_client_stream_release( client, stream );
796 0 : return;
797 0 : }
798 :
799 0 : if( !stream->hdrs_received && !!( flags & FD_H2_FLAG_END_HEADERS) ) {
800 : /* Got initial response header */
801 0 : stream->hdrs_received = 1;
802 0 : stream->has_header_deadline = 0;
803 0 : if( FD_LIKELY( ( stream->hdrs.h2_status==200 ) &
804 0 : ( !!stream->hdrs.is_grpc_proto ) ) ) {
805 0 : client->callbacks->rx_start( client->ctx, stream->request_ctx );
806 0 : }
807 0 : }
808 :
809 0 : if( ( flags & (FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) )
810 0 : ==(FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) ) {
811 0 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
812 0 : fd_grpc_client_stream_release( client, stream );
813 0 : return;
814 0 : }
815 0 : }
816 :
817 : void
818 : fd_grpc_h2_cb_data(
819 : fd_h2_conn_t * conn,
820 : fd_h2_stream_t * h2_stream,
821 : void const * data,
822 : ulong data_sz,
823 : ulong flags
824 6 : ) {
825 6 : fd_grpc_client_t * client = conn->ctx;
826 6 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
827 6 : if( FD_UNLIKELY( ( stream->hdrs.h2_status!=200 ) |
828 6 : ( !stream->hdrs.is_grpc_proto ) ) ) {
829 0 : if( flags & FD_H2_FLAG_END_STREAM ) {
830 0 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
831 0 : fd_grpc_client_stream_release( client, stream );
832 0 : }
833 0 : return;
834 0 : }
835 :
836 6 : do {
837 :
838 : /* Read header bytes */
839 6 : if( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) {
840 6 : ulong hdr_frag_sz = fd_ulong_min( sizeof(fd_grpc_hdr_t) - stream->msg_buf_used, data_sz );
841 6 : fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, hdr_frag_sz );
842 6 : stream->msg_buf_used += hdr_frag_sz;
843 6 : data = (void const *)( (ulong)data + (ulong)hdr_frag_sz );
844 6 : data_sz -= hdr_frag_sz;
845 6 : if( FD_UNLIKELY( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) ) return;
846 :
847 : /* Header complete */
848 6 : stream->msg_sz = fd_uint_bswap( FD_LOAD( uint, (void *)( (ulong)stream->msg_buf+1 ) ) );
849 6 : if( FD_UNLIKELY( sizeof(fd_grpc_hdr_t) + stream->msg_sz > stream->msg_buf_max ) ) {
850 6 : FD_LOG_WARNING(( "Received oversized gRPC message (%lu bytes), killing request", stream->msg_sz ));
851 6 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
852 6 : fd_h2_stream_error( h2_stream, conn, client->frame_tx, FD_H2_ERR_INTERNAL );
853 6 : fd_grpc_client_stream_release( client, stream );
854 6 : return;
855 6 : }
856 6 : }
857 :
858 : /* Read payload bytes */
859 0 : ulong wmark = sizeof(fd_grpc_hdr_t) + stream->msg_sz;
860 0 : ulong chunk_sz = fd_ulong_min( stream->msg_buf_used+data_sz, wmark ) - stream->msg_buf_used;
861 0 : if( FD_UNLIKELY( chunk_sz>data_sz ) ) FD_LOG_CRIT(( "integer underflow" )); /* unreachable */
862 0 : fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, chunk_sz );
863 0 : stream->msg_buf_used += chunk_sz;
864 0 : data = (void const *)( (ulong)data + (ulong)chunk_sz );
865 0 : data_sz -= chunk_sz;
866 :
867 0 : client->metrics->stream_chunks_rx_cnt++;
868 :
869 0 : if( stream->msg_buf_used >= wmark ) {
870 : /* Data complete */
871 0 : void const * msg_ptr = stream->msg_buf + sizeof(fd_grpc_hdr_t);
872 0 : client->callbacks->rx_msg( client->ctx, msg_ptr, stream->msg_sz, stream->request_ctx );
873 0 : stream->msg_buf_used = 0UL;
874 0 : stream->msg_sz = 0UL;
875 0 : }
876 :
877 0 : } while( data_sz );
878 :
879 0 : if( flags & FD_H2_FLAG_END_STREAM ) {
880 : /* FIXME incomplete gRPC message */
881 0 : if( FD_UNLIKELY( stream->msg_buf_used ) ) {
882 0 : FD_LOG_WARNING(( "Received incomplete gRPC message" ));
883 0 : }
884 0 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
885 0 : fd_grpc_client_stream_release( client, stream );
886 0 : }
887 0 : }
888 :
889 : /* Server might kill our request */
890 :
891 : static void
892 : fd_grpc_h2_rst_stream( fd_h2_conn_t * conn,
893 : fd_h2_stream_t * h2_stream,
894 : uint error_code,
895 3 : int closed_by ) {
896 3 : if( closed_by==1 ) {
897 3 : FD_LOG_WARNING(( "Server terminated request stream_id=%u (%u-%s)",
898 3 : h2_stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
899 3 : } else {
900 0 : FD_LOG_WARNING(( "Stream failed stream_id=%u (%u-%s)",
901 0 : h2_stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
902 0 : }
903 3 : fd_grpc_client_t * client = conn->ctx;
904 3 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
905 3 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
906 3 : fd_grpc_client_stream_release( client, stream );
907 3 : }
908 :
909 : /* A HTTP/2 flow control change might unblock a queued request send op */
910 :
911 : void
912 : fd_grpc_h2_window_update( fd_h2_conn_t * conn,
913 0 : uint increment ) {
914 0 : (void)increment;
915 0 : fd_grpc_client_request_continue( conn->ctx );
916 0 : }
917 :
918 : void
919 : fd_grpc_h2_stream_window_update( fd_h2_conn_t * conn,
920 : fd_h2_stream_t * stream,
921 0 : uint increment ) {
922 0 : (void)stream; (void)increment;
923 0 : fd_grpc_client_request_continue( conn->ctx );
924 0 : }
925 :
926 : void
927 3 : fd_grpc_h2_ping_ack( fd_h2_conn_t * conn ) {
928 3 : fd_grpc_client_t * client = conn->ctx;
929 3 : client->callbacks->ping_ack( client->ctx );
930 3 : }
931 :
932 : fd_h2_rbuf_t *
933 6 : fd_grpc_client_rbuf_tx( fd_grpc_client_t * client ) {
934 6 : return client->frame_tx;
935 6 : }
936 :
937 : fd_h2_rbuf_t *
938 0 : fd_grpc_client_rbuf_rx( fd_grpc_client_t * client ) {
939 0 : return client->frame_rx;
940 0 : }
941 :
942 : fd_h2_conn_t *
943 264 : fd_grpc_client_h2_conn( fd_grpc_client_t * client ) {
944 264 : return client->conn;
945 264 : }
946 :
947 : /* fd_grpc_client_h2_callbacks specifies h2->grpc_client callbacks.
948 : Stored in .rodata for security. Must be kept in sync with fd_h2 to
949 : avoid NULL pointers. */
950 :
951 : fd_h2_callbacks_t const fd_grpc_client_h2_callbacks = {
952 : .stream_create = fd_h2_noop_stream_create,
953 : .stream_query = fd_grpc_h2_stream_query,
954 : .conn_established = fd_grpc_h2_conn_established,
955 : .conn_final = fd_grpc_h2_conn_final,
956 : .headers = fd_grpc_h2_cb_headers,
957 : .data = fd_grpc_h2_cb_data,
958 : .rst_stream = fd_grpc_h2_rst_stream,
959 : .window_update = fd_grpc_h2_window_update,
960 : .stream_window_update = fd_grpc_h2_stream_window_update,
961 : .ping_ack = fd_grpc_h2_ping_ack,
962 : };
|