Line data Source code
1 : /* fd_bundle_client.c steps gRPC related tasks. */
2 :
3 : #define _GNU_SOURCE /* SOL_TCP */
4 : #include "fd_bundle_auth.h"
5 : #include "fd_bundle_tile_private.h"
6 : #include "proto/block_engine.pb.h"
7 : #include "proto/bundle.pb.h"
8 : #include "proto/packet.pb.h"
9 : #include "../fd_txn_m.h"
10 : #include "../plugin/fd_plugin.h"
11 : #include "../../waltz/h2/fd_h2_conn.h"
12 : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
13 : #include "../../ballet/base58/fd_base58.h"
14 : #include "../../ballet/nanopb/pb_decode.h"
15 : #include "../../util/net/fd_ip4.h"
16 :
17 : #include <fcntl.h>
18 : #include <errno.h>
19 : #include <unistd.h> /* close */
20 : #include <poll.h> /* poll */
21 : #include <sys/socket.h> /* socket */
22 : #include <netinet/in.h>
23 : #include <netinet/ip.h>
24 : #include <netinet/tcp.h>
25 :
/* FD_BUNDLE_CLIENT_REQUEST_TIMEOUT is the offset (in nanoseconds) added
   to the current wallclock when setting a deadline on a newly started
   gRPC request. */

#define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */

/* FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE is the largest transaction count
   a received bundle may have.  Bundles with more transactions are not
   published. */

#define FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE (5UL)
29 :
/* fd_bundle_now returns the timestamp used by the bundle client for
   backoff, keepalive and publish timestamps.  It is a weak symbol, so
   another definition may take its place at link time.  This default
   definition reads fd_log_wallclock(). */

