Line data Source code
1 : #include "fd_grpc_client.h"
2 : #include "fd_grpc_client_private.h"
3 : #include "../../ballet/nanopb/pb_encode.h" /* pb_msgdesc_t */
4 : #include <sys/socket.h>
5 : #include "../h2/fd_h2_rbuf_sock.h"
6 : #include "fd_grpc_codec.h"
7 : #if FD_HAS_OPENSSL
8 : #include "../openssl/fd_openssl.h"
9 : #include <openssl/ssl.h>
10 : #include <openssl/err.h>
11 : #include "../h2/fd_h2_rbuf_ossl.h"
12 : #endif
13 :
14 : ulong
15 21 : fd_grpc_client_align( void ) {
16 21 : return fd_ulong_max( alignof(fd_grpc_client_t), fd_grpc_h2_stream_pool_align() );
17 21 : }
18 :
19 : ulong
20 6 : fd_grpc_client_footprint( ulong buf_max ) {
21 6 : ulong l = FD_LAYOUT_INIT;
22 6 : l = FD_LAYOUT_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
23 6 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
24 6 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_scratch */
25 6 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
26 6 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
27 6 : l = FD_LAYOUT_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
28 6 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
29 6 : return FD_LAYOUT_FINI( l, fd_grpc_client_align() );
30 6 : }
31 :
32 : static void
33 36 : fd_grpc_h2_stream_reset( fd_grpc_h2_stream_t * stream ) {
34 36 : memset( &stream->s, 0, sizeof(fd_h2_stream_t) );
35 36 : stream->request_ctx = 0UL;
36 36 : memset( &stream->hdrs, 0, sizeof(fd_grpc_resp_hdrs_t) );
37 36 : stream->hdrs.grpc_status = FD_GRPC_STATUS_UNKNOWN;
38 36 : stream->hdrs_received = 0;
39 36 : stream->msg_buf_used = 0UL;
40 36 : stream->msg_sz = 0UL;
41 36 : stream->has_header_deadline = 0;
42 36 : stream->has_rx_end_deadline = 0;
43 36 : }
44 :
45 : fd_grpc_client_t *
46 : fd_grpc_client_new( void * mem,
47 : fd_grpc_client_callbacks_t const * callbacks,
48 : fd_grpc_client_metrics_t * metrics,
49 : void * app_ctx,
50 : ulong buf_max,
51 3 : ulong rng_seed ) {
52 3 : if( FD_UNLIKELY( !mem ) ) {
53 0 : FD_LOG_WARNING(( "NULL mem" ));
54 0 : return NULL;
55 0 : }
56 3 : if( FD_UNLIKELY( buf_max<4096UL ) ) {
57 0 : FD_LOG_WARNING(( "undersz buf_max" ));
58 0 : return NULL;
59 0 : }
60 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_grpc_client_align() ) ) ) {
61 0 : FD_LOG_WARNING(( "unaligned mem" ));
62 0 : return NULL;
63 0 : }
64 :
65 3 : FD_SCRATCH_ALLOC_INIT( l, mem );
66 3 : void * client_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
67 3 : void * nanopb_tx = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
68 3 : void * frame_scratch = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_scratch */
69 3 : void * frame_rx_buf = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
70 3 : void * frame_tx_buf = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
71 3 : void * stream_pool_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
72 3 : void * stream_buf_mem = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
73 3 : ulong end = FD_SCRATCH_ALLOC_FINI( l, fd_grpc_client_align() );
74 3 : FD_TEST( end-(ulong)mem == fd_grpc_client_footprint( buf_max ) );
75 :
76 3 : fd_grpc_client_t * client = client_mem;
77 :
78 3 : fd_grpc_h2_stream_t * stream_pool =
79 3 : fd_grpc_h2_stream_pool_join( fd_grpc_h2_stream_pool_new( stream_pool_mem, FD_GRPC_CLIENT_MAX_STREAMS ) );
80 3 : if( FD_UNLIKELY( !stream_pool ) ) FD_LOG_CRIT(( "Failed to create stream pool" )); /* unreachable */
81 :
82 3 : *client = (fd_grpc_client_t){
83 3 : .callbacks = callbacks,
84 3 : .ctx = app_ctx,
85 3 : .stream_pool = stream_pool,
86 3 : .stream_bufs = stream_buf_mem,
87 3 : .nanopb_tx = nanopb_tx,
88 3 : .nanopb_tx_max = buf_max,
89 3 : .frame_scratch = frame_scratch,
90 3 : .frame_scratch_max = buf_max,
91 3 : .frame_rx_buf = frame_rx_buf,
92 3 : .frame_rx_buf_max = buf_max,
93 3 : .frame_tx_buf = frame_tx_buf,
94 3 : .frame_tx_buf_max = buf_max,
95 3 : .metrics = metrics
96 3 : };
97 :
98 : /* FIXME for performance, cache this? */
99 3 : fd_h2_hdr_matcher_init( client->matcher, rng_seed );
100 3 : fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_STATUS, "grpc-status" );
101 3 : fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_MESSAGE, "grpc-message" );
102 :
103 3 : client->version_len = 5;
104 3 : memcpy( client->version, "0.0.0", 5 );
105 :
106 27 : for( ulong i=0UL; i<FD_GRPC_CLIENT_MAX_STREAMS; i++ ) {
107 24 : fd_grpc_h2_stream_t * stream = &client->stream_pool[ i ];
108 24 : stream->msg_buf = (uchar *)stream_buf_mem + (i*buf_max);
109 24 : stream->msg_buf_max = buf_max;
110 24 : FD_TEST( (ulong)( stream->msg_buf + stream->msg_buf_max )<=end );
111 24 : }
112 3 : fd_grpc_client_reset( client );
113 :
114 3 : return client;
115 3 : }
116 :
117 : void *
118 3 : fd_grpc_client_delete( fd_grpc_client_t * client ) {
119 3 : return client;
120 3 : }
121 :
122 : void
123 : fd_grpc_client_set_version( fd_grpc_client_t * client,
124 : char const * version,
125 0 : ulong version_len ) {
126 0 : if( FD_UNLIKELY( version_len > FD_GRPC_CLIENT_VERSION_LEN_MAX ) ) {
127 0 : FD_LOG_WARNING(( "Version string too long (%lu chars), ignoring", version_len ));
128 0 : return;
129 0 : }
130 0 : client->version_len = (uchar)version_len;
131 0 : memcpy( client->version, version, version_len );
132 0 : }
133 :
134 : void
135 : fd_grpc_client_set_authority( fd_grpc_client_t * client,
136 : char const * host,
137 : ulong host_len,
138 0 : ushort port ) {
139 0 : host_len = fd_ulong_min( host_len, sizeof(client->host)-1 );
140 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->host ), host, host_len ) );
141 0 : client->host_len = (uchar)host_len;
142 0 : client->port = (ushort)port;
143 0 : }
144 :
145 : int
146 12 : fd_grpc_client_stream_acquire_is_safe( fd_grpc_client_t * client ) {
147 : /* Sufficient quota to start a stream? */
148 12 : if( FD_UNLIKELY( client->conn->stream_active_cnt[1]+1 > client->conn->peer_settings.max_concurrent_streams ) ) {
149 0 : return 0;
150 0 : }
151 :
152 : /* Free stream object available? */
153 12 : if( FD_UNLIKELY( !fd_grpc_h2_stream_pool_free( client->stream_pool ) ) ) {
154 0 : return 0;
155 0 : }
156 12 : if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
157 0 : return 0;
158 0 : }
159 :
160 12 : return 1;
161 12 : }
162 :
163 : fd_grpc_h2_stream_t *
164 : fd_grpc_client_stream_acquire( fd_grpc_client_t * client,
165 36 : ulong request_ctx ) {
166 36 : if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
167 0 : FD_LOG_CRIT(( "stream pool exhausted" ));
168 0 : }
169 :
170 36 : fd_h2_conn_t * conn = client->conn;
171 36 : uint const stream_id = client->conn->tx_stream_next;
172 36 : conn->tx_stream_next += 2U;
173 :
174 36 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_pool_ele_acquire( client->stream_pool );
175 36 : fd_grpc_h2_stream_reset( stream );
176 36 : stream->request_ctx = request_ctx;
177 :
178 36 : fd_h2_stream_open( fd_h2_stream_init( &stream->s ), conn, stream_id );
179 36 : client->request_stream = stream;
180 36 : client->stream_ids[ client->stream_cnt ] = stream_id;
181 36 : client->streams [ client->stream_cnt ] = stream;
182 36 : client->stream_cnt++;
183 :
184 36 : return stream;
185 36 : }
186 :
187 : void
188 : fd_grpc_client_stream_release( fd_grpc_client_t * client,
189 36 : fd_grpc_h2_stream_t * stream ) {
190 36 : if( FD_UNLIKELY( !client->stream_cnt ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
191 :
192 : /* Deallocate tx_op */
193 36 : if( FD_UNLIKELY( stream == client->request_stream ) ) {
194 24 : client->request_stream = NULL;
195 24 : *client->request_tx_op = (fd_h2_tx_op_t){0};
196 24 : }
197 :
198 : /* Remove stream from map */
199 36 : int map_idx = -1;
200 90 : for( uint i=0UL; i<(client->stream_cnt); i++ ) {
201 54 : if( client->stream_ids[ i ] == stream->s.stream_id ) {
202 36 : map_idx = (int)i;
203 36 : }
204 54 : }
205 36 : if( FD_UNLIKELY( map_idx<0 ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
206 36 : if( (ulong)map_idx+1 < client->stream_cnt ) {
207 6 : client->stream_ids[ map_idx ] = client->stream_ids[ client->stream_cnt-1 ];
208 6 : client->streams [ map_idx ] = client->streams [ client->stream_cnt-1 ];
209 6 : }
210 36 : client->stream_cnt--;
211 :
212 36 : fd_grpc_h2_stream_pool_ele_release( client->stream_pool, stream );
213 36 : }
214 :
215 : void
216 18 : fd_grpc_client_reset( fd_grpc_client_t * client ) {
217 18 : fd_h2_rbuf_init( client->frame_rx, client->frame_rx_buf, client->frame_rx_buf_max );
218 18 : fd_h2_rbuf_init( client->frame_tx, client->frame_tx_buf, client->frame_tx_buf_max );
219 18 : fd_h2_conn_init_client( client->conn );
220 18 : client->conn->ctx = client;
221 18 : client->h2_hs_done = 0;
222 18 : client->ssl_hs_done = 0;
223 18 : client->request_stream = NULL;
224 18 : *client->request_tx_op = (fd_h2_tx_op_t){0};
225 :
226 : /* Disable RX flow control */
227 18 : client->conn->self_settings.initial_window_size = (1U<<31)-1U;
228 18 : client->conn->rx_wnd_max = (1U<<31)-1U;
229 18 : client->conn->rx_wnd_wmark = client->conn->rx_wnd_max - (1U<<20);
230 :
231 : /* Free all stream objects */
232 21 : while( client->stream_cnt ) {
233 3 : fd_grpc_h2_stream_t * stream = client->streams[ client->stream_cnt-1 ];
234 3 : fd_grpc_client_stream_release( client, stream );
235 3 : }
236 18 : }
237 :
238 : /* fd_grpc_client_send_stream_quota writes a WINDOW_UPDATE frame, which
239 : eventually allows the peer to send more data bytes. */
240 :
241 : static void
242 : fd_grpc_client_send_stream_quota( fd_h2_rbuf_t * rbuf_tx,
243 : fd_grpc_h2_stream_t * stream,
244 3 : uint bump ) {
245 3 : fd_h2_window_update_t window_update = {
246 3 : .hdr = {
247 3 : .typlen = fd_h2_frame_typlen( FD_H2_FRAME_TYPE_WINDOW_UPDATE, 4UL ),
248 3 : .r_stream_id = fd_uint_bswap( stream->s.stream_id )
249 3 : },
250 3 : .increment = fd_uint_bswap( bump )
251 3 : };
252 3 : fd_h2_rbuf_push( rbuf_tx, &window_update, sizeof(fd_h2_window_update_t) );
253 3 : stream->s.rx_wnd += bump;
254 3 : }
255 :
256 : /* fd_grpc_client_send_timeout is called when a stream timeout triggers.
257 : Calls back to the user, writes a RST_STREAM frame, and frees the
258 : stream object. */
259 :
260 : static void
261 : fd_grpc_client_send_timeout( fd_h2_rbuf_t * rbuf_tx,
262 : fd_grpc_client_t * client,
263 : fd_grpc_h2_stream_t * stream,
264 6 : int deadline_kind ) {
265 6 : client->callbacks->rx_timeout( client->ctx, stream->request_ctx, deadline_kind );
266 6 : fd_h2_tx_rst_stream( rbuf_tx, stream->s.stream_id, FD_H2_ERR_CANCEL );
267 6 : fd_grpc_client_stream_release( client, stream );
268 6 : }
269 :
270 : void
271 : fd_grpc_client_service_streams( fd_grpc_client_t * client,
272 18 : long ts_nanos ) {
273 18 : ulong const meta_frame_max =
274 18 : fd_ulong_max( sizeof(fd_h2_window_update_t), sizeof(fd_h2_rst_stream_t) );
275 18 : fd_h2_conn_t * conn = client->conn;
276 18 : fd_h2_rbuf_t * rbuf_tx = client->frame_tx;
277 18 : if( FD_UNLIKELY( conn->flags ) ) return;
278 18 : uint const wnd_max = conn->self_settings.initial_window_size;
279 18 : uint const wnd_thres = wnd_max / 2;
280 36 : for( ulong i=0UL; i<(client->stream_cnt); i++ ) {
281 18 : if( FD_UNLIKELY( fd_h2_rbuf_free_sz( rbuf_tx )<meta_frame_max ) ) break;
282 18 : fd_grpc_h2_stream_t * stream = client->streams[ i ];
283 :
284 18 : if( FD_UNLIKELY( ( stream->has_header_deadline ) &
285 18 : ( stream->header_deadline_nanos - ts_nanos <= 0L ) ) ) {
286 3 : fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_HEADER );
287 3 : i--; /* stream removed */
288 3 : continue;
289 3 : }
290 :
291 15 : if( FD_UNLIKELY( ( stream->has_rx_end_deadline ) &
292 15 : ( stream->rx_end_deadline_nanos - ts_nanos <= 0L ) ) ) {
293 3 : fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_RX_END );
294 3 : i--; /* stream removed */
295 3 : continue;
296 3 : }
297 :
298 12 : if( FD_UNLIKELY( stream->s.rx_wnd < wnd_thres ) ) {
299 3 : uint const bump = wnd_max - stream->s.rx_wnd;
300 3 : fd_grpc_client_send_stream_quota( rbuf_tx, stream, bump );
301 3 : }
302 12 : }
303 18 : }
304 :
305 : #if FD_HAS_OPENSSL
306 :
307 : static int
308 : fd_ossl_log_error( char const * str,
309 : ulong len,
310 0 : void * ctx ) {
311 0 : (void)ctx;
312 0 : if( len>0 && str[ len-1 ]=='\n' ) len--;
313 0 : FD_LOG_INFO(( "%.*s", (int)len, str ));
314 0 : return 0;
315 0 : }
316 :
317 : int
318 : fd_grpc_client_rxtx_ossl( fd_grpc_client_t * client,
319 : SSL * ssl,
320 0 : int * charge_busy ) {
321 0 : if( FD_UNLIKELY( !client->ssl_hs_done ) ) {
322 0 : int res = SSL_do_handshake( ssl );
323 0 : if( res<=0 ) {
324 0 : int error = SSL_get_error( ssl, res );
325 0 : if( FD_LIKELY( error==SSL_ERROR_WANT_READ || error==SSL_ERROR_WANT_WRITE ) ) return 1;
326 0 : FD_LOG_INFO(( "SSL_do_handshake failed (%i-%s)", error, fd_openssl_ssl_strerror( error ) ));
327 0 : long verify_result = SSL_get_verify_result( ssl );
328 0 : if( error == SSL_ERROR_SSL && verify_result != X509_V_OK ) {
329 0 : FD_LOG_WARNING(( "Certificate verification failed: %s", X509_verify_cert_error_string( verify_result ) ));
330 0 : }
331 0 : ERR_print_errors_cb( fd_ossl_log_error, NULL );
332 0 : return 0;
333 0 : } else {
334 0 : client->ssl_hs_done = 1;
335 0 : }
336 0 : }
337 :
338 0 : fd_h2_conn_t * conn = client->conn;
339 0 : int ssl_err = 0;
340 0 : ulong read_sz = fd_h2_rbuf_ssl_read( client->frame_rx, ssl, &ssl_err );
341 0 : if( FD_UNLIKELY( ssl_err && ssl_err!=SSL_ERROR_WANT_READ ) ) {
342 0 : if( ssl_err==SSL_ERROR_ZERO_RETURN ) {
343 0 : FD_LOG_WARNING(( "gRPC server closed connection" ));
344 0 : return 0;
345 0 : }
346 0 : FD_LOG_WARNING(( "SSL_read_ex failed (%i-%s)", ssl_err, fd_openssl_ssl_strerror( ssl_err ) ));
347 0 : ERR_print_errors_cb( fd_ossl_log_error, NULL );
348 0 : return 0;
349 0 : }
350 0 : if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
351 0 : fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
352 0 : fd_grpc_client_service_streams( client, fd_log_wallclock() );
353 0 : ulong write_sz = fd_h2_rbuf_ssl_write( client->frame_tx, ssl );
354 :
355 0 : if( read_sz!=0 || write_sz!=0 ) *charge_busy = 1;
356 0 : return 1;
357 0 : }
358 :
359 : #endif /* FD_HAS_OPENSSL */
360 :
361 : #if FD_H2_HAS_SOCKETS
362 :
363 : int
364 : fd_grpc_client_rxtx_socket( fd_grpc_client_t * client,
365 : int sock_fd,
366 0 : int * charge_busy ) {
367 0 : fd_h2_conn_t * conn = client->conn;
368 0 : ulong const frame_rx_lo_0 = client->frame_rx->lo_off;
369 0 : ulong const frame_rx_hi_0 = client->frame_rx->hi_off;
370 0 : ulong const frame_tx_lo_1 = client->frame_tx->lo_off;
371 0 : ulong const frame_tx_hi_1 = client->frame_tx->hi_off;
372 :
373 0 : int rx_err = fd_h2_rbuf_recvmsg( client->frame_rx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
374 0 : if( FD_UNLIKELY( rx_err ) ) {
375 0 : FD_LOG_INFO(( "Disconnected: recvmsg error (%i-%s)", rx_err, fd_io_strerror( rx_err ) ));
376 0 : return 0;
377 0 : }
378 :
379 0 : if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
380 0 : fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
381 0 : fd_grpc_client_service_streams( client, fd_log_wallclock() );
382 :
383 0 : int tx_err = fd_h2_rbuf_sendmsg( client->frame_tx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
384 0 : if( FD_UNLIKELY( tx_err ) ) {
385 0 : FD_LOG_WARNING(( "fd_h2_rbuf_sendmsg failed (%i-%s)", tx_err, fd_io_strerror( tx_err ) ));
386 0 : return 0;
387 0 : }
388 :
389 0 : ulong const frame_rx_lo_1 = client->frame_rx->lo_off;
390 0 : ulong const frame_rx_hi_1 = client->frame_rx->hi_off;
391 0 : ulong const frame_tx_lo_0 = client->frame_tx->lo_off;
392 0 : ulong const frame_tx_hi_0 = client->frame_tx->hi_off;
393 :
394 0 : if( frame_rx_lo_0!=frame_rx_lo_1 || frame_rx_hi_0!=frame_rx_hi_1 ||
395 0 : frame_tx_lo_0!=frame_tx_lo_1 || frame_tx_hi_0!=frame_tx_hi_1 ) {
396 0 : *charge_busy = 1;
397 0 : }
398 :
399 0 : return 1;
400 0 : }
401 :
402 : #endif /* FD_H2_HAS_SOCKETS */
403 :
404 : /* fd_grpc_client_request continue attempts to write a request data
405 : frame. */
406 :
407 : static int
408 0 : fd_grpc_client_request_continue1( fd_grpc_client_t * client ) {
409 0 : fd_grpc_h2_stream_t * stream = client->request_stream;
410 0 : fd_h2_stream_t * h2_stream = &stream->s;
411 0 : fd_h2_tx_op_copy( client->conn, h2_stream, client->frame_tx, client->request_tx_op );
412 0 : if( FD_UNLIKELY( client->request_tx_op->chunk_sz ) ) return 0;
413 0 : if( FD_UNLIKELY( h2_stream->state != FD_H2_STREAM_STATE_CLOSING_TX ) ) return 0;
414 0 : client->metrics->stream_chunks_tx_cnt++;
415 : /* Request finished */
416 0 : client->request_stream = NULL;
417 0 : client->callbacks->tx_complete( client->ctx, stream->request_ctx );
418 0 : return 1;
419 0 : }
420 :
421 : static int
422 0 : fd_grpc_client_request_continue( fd_grpc_client_t * client ) {
423 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
424 0 : if( FD_UNLIKELY( !client->request_stream ) ) return 0;
425 0 : if( FD_UNLIKELY( !client->request_tx_op->chunk_sz ) ) return 0;
426 0 : return fd_grpc_client_request_continue1( client );
427 0 : }
428 :
429 : int
430 0 : fd_grpc_client_is_connected( fd_grpc_client_t * client ) {
431 0 : if( FD_UNLIKELY( !client ) ) return 0;
432 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
433 0 : if( FD_UNLIKELY( !client->h2_hs_done ) ) return 0;
434 0 : return 1;
435 0 : }
436 :
437 : int
438 0 : fd_grpc_client_request_is_blocked( fd_grpc_client_t * client ) {
439 0 : if( FD_UNLIKELY( !client ) ) return 1;
440 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 1;
441 0 : if( FD_UNLIKELY( !client->h2_hs_done ) ) return 1;
442 0 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 1;
443 0 : if( FD_UNLIKELY( !fd_grpc_client_stream_acquire_is_safe( client ) ) ) return 1;
444 0 : return 0;
445 0 : }
446 :
447 : fd_grpc_h2_stream_t *
448 : fd_grpc_client_request_start(
449 : fd_grpc_client_t * client,
450 : char const * path,
451 : ulong path_len,
452 : ulong request_ctx,
453 : pb_msgdesc_t const * fields,
454 : void const * message,
455 : char const * auth_token,
456 : ulong auth_token_sz
457 0 : ) {
458 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
459 :
460 : /* Encode message */
461 0 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
462 0 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
463 0 : pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
464 0 : if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
465 0 : FD_LOG_WARNING(( "Failed to encode Protobuf message (%.*s). This is a bug (insufficient buffer space?)", (int)path_len, path ));
466 0 : return NULL;
467 0 : }
468 0 : ulong const serialized_sz = ostream.bytes_written;
469 :
470 : /* Create gRPC length prefix */
471 0 : fd_grpc_hdr_t hdr = {
472 0 : .compressed=0,
473 0 : .msg_sz=fd_uint_bswap( (uint)serialized_sz )
474 0 : };
475 0 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
476 0 : ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
477 :
478 : /* Allocate stream descriptor */
479 0 : fd_grpc_h2_stream_t * stream = fd_grpc_client_stream_acquire( client, request_ctx );
480 0 : uint const stream_id = stream->s.stream_id;
481 :
482 : /* Write HTTP/2 request headers */
483 0 : fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
484 0 : fd_grpc_req_hdrs_t req_meta = {
485 0 : .host = client->host,
486 0 : .host_len = client->host_len,
487 0 : .port = client->port,
488 0 : .path = path,
489 0 : .path_len = path_len,
490 0 : .https = 1, /* grpc_client assumes TLS encryption for now */
491 :
492 0 : .bearer_auth = auth_token,
493 0 : .bearer_auth_len = auth_token_sz
494 0 : };
495 0 : if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
496 0 : &req_meta,
497 0 : client->frame_tx,
498 0 : client->version,
499 0 : client->version_len
500 0 : ) ) ) {
501 0 : FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
502 0 : return NULL;
503 0 : }
504 0 : fd_h2_tx_commit( client->conn, client->frame_tx );
505 :
506 : /* Queue request payload for send
507 : (Protobuf message might have to be fragmented into multiple HTTP/2
508 : DATA frames if the client gets blocked) */
509 0 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, FD_H2_FLAG_END_STREAM );
510 0 : fd_grpc_client_request_continue1( client );
511 0 : client->metrics->requests_sent++;
512 0 : client->metrics->streams_active++;
513 :
514 0 : FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu", (int)path_len, path, serialized_sz ));
515 :
516 0 : return stream;
517 0 : }
518 :
519 : void
520 : fd_grpc_client_deadline_set( fd_grpc_h2_stream_t * stream,
521 : int deadline_kind,
522 9 : long ts_nanos ) {
523 9 : switch( deadline_kind ) {
524 6 : case FD_GRPC_DEADLINE_HEADER:
525 6 : stream->header_deadline_nanos = ts_nanos;
526 6 : stream->has_header_deadline = 1;
527 6 : break;
528 3 : case FD_GRPC_DEADLINE_RX_END:
529 3 : stream->rx_end_deadline_nanos = ts_nanos;
530 3 : stream->has_rx_end_deadline = 1;
531 3 : break;
532 9 : }
533 9 : }
534 :
535 : /* Lookup stream by ID */
536 :
537 : static fd_h2_stream_t *
538 : fd_grpc_h2_stream_query( fd_h2_conn_t * conn,
539 0 : uint stream_id ) {
540 0 : fd_grpc_client_t * client = conn->ctx;
541 0 : for( ulong i=0UL; i<FD_GRPC_CLIENT_MAX_STREAMS; i++ ) {
542 0 : if( client->stream_ids[ i ] == stream_id ) {
543 0 : return &client->streams[ i ]->s;
544 0 : }
545 0 : }
546 0 : return NULL;
547 0 : }
548 :
549 : static void
550 0 : fd_grpc_h2_conn_established( fd_h2_conn_t * conn ) {
551 0 : fd_grpc_client_t * client = conn->ctx;
552 0 : client->h2_hs_done = 1;
553 0 : client->callbacks->conn_established( client->ctx );
554 0 : }
555 :
556 : static void
557 : fd_grpc_h2_conn_final( fd_h2_conn_t * conn,
558 : uint h2_err,
559 0 : int closed_by ) {
560 0 : fd_grpc_client_t * client = conn->ctx;
561 0 : client->callbacks->conn_dead( client->ctx, h2_err, closed_by );
562 0 : }
563 :
564 : /* React to response data */
565 :
566 : void
567 : fd_grpc_h2_cb_headers(
568 : fd_h2_conn_t * conn,
569 : fd_h2_stream_t * h2_stream,
570 : void const * data,
571 : ulong data_sz,
572 : ulong flags
573 18 : ) {
574 18 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
575 18 : fd_grpc_client_t * client = conn->ctx;
576 :
577 18 : int h2_status = fd_grpc_h2_read_response_hdrs( &stream->hdrs, client->matcher, data, data_sz );
578 18 : if( FD_UNLIKELY( h2_status!=FD_H2_SUCCESS ) ) {
579 : /* Failed to parse HTTP/2 headers */
580 3 : fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_PROTOCOL );
581 3 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
582 3 : fd_grpc_client_stream_release( client, stream );
583 3 : return;
584 3 : }
585 :
586 15 : if( !stream->hdrs_received && !!( flags & FD_H2_FLAG_END_HEADERS) ) {
587 : /* Got initial response header */
588 12 : stream->hdrs_received = 1;
589 12 : stream->has_header_deadline = 0;
590 12 : if( FD_LIKELY( ( stream->hdrs.h2_status==200 ) &
591 12 : ( !!stream->hdrs.is_grpc_proto ) ) ) {
592 6 : client->callbacks->rx_start( client->ctx, stream->request_ctx );
593 6 : }
594 12 : }
595 :
596 15 : if( ( flags & (FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) )
597 15 : ==(FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) ) {
598 3 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
599 3 : fd_grpc_client_stream_release( client, stream );
600 3 : return;
601 3 : }
602 15 : }
603 :
604 : static void
605 : fd_grpc_h2_cb_data(
606 : fd_h2_conn_t * conn,
607 : fd_h2_stream_t * h2_stream,
608 : void const * data,
609 : ulong data_sz,
610 : ulong flags
611 0 : ) {
612 0 : fd_grpc_client_t * client = conn->ctx;
613 0 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
614 0 : if( FD_UNLIKELY( ( stream->hdrs.h2_status!=200 ) |
615 0 : ( !stream->hdrs.is_grpc_proto ) ) ) {
616 0 : return;
617 0 : }
618 :
619 0 : do {
620 :
621 : /* Read header bytes */
622 0 : if( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) {
623 0 : ulong hdr_frag_sz = fd_ulong_min( sizeof(fd_grpc_hdr_t) - stream->msg_buf_used, data_sz );
624 0 : fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, hdr_frag_sz );
625 0 : stream->msg_buf_used += hdr_frag_sz;
626 0 : data = (void const *)( (ulong)data + (ulong)hdr_frag_sz );
627 0 : data_sz -= hdr_frag_sz;
628 0 : if( FD_UNLIKELY( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) ) return;
629 :
630 : /* Header complete */
631 0 : stream->msg_sz = fd_uint_bswap( FD_LOAD( uint, (void *)( (ulong)stream->msg_buf+1 ) ) );
632 0 : if( FD_UNLIKELY( sizeof(fd_grpc_hdr_t) + stream->msg_sz > stream->msg_buf_max ) ) {
633 0 : FD_LOG_WARNING(( "Received oversized gRPC message (%lu bytes), killing request", stream->msg_sz ));
634 0 : fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_INTERNAL );
635 0 : fd_grpc_client_stream_release( client, stream );
636 0 : return;
637 0 : }
638 0 : }
639 :
640 : /* Read payload bytes */
641 0 : ulong wmark = sizeof(fd_grpc_hdr_t) + stream->msg_sz;
642 0 : ulong chunk_sz = fd_ulong_min( stream->msg_buf_used+data_sz, wmark ) - stream->msg_buf_used;
643 0 : if( FD_UNLIKELY( chunk_sz>data_sz ) ) FD_LOG_CRIT(( "integer underflow" )); /* unreachable */
644 0 : fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, chunk_sz );
645 0 : stream->msg_buf_used += chunk_sz;
646 0 : data = (void const *)( (ulong)data + (ulong)chunk_sz );
647 0 : data_sz -= chunk_sz;
648 :
649 0 : client->metrics->stream_chunks_rx_cnt++;
650 0 : client->metrics->stream_chunks_rx_bytes += chunk_sz;
651 :
652 0 : if( stream->msg_buf_used >= wmark ) {
653 : /* Data complete */
654 0 : void const * msg_ptr = stream->msg_buf + sizeof(fd_grpc_hdr_t);
655 0 : client->callbacks->rx_msg( client->ctx, msg_ptr, stream->msg_sz, stream->request_ctx );
656 0 : stream->msg_buf_used = 0UL;
657 0 : stream->msg_sz = 0UL;
658 0 : }
659 :
660 0 : } while( data_sz );
661 :
662 0 : if( flags & FD_H2_FLAG_END_STREAM ) {
663 : /* FIXME incomplete gRPC message */
664 0 : if( FD_UNLIKELY( !stream->msg_buf_used ) ) {
665 0 : FD_LOG_WARNING(( "Received incomplete gRPC message" ));
666 0 : }
667 0 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
668 0 : }
669 0 : }
670 :
671 : /* Server might kill our request */
672 :
673 : static void
674 : fd_grpc_h2_rst_stream( fd_h2_conn_t * conn,
675 : fd_h2_stream_t * stream,
676 : uint error_code,
677 0 : int closed_by ) {
678 0 : if( closed_by==1 ) {
679 0 : FD_LOG_WARNING(( "Server terminated request stream_id=%u (%u-%s)",
680 0 : stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
681 0 : } else {
682 0 : FD_LOG_WARNING(( "Stream failed stream_id=%u (%u-%s)",
683 0 : stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
684 0 : }
685 0 : fd_grpc_client_t * client = conn->ctx;
686 0 : fd_grpc_client_stream_release( client, fd_grpc_h2_stream_upcast( stream ) );
687 0 : }
688 :
689 : /* A HTTP/2 flow control change might unblock a queued request send op */
690 :
691 : void
692 : fd_grpc_h2_window_update( fd_h2_conn_t * conn,
693 0 : uint increment ) {
694 0 : (void)increment;
695 0 : fd_grpc_client_request_continue( conn->ctx );
696 0 : }
697 :
698 : void
699 : fd_grpc_h2_stream_window_update( fd_h2_conn_t * conn,
700 : fd_h2_stream_t * stream,
701 0 : uint increment ) {
702 0 : (void)stream; (void)increment;
703 0 : fd_grpc_client_request_continue( conn->ctx );
704 0 : }
705 :
706 : void
707 0 : fd_grpc_h2_ping_ack( fd_h2_conn_t * conn ) {
708 0 : fd_grpc_client_t * client = conn->ctx;
709 0 : client->callbacks->ping_ack( client->ctx );
710 0 : }
711 :
712 : fd_h2_rbuf_t *
713 0 : fd_grpc_client_rbuf_tx( fd_grpc_client_t * client ) {
714 0 : return client->frame_tx;
715 0 : }
716 :
717 : fd_h2_rbuf_t *
718 0 : fd_grpc_client_rbuf_rx( fd_grpc_client_t * client ) {
719 0 : return client->frame_rx;
720 0 : }
721 :
722 : fd_h2_conn_t *
723 0 : fd_grpc_client_h2_conn( fd_grpc_client_t * client ) {
724 0 : return client->conn;
725 0 : }
726 :
727 : /* fd_grpc_client_h2_callbacks specifies h2->grpc_client callbacks.
728 : Stored in .rodata for security. Must be kept in sync with fd_h2 to
729 : avoid NULL pointers. */
730 :
731 : fd_h2_callbacks_t const fd_grpc_client_h2_callbacks = {
732 : .stream_create = fd_h2_noop_stream_create,
733 : .stream_query = fd_grpc_h2_stream_query,
734 : .conn_established = fd_grpc_h2_conn_established,
735 : .conn_final = fd_grpc_h2_conn_final,
736 : .headers = fd_grpc_h2_cb_headers,
737 : .data = fd_grpc_h2_cb_data,
738 : .rst_stream = fd_grpc_h2_rst_stream,
739 : .window_update = fd_grpc_h2_window_update,
740 : .stream_window_update = fd_grpc_h2_stream_window_update,
741 : .ping_ack = fd_grpc_h2_ping_ack,
742 : };
|