__attribute__((weak)) long
fd_bundle_now( void ) {
  long const ts = fd_log_wallclock();
  return ts;
}
34 :
35 : void
36 3 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
37 3 : if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
38 3 : if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
39 0 : FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
40 0 : }
41 3 : ctx->tcp_sock = -1;
42 3 : ctx->tcp_sock_connected = 0;
43 3 : }
44 3 : ctx->defer_reset = 0;
45 :
46 3 : ctx->builder_info_avail = 0;
47 3 : ctx->builder_info_wait = 0;
48 3 : ctx->packet_subscription_live = 0;
49 3 : ctx->packet_subscription_wait = 0;
50 3 : ctx->bundle_subscription_live = 0;
51 3 : ctx->bundle_subscription_wait = 0;
52 :
53 3 : memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
54 :
55 3 : # if FD_HAS_OPENSSL
56 3 : if( FD_UNLIKELY( ctx->ssl ) ) {
57 0 : SSL_free( ctx->ssl );
58 0 : ctx->ssl = NULL;
59 0 : }
60 3 : # endif
61 :
62 3 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
63 :
64 3 : fd_bundle_auther_reset( &ctx->auther );
65 3 : fd_grpc_client_reset( ctx->grpc_client );
66 3 : }
67 :
68 : static int
69 : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
70 0 : uint ip4_addr ) {
71 0 : struct sockaddr_in addr = {
72 0 : .sin_family = AF_INET,
73 0 : .sin_addr.s_addr = ip4_addr,
74 0 : .sin_port = fd_ushort_bswap( ctx->server_tcp_port )
75 0 : };
76 0 : int err = connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
77 : /* FD_LIKELY is used here as EINPROGRESS is expected even to local tcp ports */
78 0 : if( FD_LIKELY( err==-1 ) ) {
79 0 : return errno;
80 0 : }
81 0 : return 0;
82 0 : }
83 :
84 : static int
85 0 : fd_bundle_client_get_connect_result( fd_bundle_tile_t const * ctx ) {
86 0 : int so_err = 0;
87 0 : socklen_t so_err_sz = sizeof(so_err);
88 0 : if( FD_UNLIKELY( getsockopt( ctx->tcp_sock, SOL_SOCKET, SO_ERROR, &so_err, &so_err_sz )==-1 ) ) {
89 0 : return errno;
90 0 : }
91 0 : return so_err;
92 0 : }
93 :
94 : static void
95 0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
96 0 : fd_bundle_client_reset( ctx );
97 :
98 : /* FIXME IPv6 support */
99 0 : fd_addrinfo_t hints = {0};
100 0 : hints.ai_family = AF_INET;
101 0 : fd_addrinfo_t * res = NULL;
102 0 : uchar scratch[ 4096 ];
103 0 : void * pscratch = scratch;
104 0 : int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
105 0 : if( FD_UNLIKELY( err ) ) {
106 0 : FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
107 0 : fd_bundle_client_reset( ctx );
108 0 : ctx->metrics.transport_fail_cnt++;
109 0 : return;
110 0 : }
111 0 : uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
112 0 : ctx->server_ip4_addr = ip4_addr;
113 :
114 0 : int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
115 0 : if( FD_UNLIKELY( tcp_sock<0 ) ) {
116 0 : FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
117 0 : }
118 0 : ctx->tcp_sock = tcp_sock;
119 :
120 0 : if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
121 0 : FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
122 0 : }
123 :
124 0 : int tcp_nodelay = 1;
125 0 : if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
126 0 : FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
127 0 : }
128 :
129 0 : if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
130 0 : FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
131 0 : }
132 :
133 0 : char const * scheme = "http";
134 0 : # if FD_HAS_OPENSSL
135 0 : if( ctx->is_ssl ) scheme = "https";
136 0 : # endif
137 :
138 0 : FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
139 0 : scheme,
140 0 : FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
141 0 : (int)ctx->server_sni_len, ctx->server_sni ));
142 :
143 0 : int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
144 : /* FD_LIKELY as EINPROGRESS is expected */
145 0 : if( FD_LIKELY( connect_err ) ) {
146 0 : if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
147 0 : FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
148 0 : FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
149 0 : connect_err, fd_io_strerror( connect_err ) ));
150 0 : fd_bundle_client_reset( ctx );
151 0 : ctx->metrics.transport_fail_cnt++;
152 0 : return;
153 0 : }
154 0 : }
155 :
156 0 : # if FD_HAS_OPENSSL
157 0 : if( ctx->is_ssl ) {
158 0 : BIO * bio = BIO_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
159 0 : if( FD_UNLIKELY( !bio ) ) {
160 0 : FD_LOG_ERR(( "BIO_new_socket failed" ));
161 0 : }
162 :
163 0 : SSL * ssl = SSL_new( ctx->ssl_ctx );
164 0 : if( FD_UNLIKELY( !ssl ) ) {
165 0 : FD_LOG_ERR(( "SSL_new failed" ));
166 0 : }
167 :
168 0 : SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
169 0 : SSL_set_connect_state( ssl );
170 :
171 : /* Indicate to endpoint which server name we want */
172 0 : if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
173 0 : FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
174 0 : }
175 :
176 : /* Enable hostname verification */
177 0 : if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
178 0 : FD_LOG_ERR(( "SSL_set1_host failed" ));
179 0 : }
180 :
181 0 : ctx->ssl = ssl;
182 0 : }
183 0 : # endif /* FD_HAS_OPENSSL */
184 :
185 0 : fd_grpc_client_reset( ctx->grpc_client );
186 0 : fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
187 0 : }
188 :
189 : static int
190 : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
191 27 : int * charge_busy ) {
192 27 : # if FD_HAS_OPENSSL
193 27 : if( ctx->is_ssl ) {
194 0 : return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
195 0 : }
196 27 : # endif /* FD_HAS_OPENSSL */
197 :
198 27 : return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
199 27 : }
200 :
201 : static void
202 3 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
203 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
204 :
205 3 : block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
206 3 : static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
207 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
208 3 : ctx->grpc_client,
209 3 : path, sizeof(path)-1,
210 3 : FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
211 3 : &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
212 3 : ctx->auther.access_token, ctx->auther.access_token_sz,
213 3 : 0 /* is_streaming */
214 3 : );
215 3 : if( FD_UNLIKELY( !request ) ) return;
216 3 : fd_grpc_client_deadline_set(
217 3 : request,
218 3 : FD_GRPC_DEADLINE_RX_END,
219 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
220 :
221 3 : ctx->builder_info_wait = 1;
222 3 : }
223 :
224 : static void
225 3 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
226 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
227 :
228 3 : block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
229 3 : static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
230 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
231 3 : ctx->grpc_client,
232 3 : path, sizeof(path)-1,
233 3 : FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
234 3 : &block_engine_SubscribePacketsRequest_msg, &req,
235 3 : ctx->auther.access_token, ctx->auther.access_token_sz,
236 3 : 0 /* is_streaming */
237 3 : );
238 3 : if( FD_UNLIKELY( !request ) ) return;
239 3 : fd_grpc_client_deadline_set(
240 3 : request,
241 3 : FD_GRPC_DEADLINE_HEADER,
242 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
243 :
244 3 : ctx->packet_subscription_wait = 1;
245 3 : }
246 :
247 : static void
248 3 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
249 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
250 :
251 3 : block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
252 3 : static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
253 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
254 3 : ctx->grpc_client,
255 3 : path, sizeof(path)-1,
256 3 : FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
257 3 : &block_engine_SubscribeBundlesRequest_msg, &req,
258 3 : ctx->auther.access_token, ctx->auther.access_token_sz,
259 3 : 0 /* is_streaming */
260 3 : );
261 3 : if( FD_UNLIKELY( !request ) ) return;
262 3 : fd_grpc_client_deadline_set(
263 3 : request,
264 3 : FD_GRPC_DEADLINE_HEADER,
265 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
266 :
267 3 : ctx->bundle_subscription_wait = 1;
268 3 : }
269 :
270 : void
271 3 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
272 3 : if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
273 3 : fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
274 3 : if( FD_UNLIKELY( !conn ) ) return; /* no conn */
275 3 : if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
276 3 : fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
277 :
278 3 : if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
279 3 : long now = fd_bundle_now();
280 3 : fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
281 3 : FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
282 3 : }
283 3 : }
284 :
285 : int
286 : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
287 18 : long now ) {
288 : /* Drive auth */
289 18 : if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
290 0 : fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
291 0 : return 1;
292 0 : }
293 18 : if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
294 :
295 : /* Request block builder info */
296 18 : int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
297 18 : if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
298 18 : ( builder_info_expired ) ) &
299 18 : ( !ctx->builder_info_wait ) ) ) {
300 3 : fd_bundle_client_request_builder_info( ctx );
301 3 : return 1;
302 3 : }
303 :
304 : /* Subscribe to packets */
305 15 : if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
306 3 : fd_bundle_client_subscribe_packets( ctx );
307 3 : return 1;
308 3 : }
309 :
310 : /* Subscribe to bundles */
311 12 : if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
312 3 : fd_bundle_client_subscribe_bundles( ctx );
313 3 : return 1;
314 3 : }
315 :
316 : /* Send a PING */
317 9 : if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
318 3 : fd_bundle_client_send_ping( ctx );
319 3 : return 1;
320 3 : }
321 :
322 6 : return 0;
323 9 : }
324 :
325 : static void
326 : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
327 30 : int * charge_busy ) {
328 :
329 : /* Wait for TCP socket to connect */
330 30 : if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
331 0 : if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
332 :
333 0 : struct pollfd pfds[1] = {
334 0 : { .fd = ctx->tcp_sock, .events = POLLOUT }
335 0 : };
336 0 : int poll_res = fd_syscall_poll( pfds, 1, 0 );
337 0 : if( FD_UNLIKELY( poll_res<0 ) ) {
338 0 : FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
339 0 : }
340 0 : if( poll_res==0 ) return;
341 :
342 0 : int connect_result = 0;
343 0 : if( pfds[0].revents & (POLLERR|POLLHUP) ) {
344 0 : connect_result = fd_bundle_client_get_connect_result( ctx );
345 0 : connect_failed:
346 0 : FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_result, fd_io_strerror( connect_result ) ));
347 0 : fd_bundle_client_reset( ctx );
348 0 : ctx->metrics.transport_fail_cnt++;
349 0 : *charge_busy = 1;
350 0 : return;
351 0 : }
352 0 : if( pfds[0].revents & POLLOUT ) {
353 0 : connect_result = fd_bundle_client_get_connect_result( ctx );
354 0 : if( FD_UNLIKELY( connect_result!=0 ) ) {
355 0 : goto connect_failed;
356 0 : }
357 0 : FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
358 0 : ctx->tcp_sock_connected = 1;
359 0 : *charge_busy = 1;
360 0 : return;
361 0 : }
362 0 : return;
363 0 : }
364 :
365 : /* gRPC conn died? */
366 30 : if( FD_UNLIKELY( !ctx->grpc_client ) ) {
367 0 : long sleep_start;
368 0 : reconnect:
369 0 : sleep_start = fd_bundle_now();
370 0 : if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, sleep_start ) ) ) {
371 0 : long wait_dur = ctx->backoff_until - sleep_start;
372 0 : fd_log_sleep( fd_long_min( wait_dur, 1e6 ) );
373 0 : return;
374 0 : }
375 0 : fd_bundle_client_create_conn( ctx );
376 0 : *charge_busy = 1;
377 0 : return;
378 0 : }
379 :
380 : /* Did a HTTP/2 PING time out */
381 30 : long check_ts = ctx->cached_ts = fd_bundle_now();
382 30 : if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
383 3 : FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
384 3 : (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
385 3 : ctx->keepalive->inflight = 0;
386 3 : ctx->defer_reset = 1;
387 3 : *charge_busy = 1;
388 3 : return;
389 3 : }
390 :
391 : /* Drive I/O, SSL handshake, and any inflight requests */
392 27 : if( FD_UNLIKELY( -1==fd_bundle_client_drive_io( ctx, charge_busy ) || ctx->defer_reset /* new error? */ ) ) {
393 0 : fd_bundle_client_reset( ctx );
394 0 : ctx->metrics.transport_fail_cnt++;
395 0 : *charge_busy = 1;
396 0 : return;
397 0 : }
398 :
399 : /* Are we ready to issue a new request? */
400 27 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
401 18 : long io_ts = fd_bundle_now();
402 18 : if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
403 :
404 18 : *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
405 18 : }
406 :
407 : static void
408 30 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
409 30 : int status = fd_bundle_client_status( ctx );
410 :
411 30 : int const connected_now = ( status==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
412 30 : int const connected_before = ( ctx->bundle_status_logged==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
413 :
414 30 : if( FD_UNLIKELY( connected_now!=connected_before ) ) {
415 3 : long ts = fd_log_wallclock();
416 3 : if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
417 3 : if( connected_now ) {
418 3 : FD_LOG_NOTICE(( "Connected to bundle server" ));
419 3 : } else {
420 0 : FD_LOG_WARNING(( "Disconnected from bundle server" ));
421 0 : }
422 3 : ctx->last_bundle_status_log_nanos = ts;
423 3 : ctx->bundle_status_logged = (uchar)status;
424 3 : }
425 3 : }
426 30 : }
427 :
428 : void
429 : fd_bundle_client_step( fd_bundle_tile_t * ctx,
430 30 : int * charge_busy ) {
431 : /* Edge-trigger logging with rate limiting */
432 30 : fd_bundle_client_step1( ctx, charge_busy );
433 30 : fd_bundle_client_log_status( ctx );
434 30 : }
435 :
436 : void
437 : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
438 12 : long now ) {
439 12 : uint iter = ctx->backoff_iter;
440 12 : if( now < ctx->backoff_reset ) iter = 0U;
441 12 : iter++;
442 :
443 : /* FIXME proper backoff */
444 12 : long wait_ns = (long)2e9;
445 12 : wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
446 :
447 12 : ctx->backoff_until = now + wait_ns;
448 12 : ctx->backoff_reset = now + 2*wait_ns;
449 :
450 12 : ctx->backoff_iter = iter;
451 12 : }
452 :
/* fd_bundle_client_grpc_conn_established only logs that the gRPC
   connection is up.  app_ctx is not used. */

static void
fd_bundle_client_grpc_conn_established( void * app_ctx ) {
  FD_LOG_INFO(( "Bundle gRPC connection established" ));
  (void)app_ctx;
}
458 :
459 : static void
460 : fd_bundle_client_grpc_conn_dead( void * app_ctx,
461 : uint h2_err,
462 0 : int closed_by ) {
463 0 : fd_bundle_tile_t * ctx = app_ctx;
464 0 : FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
465 0 : closed_by ? "by peer" : "due to error",
466 0 : h2_err, fd_h2_strerror( h2_err ) ));
467 0 : ctx->defer_reset = 1;
468 0 : }
469 :
470 : /* Forwards a bundle transaction to the tango message bus. */
471 :
472 : static void
473 : fd_bundle_tile_publish_bundle_txn(
474 : fd_bundle_tile_t * ctx,
475 : void const * txn,
476 : ulong txn_sz, /* <=FD_TXN_MTU */
477 : ulong bundle_txn_cnt,
478 : uint source_ipv4
479 30 : ) {
480 30 : if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
481 0 : ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
482 0 : return;
483 0 : }
484 :
485 30 : fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
486 30 : *txnm = (fd_txn_m_t) {
487 30 : .reference_slot = 0UL,
488 30 : .payload_sz = (ushort)txn_sz,
489 30 : .txn_t_sz = 0U,
490 30 : .source_ipv4 = source_ipv4,
491 30 : .source_tpu = FD_TXN_M_TPU_SOURCE_BUNDLE,
492 30 : .block_engine = {
493 30 : .bundle_id = ctx->bundle_seq,
494 30 : .bundle_txn_cnt = bundle_txn_cnt,
495 30 : .commission = (uchar)ctx->builder_commission
496 30 : },
497 30 : };
498 30 : memcpy( txnm->block_engine.commission_pubkey, ctx->builder_pubkey, 32UL );
499 30 : fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
500 :
501 30 : ulong sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
502 30 : ulong sig = 1UL;
503 :
504 30 : if( FD_UNLIKELY( !ctx->stem ) ) {
505 0 : FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
506 0 : }
507 :
508 30 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
509 30 : fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
510 30 : ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
511 30 : ctx->metrics.txn_received_cnt++;
512 30 : }
513 :
514 : /* Forwards a regular transaction to the tango message bus. */
515 :
516 : static void
517 : fd_bundle_tile_publish_txn(
518 : fd_bundle_tile_t * ctx,
519 : void const * txn,
520 : ulong txn_sz, /* <=FD_TXN_MTU */
521 : uint source_ipv4
522 9 : ) {
523 9 : fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
524 9 : *txnm = (fd_txn_m_t) {
525 9 : .reference_slot = 0UL,
526 9 : .payload_sz = (ushort)txn_sz,
527 9 : .txn_t_sz = 0U,
528 9 : .source_ipv4 = source_ipv4,
529 9 : .source_tpu = FD_TXN_M_TPU_SOURCE_BUNDLE,
530 9 : .block_engine = {
531 9 : .bundle_id = 0UL,
532 9 : .bundle_txn_cnt = 1UL,
533 9 : .commission = 0U,
534 9 : .commission_pubkey = {0U}
535 9 : },
536 9 : };
537 9 : fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
538 :
539 9 : ulong sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
540 9 : ulong sig = 0UL;
541 :
542 9 : if( FD_UNLIKELY( !ctx->stem ) ) {
543 0 : FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
544 0 : }
545 :
546 9 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
547 9 : fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
548 9 : ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
549 9 : ctx->metrics.txn_received_cnt++;
550 9 : }
551 :
552 : /* Called for each transaction in a bundle. Simply counts up
553 : bundle_txn_cnt, but does not publish anything. */
554 :
555 : static bool
556 : fd_bundle_client_visit_pb_bundle_txn_preflight(
557 : pb_istream_t * istream,
558 : pb_field_t const * field,
559 : void ** arg
560 48 : ) {
561 48 : (void)istream; (void)field;
562 48 : fd_bundle_tile_t * ctx = *arg;
563 48 : ctx->bundle_txn_cnt++;
564 48 : return true;
565 48 : }
566 :
567 : /* Called for each transaction in a bundle. Publishes each transaction
568 : to the tango message bus. */
569 :
570 : static bool
571 : fd_bundle_client_visit_pb_bundle_txn(
572 : pb_istream_t * istream,
573 : pb_field_t const * field,
574 : void ** arg
575 30 : ) {
576 30 : (void)field;
577 30 : fd_bundle_tile_t * ctx = *arg;
578 :
579 30 : packet_Packet packet = packet_Packet_init_default;
580 30 : if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
581 0 : ctx->metrics.decode_fail_cnt++;
582 0 : FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
583 0 : return false;
584 0 : }
585 :
586 30 : if( FD_UNLIKELY( packet.data.size == 0 ) ) {
587 0 : FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
588 0 : return true;
589 0 : }
590 :
591 30 : if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
592 0 : FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
593 0 : return true;
594 0 : }
595 :
596 30 : uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : ctx->server_ip4_addr, ctx->server_ip4_addr );
597 30 : fd_bundle_tile_publish_bundle_txn(
598 30 : ctx,
599 30 : packet.data.bytes, packet.data.size,
600 30 : ctx->bundle_txn_cnt,
601 30 : ip4
602 30 : );
603 :
604 30 : return true;
605 30 : }
606 :
607 : static void
608 : fd_bundle_client_sample_rx_delay(
609 : fd_bundle_tile_t * ctx,
610 : google_protobuf_Timestamp const * ts
611 12 : ) {
612 12 : ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos;
613 12 : fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) );
614 12 : }
615 :
616 : /* Called for each BundleUuid in a SubscribeBundlesResponse. */
617 :
618 : static bool
619 : fd_bundle_client_visit_pb_bundle_uuid(
620 : pb_istream_t * istream,
621 : pb_field_t const * field,
622 : void ** arg
623 9 : ) {
624 9 : (void)field;
625 9 : fd_bundle_tile_t * ctx = *arg;
626 :
627 : /* Reset bundle state */
628 :
629 9 : ctx->bundle_txn_cnt = 0UL;
630 :
631 : /* Do two decode passes. This is required because we need to know the
632 : number of transactions in a bundle ahead of time. However, due to
633 : the Protobuf wire encoding, we don't know the number of txns that
634 : will come until we've parsed everything.
635 :
636 : First pass: Count number of bundles. */
637 :
638 9 : pb_istream_t peek = *istream;
639 9 : bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
640 9 : bundle.bundle.packets = (pb_callback_t) {
641 9 : .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
642 9 : .arg = ctx
643 9 : };
644 9 : if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
645 0 : ctx->metrics.decode_fail_cnt++;
646 0 : FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
647 0 : return false;
648 0 : }
649 :
650 : /* At this opint, ctx->bundle_txn_cnt is correctly set. Too many txns
651 : is treated as a NOP.
652 :
653 : Second pass: Actually publish bundle packets */
654 :
655 9 : if( FD_UNLIKELY( ctx->bundle_txn_cnt>FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE ) ) return true;
656 :
657 6 : ctx->bundle_seq++;
658 6 : bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
659 6 : bundle.bundle.packets = (pb_callback_t) {
660 6 : .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
661 6 : .arg = ctx
662 6 : };
663 :
664 6 : ctx->metrics.bundle_received_cnt++;
665 :
666 6 : if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
667 0 : ctx->metrics.decode_fail_cnt++;
668 0 : FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
669 0 : return false;
670 0 : }
671 :
672 6 : fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
673 :
674 6 : return true;
675 6 : }
676 :
677 : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call. */
678 :
679 : static void
680 : fd_bundle_client_handle_bundle_batch(
681 : fd_bundle_tile_t * ctx,
682 : pb_istream_t * istream
683 12 : ) {
684 12 : if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
685 3 : ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
686 3 : return;
687 3 : }
688 :
689 9 : block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
690 9 : res.bundles = (pb_callback_t) {
691 9 : .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
692 9 : .arg = ctx
693 9 : };
694 9 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
695 0 : ctx->metrics.decode_fail_cnt++;
696 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
697 0 : return;
698 0 : }
699 9 : }
700 :
701 : /* Called for each 'Packet' (a regular transaction) of a
702 : SubscribePacketsResponse. */
703 :
704 : static bool
705 : fd_bundle_client_visit_pb_packet(
706 : pb_istream_t * istream,
707 : pb_field_t const * field,
708 : void ** arg
709 9 : ) {
710 9 : (void)field;
711 9 : fd_bundle_tile_t * ctx = *arg;
712 :
713 9 : packet_Packet packet = packet_Packet_init_default;
714 9 : if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
715 0 : ctx->metrics.decode_fail_cnt++;
716 0 : FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
717 0 : return false;
718 0 : }
719 :
720 9 : if( FD_UNLIKELY( packet.data.size == 0 ) ) {
721 0 : FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
722 0 : return true;
723 0 : }
724 :
725 9 : if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
726 0 : FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
727 0 : return true;
728 0 : }
729 :
730 :
731 9 : uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : 0U, 0U );
732 9 : fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size, ip4 );
733 9 : ctx->metrics.packet_received_cnt++;
734 :
735 9 : return true;
736 9 : }
737 :
738 : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call. */
739 :
740 : static void
741 : fd_bundle_client_handle_packet_batch(
742 : fd_bundle_tile_t * ctx,
743 : pb_istream_t * istream
744 6 : ) {
745 6 : block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
746 6 : res.batch.packets = (pb_callback_t) {
747 6 : .funcs.decode = fd_bundle_client_visit_pb_packet,
748 6 : .arg = ctx
749 6 : };
750 6 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
751 0 : ctx->metrics.decode_fail_cnt++;
752 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
753 0 : return;
754 0 : }
755 :
756 6 : fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
757 6 : }
758 :
759 : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
760 : gRPC call. */
761 :
762 : static void
763 : fd_bundle_client_handle_builder_fee_info(
764 : fd_bundle_tile_t * ctx,
765 : pb_istream_t * istream
766 9 : ) {
767 9 : block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
768 9 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
769 0 : ctx->metrics.decode_fail_cnt++;
770 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
771 0 : return;
772 0 : }
773 9 : if( FD_UNLIKELY( res.commission > 100 ) ) {
774 3 : ctx->metrics.decode_fail_cnt++;
775 3 : FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
776 3 : return;
777 3 : }
778 :
779 6 : ctx->builder_commission = (uchar)res.commission;
780 6 : if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, ctx->builder_pubkey ) ) ) {
781 3 : FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
782 3 : return;
783 3 : }
784 :
785 3 : long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
786 3 : ctx->builder_info_avail = 1;
787 3 : ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
788 3 : }
789 :
790 : static void
791 : fd_bundle_client_grpc_tx_complete(
792 : void * app_ctx,
793 : ulong request_ctx
794 9 : ) {
795 9 : (void)app_ctx; (void)request_ctx;
796 9 : }
797 :
798 : void
799 : fd_bundle_client_grpc_rx_start(
800 : void * app_ctx,
801 : ulong request_ctx
802 9 : ) {
803 9 : fd_bundle_tile_t * ctx = app_ctx;
804 9 : switch( request_ctx ) {
805 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
806 3 : ctx->packet_subscription_live = 1;
807 3 : ctx->packet_subscription_wait = 0;
808 3 : break;
809 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
810 3 : ctx->bundle_subscription_live = 1;
811 3 : ctx->bundle_subscription_wait = 0;
812 3 : break;
813 9 : }
814 9 : }
815 :
816 : void
817 : fd_bundle_client_grpc_rx_msg(
818 : void * app_ctx,
819 : void const * protobuf,
820 : ulong protobuf_sz,
821 : ulong request_ctx
822 27 : ) {
823 27 : fd_bundle_tile_t * ctx = app_ctx;
824 27 : ctx->metrics.proto_received_bytes += protobuf_sz;
825 27 : pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz );
826 27 : switch( request_ctx ) {
827 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
828 0 : if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
829 0 : ctx->metrics.decode_fail_cnt++;
830 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
831 0 : }
832 0 : break;
833 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
834 0 : if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
835 0 : ctx->metrics.decode_fail_cnt++;
836 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
837 0 : }
838 0 : break;
839 12 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
840 12 : fd_bundle_client_handle_bundle_batch( ctx, &istream );
841 12 : break;
842 6 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
843 6 : fd_bundle_client_handle_packet_batch( ctx, &istream );
844 6 : break;
845 9 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
846 9 : fd_bundle_client_handle_builder_fee_info( ctx, &istream );
847 9 : break;
848 0 : default:
849 0 : FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
850 27 : }
851 27 : }
852 :
853 : static void
854 : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
855 0 : ulong request_ctx ) {
856 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
857 0 : switch( request_ctx ) {
858 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
859 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
860 0 : fd_bundle_auther_handle_request_fail( &ctx->auther );
861 0 : break;
862 0 : }
863 0 : }
864 :
865 : void
866 : fd_bundle_client_grpc_rx_end(
867 : void * app_ctx,
868 : ulong request_ctx,
869 : fd_grpc_resp_hdrs_t * resp
870 12 : ) {
871 12 : fd_bundle_tile_t * ctx = app_ctx;
872 12 : if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
873 0 : FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
874 0 : fd_bundle_client_request_failed( ctx, request_ctx );
875 0 : return;
876 0 : }
877 :
878 12 : resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
879 12 : if( !resp->grpc_msg_len ) {
880 12 : fd_memcpy( resp->grpc_msg, "unknown error", 13 );
881 12 : resp->grpc_msg_len = 13;
882 12 : }
883 :
884 12 : switch( request_ctx ) {
885 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
886 0 : ctx->packet_subscription_live = 0;
887 0 : ctx->packet_subscription_wait = 0;
888 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
889 0 : ctx->defer_reset = 1;
890 0 : FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
891 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
892 0 : return;
893 9 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
894 9 : ctx->bundle_subscription_live = 0;
895 9 : ctx->bundle_subscription_wait = 0;
896 9 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
897 9 : ctx->defer_reset = 1;
898 9 : FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
899 9 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
900 9 : return;
901 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
902 3 : ctx->builder_info_wait = 0;
903 3 : break;
904 0 : default:
905 0 : break;
906 12 : }
907 :
908 3 : if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
909 0 : FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
910 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
911 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
912 0 : fd_bundle_client_request_failed( ctx, request_ctx );
913 0 : if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
914 0 : resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
915 0 : fd_bundle_auther_reset( &ctx->auther );
916 0 : }
917 0 : return;
918 0 : }
919 3 : }
920 :
921 : void
922 : fd_bundle_client_grpc_rx_timeout(
923 : void * app_ctx,
924 : ulong request_ctx, /* FD_BUNDLE_CLIENT_REQ_{...} */
925 : int deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
926 6 : ) {
927 6 : (void)deadline_kind;
928 6 : FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
929 6 : fd_bundle_tile_t * ctx = app_ctx;
930 6 : ctx->defer_reset = 1;
931 6 : }
932 :
933 : static void
934 3 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
935 3 : fd_bundle_tile_t * ctx = app_ctx;
936 3 : long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
937 3 : if( FD_LIKELY( rtt_sample ) ) {
938 3 : fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
939 3 : FD_LOG_DEBUG(( "Keepalive ACK" ));
940 3 : }
941 3 : ctx->metrics.ping_ack_cnt++;
942 3 : }
943 :
944 : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
945 : .conn_established = fd_bundle_client_grpc_conn_established,
946 : .conn_dead = fd_bundle_client_grpc_conn_dead,
947 : .tx_complete = fd_bundle_client_grpc_tx_complete,
948 : .rx_start = fd_bundle_client_grpc_rx_start,
949 : .rx_msg = fd_bundle_client_grpc_rx_msg,
950 : .rx_end = fd_bundle_client_grpc_rx_end,
951 : .rx_timeout = fd_bundle_client_grpc_rx_timeout,
952 : .ping_ack = fd_bundle_client_grpc_ping_ack,
953 : };
954 :
955 : /* Decrease verbosity */
956 12 : #define DISCONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_DISCONNECTED
957 54 : #define CONNECTING FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTING
958 108 : #define CONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED
959 :
960 : int
961 174 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
962 174 : if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
963 174 : ( !ctx->grpc_client ) ) ) {
964 3 : return DISCONNECTED;
965 3 : }
966 :
967 171 : fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
968 171 : if( FD_UNLIKELY( !conn ) ) {
969 0 : return DISCONNECTED; /* no conn */
970 0 : }
971 171 : if( FD_UNLIKELY( conn->flags &
972 171 : ( FD_H2_CONN_FLAGS_DEAD |
973 171 : FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
974 6 : return DISCONNECTED;
975 6 : }
976 :
977 165 : if( FD_UNLIKELY( conn->flags &
978 165 : ( FD_H2_CONN_FLAGS_CLIENT_INITIAL |
979 165 : FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
980 165 : FD_H2_CONN_FLAGS_WAIT_SETTINGS_0 |
981 165 : FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
982 12 : return CONNECTING; /* connection is not ready */
983 12 : }
984 :
985 153 : if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
986 12 : return CONNECTING; /* not authenticated */
987 12 : }
988 :
989 141 : if( FD_UNLIKELY( ( !ctx->builder_info_avail ) |
990 141 : ( !ctx->packet_subscription_live ) |
991 141 : ( !ctx->bundle_subscription_live ) ) ) {
992 27 : return CONNECTING; /* not fully connected */
993 27 : }
994 :
995 114 : if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
996 3 : return DISCONNECTED; /* possible timeout */
997 3 : }
998 :
999 111 : if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
1000 3 : return CONNECTING;
1001 3 : }
1002 :
1003 : /* As far as we know, the bundle connection is alive and well. */
1004 108 : return CONNECTED;
1005 111 : }
1006 :
1007 : #undef DISCONNECTED
1008 : #undef CONNECTING
1009 : #undef CONNECTED
1010 :
1011 : FD_FN_CONST char const *
1012 6 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
1013 6 : switch( request_ctx ) {
1014 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
1015 0 : return "GenerateAuthChallenge";
1016 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
1017 0 : return "GenerateAuthTokens";
1018 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
1019 0 : return "SubscribePackets";
1020 6 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
1021 6 : return "SubscribeBundles";
1022 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
1023 0 : return "GetBlockBuilderFeeInfo";
1024 0 : default:
1025 0 : return "unknown";
1026 6 : }
1027 6 : }
|