Line data Source code
1 : /* fd_bundle_client.c steps gRPC related tasks. */
2 :
3 : #define _GNU_SOURCE /* SOL_TCP */
4 : #include "fd_bundle_auth.h"
5 : #include "fd_bundle_tile_private.h"
6 : #include "proto/block_engine.pb.h"
7 : #include "proto/bundle.pb.h"
8 : #include "proto/packet.pb.h"
9 : #include "../fd_txn_m_t.h"
10 : #include "../plugin/fd_plugin.h"
11 : #include "../../waltz/h2/fd_h2_conn.h"
12 : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
13 : #include "../../ballet/base58/fd_base58.h"
14 : #include "../../ballet/nanopb/pb_decode.h"
15 : #include "../../util/net/fd_ip4.h"
16 :
17 : #include <fcntl.h>
18 : #include <errno.h>
19 : #include <unistd.h> /* close */
20 : #include <poll.h> /* poll */
21 : #include <sys/socket.h> /* socket */
22 : #include <netinet/in.h>
23 : #include <netinet/ip.h>
24 : #include <netinet/tcp.h>
25 :
26 9 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
27 :
28 : #define FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE (5UL)
29 :
/* fd_bundle_now returns the timestamp used by the bundle client, as
   reported by fd_log_wallclock.  The symbol is declared weak, so a
   non-weak definition elsewhere takes precedence at link time. */

__attribute__((weak)) long
fd_bundle_now( void ) {
  long const ts = fd_log_wallclock();
  return ts;
}
34 :
35 : void
36 3 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
37 3 : if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
38 3 : if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
39 0 : FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
40 0 : }
41 3 : ctx->tcp_sock = -1;
42 3 : ctx->tcp_sock_connected = 0;
43 3 : }
44 3 : ctx->defer_reset = 0;
45 :
46 3 : ctx->builder_info_avail = 0;
47 3 : ctx->builder_info_wait = 0;
48 3 : ctx->packet_subscription_live = 0;
49 3 : ctx->packet_subscription_wait = 0;
50 3 : ctx->bundle_subscription_live = 0;
51 3 : ctx->bundle_subscription_wait = 0;
52 :
53 3 : memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
54 :
55 3 : # if FD_HAS_OPENSSL
56 3 : if( FD_UNLIKELY( ctx->ssl ) ) {
57 0 : SSL_free( ctx->ssl );
58 0 : ctx->ssl = NULL;
59 0 : }
60 3 : # endif
61 :
62 3 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
63 :
64 3 : fd_bundle_auther_reset( &ctx->auther );
65 3 : fd_grpc_client_reset( ctx->grpc_client );
66 3 : }
67 :
68 : static int
69 : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
70 0 : uint ip4_addr ) {
71 0 : struct sockaddr_in addr = {
72 0 : .sin_family = AF_INET,
73 0 : .sin_addr.s_addr = ip4_addr,
74 0 : .sin_port = fd_ushort_bswap( ctx->server_tcp_port )
75 0 : };
76 0 : errno = 0;
77 0 : connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
78 0 : return errno;
79 0 : }
80 :
81 : static void
82 0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
83 0 : fd_bundle_client_reset( ctx );
84 :
85 : /* FIXME IPv6 support */
86 0 : fd_addrinfo_t hints = {0};
87 0 : hints.ai_family = AF_INET;
88 0 : fd_addrinfo_t * res = NULL;
89 0 : uchar scratch[ 4096 ];
90 0 : void * pscratch = scratch;
91 0 : int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
92 0 : if( FD_UNLIKELY( err ) ) {
93 0 : FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
94 0 : fd_bundle_client_reset( ctx );
95 0 : ctx->metrics.transport_fail_cnt++;
96 0 : return;
97 0 : }
98 0 : uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
99 0 : ctx->server_ip4_addr = ip4_addr;
100 :
101 0 : int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
102 0 : if( FD_UNLIKELY( tcp_sock<0 ) ) {
103 0 : FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
104 0 : }
105 0 : ctx->tcp_sock = tcp_sock;
106 :
107 0 : if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
108 0 : FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
109 0 : }
110 :
111 0 : int tcp_nodelay = 1;
112 0 : if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
113 0 : FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
114 0 : }
115 :
116 0 : if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
117 0 : FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
118 0 : }
119 :
120 0 : char const * scheme = "http";
121 0 : # if FD_HAS_OPENSSL
122 0 : if( ctx->is_ssl ) scheme = "https";
123 0 : # endif
124 :
125 0 : FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
126 0 : scheme,
127 0 : FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
128 0 : (int)ctx->server_sni_len, ctx->server_sni ));
129 :
130 0 : int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
131 0 : if( FD_UNLIKELY( connect_err ) ) {
132 0 : if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
133 0 : FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
134 0 : FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
135 0 : connect_err, fd_io_strerror( connect_err ) ));
136 0 : fd_bundle_client_reset( ctx );
137 0 : ctx->metrics.transport_fail_cnt++;
138 0 : return;
139 0 : }
140 0 : }
141 :
142 0 : # if FD_HAS_OPENSSL
143 0 : if( ctx->is_ssl ) {
144 0 : BIO * bio = BIO_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
145 0 : if( FD_UNLIKELY( !bio ) ) {
146 0 : FD_LOG_ERR(( "BIO_new_socket failed" ));
147 0 : }
148 :
149 0 : SSL * ssl = SSL_new( ctx->ssl_ctx );
150 0 : if( FD_UNLIKELY( !ssl ) ) {
151 0 : FD_LOG_ERR(( "SSL_new failed" ));
152 0 : }
153 :
154 0 : SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
155 0 : SSL_set_connect_state( ssl );
156 :
157 : /* Indicate to endpoint which server name we want */
158 0 : if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
159 0 : FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
160 0 : }
161 :
162 : /* Enable hostname verification */
163 0 : if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
164 0 : FD_LOG_ERR(( "SSL_set1_host failed" ));
165 0 : }
166 :
167 0 : ctx->ssl = ssl;
168 0 : }
169 0 : # endif /* FD_HAS_OPENSSL */
170 :
171 0 : fd_grpc_client_reset( ctx->grpc_client );
172 0 : fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
173 0 : }
174 :
175 : static int
176 : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
177 27 : int * charge_busy ) {
178 27 : # if FD_HAS_OPENSSL
179 27 : if( ctx->is_ssl ) {
180 0 : return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
181 0 : }
182 27 : # endif /* FD_HAS_OPENSSL */
183 :
184 27 : return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
185 27 : }
186 :
187 : static void
188 3 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
189 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
190 :
191 3 : block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
192 3 : static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
193 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
194 3 : ctx->grpc_client,
195 3 : path, sizeof(path)-1,
196 3 : FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
197 3 : &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
198 3 : ctx->auther.access_token, ctx->auther.access_token_sz
199 3 : );
200 3 : if( FD_UNLIKELY( !request ) ) return;
201 3 : fd_grpc_client_deadline_set(
202 3 : request,
203 3 : FD_GRPC_DEADLINE_RX_END,
204 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
205 :
206 3 : ctx->builder_info_wait = 1;
207 3 : }
208 :
209 : static void
210 3 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
211 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
212 :
213 3 : block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
214 3 : static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
215 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
216 3 : ctx->grpc_client,
217 3 : path, sizeof(path)-1,
218 3 : FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
219 3 : &block_engine_SubscribePacketsRequest_msg, &req,
220 3 : ctx->auther.access_token, ctx->auther.access_token_sz
221 3 : );
222 3 : if( FD_UNLIKELY( !request ) ) return;
223 3 : fd_grpc_client_deadline_set(
224 3 : request,
225 3 : FD_GRPC_DEADLINE_HEADER,
226 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
227 :
228 3 : ctx->packet_subscription_wait = 1;
229 3 : }
230 :
231 : static void
232 3 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
233 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
234 :
235 3 : block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
236 3 : static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
237 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
238 3 : ctx->grpc_client,
239 3 : path, sizeof(path)-1,
240 3 : FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
241 3 : &block_engine_SubscribeBundlesRequest_msg, &req,
242 3 : ctx->auther.access_token, ctx->auther.access_token_sz
243 3 : );
244 3 : if( FD_UNLIKELY( !request ) ) return;
245 3 : fd_grpc_client_deadline_set(
246 3 : request,
247 3 : FD_GRPC_DEADLINE_HEADER,
248 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
249 :
250 3 : ctx->bundle_subscription_wait = 1;
251 3 : }
252 :
253 : void
254 3 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
255 3 : if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
256 3 : fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
257 3 : if( FD_UNLIKELY( !conn ) ) return; /* no conn */
258 3 : if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
259 3 : fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
260 :
261 3 : if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
262 3 : long now = fd_bundle_now();
263 3 : fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
264 3 : FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
265 3 : }
266 3 : }
267 :
268 : int
269 : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
270 18 : long now ) {
271 : /* Drive auth */
272 18 : if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
273 0 : fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
274 0 : return 1;
275 0 : }
276 18 : if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
277 :
278 : /* Request block builder info */
279 18 : int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
280 18 : if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
281 18 : ( !builder_info_expired ) ) &
282 18 : ( !ctx->builder_info_wait ) ) ) {
283 3 : fd_bundle_client_request_builder_info( ctx );
284 3 : return 1;
285 3 : }
286 :
287 : /* Subscribe to packets */
288 15 : if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
289 3 : fd_bundle_client_subscribe_packets( ctx );
290 3 : return 1;
291 3 : }
292 :
293 : /* Subscribe to bundles */
294 12 : if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
295 3 : fd_bundle_client_subscribe_bundles( ctx );
296 3 : return 1;
297 3 : }
298 :
299 : /* Send a PING */
300 9 : if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
301 3 : fd_bundle_client_send_ping( ctx );
302 3 : return 1;
303 3 : }
304 :
305 6 : return 0;
306 9 : }
307 :
308 : static void
309 : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
310 30 : int * charge_busy ) {
311 :
312 : /* Wait for TCP socket to connect */
313 30 : if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
314 0 : if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
315 :
316 0 : struct pollfd pfds[1] = {
317 0 : { .fd = ctx->tcp_sock, .events = POLLOUT }
318 0 : };
319 0 : int poll_res = fd_syscall_poll( pfds, 1, 0 );
320 0 : if( FD_UNLIKELY( poll_res<0 ) ) {
321 0 : FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
322 0 : }
323 0 : if( poll_res==0 ) return;
324 :
325 0 : if( pfds[0].revents & (POLLERR|POLLHUP) ) {
326 0 : int connect_err = fd_bundle_client_do_connect( ctx, 0 );
327 0 : FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_err, fd_io_strerror( connect_err ) ));
328 0 : fd_bundle_client_reset( ctx );
329 0 : ctx->metrics.transport_fail_cnt++;
330 0 : *charge_busy = 1;
331 0 : return;
332 0 : }
333 0 : if( pfds[0].revents & POLLOUT ) {
334 0 : FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
335 0 : ctx->tcp_sock_connected = 1;
336 0 : *charge_busy = 1;
337 0 : return;
338 0 : }
339 0 : return;
340 0 : }
341 :
342 : /* gRPC conn died? */
343 30 : if( FD_UNLIKELY( !ctx->grpc_client ) ) {
344 0 : long sleep_start;
345 0 : reconnect:
346 0 : sleep_start = fd_bundle_now();
347 0 : if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, sleep_start ) ) ) {
348 0 : long wait_dur = ctx->backoff_until - sleep_start;
349 0 : fd_log_sleep( fd_long_min( wait_dur, 1e6 ) );
350 0 : return;
351 0 : }
352 0 : fd_bundle_client_create_conn( ctx );
353 0 : *charge_busy = 1;
354 0 : return;
355 0 : }
356 :
357 : /* Did a HTTP/2 PING time out */
358 30 : long check_ts = ctx->cached_ts = fd_bundle_now();
359 30 : if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
360 3 : FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
361 3 : (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
362 3 : ctx->keepalive->inflight = 0;
363 3 : ctx->defer_reset = 1;
364 3 : *charge_busy = 1;
365 3 : return;
366 3 : }
367 :
368 : /* Drive I/O, SSL handshake, and any inflight requests */
369 27 : if( FD_UNLIKELY( !fd_bundle_client_drive_io( ctx, charge_busy ) ||
370 27 : ctx->defer_reset /* new error? */ ) ) {
371 0 : fd_bundle_client_reset( ctx );
372 0 : ctx->metrics.transport_fail_cnt++;
373 0 : *charge_busy = 1;
374 0 : return;
375 0 : }
376 :
377 : /* Are we ready to issue a new request? */
378 27 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
379 18 : long io_ts = fd_bundle_now();
380 18 : if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
381 :
382 18 : *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
383 18 : }
384 :
385 : static void
386 30 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
387 30 : int status = fd_bundle_client_status( ctx );
388 :
389 30 : int const connected_now = ( status==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
390 30 : int const connected_before = ( ctx->bundle_status_logged==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
391 :
392 30 : if( FD_UNLIKELY( connected_now!=connected_before ) ) {
393 3 : long ts = fd_log_wallclock();
394 3 : if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
395 3 : if( connected_now ) {
396 3 : FD_LOG_NOTICE(( "Connected to bundle server" ));
397 3 : } else {
398 0 : FD_LOG_WARNING(( "Disconnected from bundle server" ));
399 0 : }
400 3 : ctx->last_bundle_status_log_nanos = ts;
401 3 : ctx->bundle_status_logged = (uchar)status;
402 3 : }
403 3 : }
404 30 : }
405 :
406 : void
407 : fd_bundle_client_step( fd_bundle_tile_t * ctx,
408 30 : int * charge_busy ) {
409 : /* Edge-trigger logging with rate limiting */
410 30 : fd_bundle_client_step1( ctx, charge_busy );
411 30 : fd_bundle_client_log_status( ctx );
412 30 : }
413 :
414 : void
415 : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
416 12 : long now ) {
417 12 : uint iter = ctx->backoff_iter;
418 12 : if( now < ctx->backoff_reset ) iter = 0U;
419 12 : iter++;
420 :
421 : /* FIXME proper backoff */
422 12 : long wait_ns = (long)2e9;
423 12 : wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
424 :
425 12 : ctx->backoff_until = now + wait_ns;
426 12 : ctx->backoff_reset = now + 2*wait_ns;
427 :
428 12 : ctx->backoff_iter = iter;
429 12 : }
430 :
/* fd_bundle_client_grpc_conn_established is the gRPC client callback
   for a newly established connection.  It only logs; app_ctx is
   unused. */

static void
fd_bundle_client_grpc_conn_established( void * app_ctx ) {
  FD_LOG_INFO(( "Bundle gRPC connection established" ));
  (void)app_ctx;
}
436 :
437 : static void
438 : fd_bundle_client_grpc_conn_dead( void * app_ctx,
439 : uint h2_err,
440 0 : int closed_by ) {
441 0 : fd_bundle_tile_t * ctx = app_ctx;
442 0 : FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
443 0 : closed_by ? "by peer" : "due to error",
444 0 : h2_err, fd_h2_strerror( h2_err ) ));
445 0 : ctx->defer_reset = 1;
446 0 : }
447 :
448 : /* Forwards a bundle transaction to the tango message bus. */
449 :
450 : static void
451 : fd_bundle_tile_publish_bundle_txn(
452 : fd_bundle_tile_t * ctx,
453 : void const * txn,
454 : ulong txn_sz, /* <=FD_TXN_MTU */
455 : ulong bundle_txn_cnt,
456 : uint source_ipv4
457 30 : ) {
458 30 : if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
459 0 : ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
460 0 : return;
461 0 : }
462 :
463 30 : fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
464 30 : *txnm = (fd_txn_m_t) {
465 30 : .reference_slot = 0UL,
466 30 : .payload_sz = (ushort)txn_sz,
467 30 : .txn_t_sz = 0U,
468 30 : .source_ipv4 = source_ipv4,
469 30 : .source_tpu = FD_TXN_M_TPU_SOURCE_BUNDLE,
470 30 : .block_engine = {
471 30 : .bundle_id = ctx->bundle_seq,
472 30 : .bundle_txn_cnt = bundle_txn_cnt,
473 30 : .commission = (uchar)ctx->builder_commission
474 30 : },
475 30 : };
476 30 : memcpy( txnm->block_engine.commission_pubkey, ctx->builder_pubkey, 32UL );
477 30 : fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
478 :
479 30 : ulong sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
480 30 : ulong sig = 1UL;
481 :
482 30 : if( FD_UNLIKELY( !ctx->stem ) ) {
483 0 : FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
484 0 : }
485 :
486 30 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
487 30 : fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
488 30 : ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
489 30 : ctx->metrics.txn_received_cnt++;
490 30 : }
491 :
492 : /* Forwards a regular transaction to the tango message bus. */
493 :
494 : static void
495 : fd_bundle_tile_publish_txn(
496 : fd_bundle_tile_t * ctx,
497 : void const * txn,
498 : ulong txn_sz, /* <=FD_TXN_MTU */
499 : uint source_ipv4
500 9 : ) {
501 9 : fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
502 9 : *txnm = (fd_txn_m_t) {
503 9 : .reference_slot = 0UL,
504 9 : .payload_sz = (ushort)txn_sz,
505 9 : .txn_t_sz = 0U,
506 9 : .source_ipv4 = source_ipv4,
507 9 : .source_tpu = FD_TXN_M_TPU_SOURCE_BUNDLE,
508 9 : .block_engine = {
509 9 : .bundle_id = 0UL,
510 9 : .bundle_txn_cnt = 1UL,
511 9 : .commission = 0U,
512 9 : .commission_pubkey = {0U}
513 9 : },
514 9 : };
515 9 : fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
516 :
517 9 : ulong sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
518 9 : ulong sig = 0UL;
519 :
520 9 : if( FD_UNLIKELY( !ctx->stem ) ) {
521 0 : FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
522 0 : }
523 :
524 9 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
525 9 : fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
526 9 : ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
527 9 : ctx->metrics.txn_received_cnt++;
528 9 : }
529 :
530 : /* Called for each transaction in a bundle. Simply counts up
531 : bundle_txn_cnt, but does not publish anything. */
532 :
533 : static bool
534 : fd_bundle_client_visit_pb_bundle_txn_preflight(
535 : pb_istream_t * istream,
536 : pb_field_t const * field,
537 : void ** arg
538 48 : ) {
539 48 : (void)istream; (void)field;
540 48 : fd_bundle_tile_t * ctx = *arg;
541 48 : ctx->bundle_txn_cnt++;
542 48 : return true;
543 48 : }
544 :
545 : /* Called for each transaction in a bundle. Publishes each transaction
546 : to the tango message bus. */
547 :
548 : static bool
549 : fd_bundle_client_visit_pb_bundle_txn(
550 : pb_istream_t * istream,
551 : pb_field_t const * field,
552 : void ** arg
553 30 : ) {
554 30 : (void)field;
555 30 : fd_bundle_tile_t * ctx = *arg;
556 :
557 30 : packet_Packet packet = packet_Packet_init_default;
558 30 : if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
559 0 : ctx->metrics.decode_fail_cnt++;
560 0 : FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
561 0 : return false;
562 0 : }
563 :
564 30 : if( FD_UNLIKELY( packet.data.size == 0 ) ) {
565 0 : FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
566 0 : return true;
567 0 : }
568 :
569 30 : if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
570 0 : FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
571 0 : return true;
572 0 : }
573 :
574 30 : uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : ctx->server_ip4_addr, ctx->server_ip4_addr );
575 30 : fd_bundle_tile_publish_bundle_txn(
576 30 : ctx,
577 30 : packet.data.bytes, packet.data.size,
578 30 : ctx->bundle_txn_cnt,
579 30 : ip4
580 30 : );
581 :
582 30 : return true;
583 30 : }
584 :
585 : static void
586 : fd_bundle_client_sample_rx_delay(
587 : fd_bundle_tile_t * ctx,
588 : google_protobuf_Timestamp const * ts
589 12 : ) {
590 12 : ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos;
591 12 : fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) );
592 12 : }
593 :
594 : /* Called for each BundleUuid in a SubscribeBundlesResponse. */
595 :
596 : static bool
597 : fd_bundle_client_visit_pb_bundle_uuid(
598 : pb_istream_t * istream,
599 : pb_field_t const * field,
600 : void ** arg
601 9 : ) {
602 9 : (void)field;
603 9 : fd_bundle_tile_t * ctx = *arg;
604 :
605 : /* Reset bundle state */
606 :
607 9 : ctx->bundle_txn_cnt = 0UL;
608 :
609 : /* Do two decode passes. This is required because we need to know the
610 : number of transactions in a bundle ahead of time. However, due to
611 : the Protobuf wire encoding, we don't know the number of txns that
612 : will come until we've parsed everything.
613 :
614 : First pass: Count number of bundles. */
615 :
616 9 : pb_istream_t peek = *istream;
617 9 : bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
618 9 : bundle.bundle.packets = (pb_callback_t) {
619 9 : .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
620 9 : .arg = ctx
621 9 : };
622 9 : if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
623 0 : ctx->metrics.decode_fail_cnt++;
624 0 : FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
625 0 : return false;
626 0 : }
627 :
628 : /* At this opint, ctx->bundle_txn_cnt is correctly set. Too many txns
629 : is treated as a NOP.
630 :
631 : Second pass: Actually publish bundle packets */
632 :
633 9 : if( FD_UNLIKELY( ctx->bundle_txn_cnt>FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE ) ) return true;
634 :
635 6 : ctx->bundle_seq++;
636 6 : bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
637 6 : bundle.bundle.packets = (pb_callback_t) {
638 6 : .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
639 6 : .arg = ctx
640 6 : };
641 :
642 6 : ctx->metrics.bundle_received_cnt++;
643 :
644 6 : if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
645 0 : ctx->metrics.decode_fail_cnt++;
646 0 : FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
647 0 : return false;
648 0 : }
649 :
650 6 : fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
651 :
652 6 : return true;
653 6 : }
654 :
655 : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call. */
656 :
657 : static void
658 : fd_bundle_client_handle_bundle_batch(
659 : fd_bundle_tile_t * ctx,
660 : pb_istream_t * istream
661 12 : ) {
662 12 : if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
663 3 : ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
664 3 : return;
665 3 : }
666 :
667 9 : block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
668 9 : res.bundles = (pb_callback_t) {
669 9 : .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
670 9 : .arg = ctx
671 9 : };
672 9 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
673 0 : ctx->metrics.decode_fail_cnt++;
674 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
675 0 : return;
676 0 : }
677 9 : }
678 :
679 : /* Called for each 'Packet' (a regular transaction) of a
680 : SubscribePacketsResponse. */
681 :
682 : static bool
683 : fd_bundle_client_visit_pb_packet(
684 : pb_istream_t * istream,
685 : pb_field_t const * field,
686 : void ** arg
687 9 : ) {
688 9 : (void)field;
689 9 : fd_bundle_tile_t * ctx = *arg;
690 :
691 9 : packet_Packet packet = packet_Packet_init_default;
692 9 : if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
693 0 : ctx->metrics.decode_fail_cnt++;
694 0 : FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
695 0 : return false;
696 0 : }
697 :
698 9 : if( FD_UNLIKELY( packet.data.size == 0 ) ) {
699 0 : FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
700 0 : return true;
701 0 : }
702 :
703 9 : if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
704 0 : FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
705 0 : return true;
706 0 : }
707 :
708 :
709 9 : uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : 0U, 0U );
710 9 : fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size, ip4 );
711 9 : ctx->metrics.packet_received_cnt++;
712 :
713 9 : return true;
714 9 : }
715 :
716 : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call. */
717 :
718 : static void
719 : fd_bundle_client_handle_packet_batch(
720 : fd_bundle_tile_t * ctx,
721 : pb_istream_t * istream
722 6 : ) {
723 6 : block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
724 6 : res.batch.packets = (pb_callback_t) {
725 6 : .funcs.decode = fd_bundle_client_visit_pb_packet,
726 6 : .arg = ctx
727 6 : };
728 6 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
729 0 : ctx->metrics.decode_fail_cnt++;
730 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
731 0 : return;
732 0 : }
733 :
734 6 : fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
735 6 : }
736 :
737 : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
738 : gRPC call. */
739 :
740 : static void
741 : fd_bundle_client_handle_builder_fee_info(
742 : fd_bundle_tile_t * ctx,
743 : pb_istream_t * istream
744 9 : ) {
745 9 : block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
746 9 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
747 0 : ctx->metrics.decode_fail_cnt++;
748 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
749 0 : return;
750 0 : }
751 9 : if( FD_UNLIKELY( res.commission > 100 ) ) {
752 3 : ctx->metrics.decode_fail_cnt++;
753 3 : FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
754 3 : return;
755 3 : }
756 :
757 6 : ctx->builder_commission = (uchar)res.commission;
758 6 : if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, ctx->builder_pubkey ) ) ) {
759 3 : FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
760 3 : return;
761 3 : }
762 :
763 3 : long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
764 3 : ctx->builder_info_avail = 1;
765 3 : ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
766 3 : }
767 :
768 : static void
769 : fd_bundle_client_grpc_tx_complete(
770 : void * app_ctx,
771 : ulong request_ctx
772 9 : ) {
773 9 : (void)app_ctx; (void)request_ctx;
774 9 : }
775 :
776 : void
777 : fd_bundle_client_grpc_rx_start(
778 : void * app_ctx,
779 : ulong request_ctx
780 9 : ) {
781 9 : fd_bundle_tile_t * ctx = app_ctx;
782 9 : switch( request_ctx ) {
783 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
784 3 : ctx->packet_subscription_live = 1;
785 3 : ctx->packet_subscription_wait = 0;
786 3 : break;
787 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
788 3 : ctx->bundle_subscription_live = 1;
789 3 : ctx->bundle_subscription_wait = 0;
790 3 : break;
791 9 : }
792 9 : }
793 :
794 : void
795 : fd_bundle_client_grpc_rx_msg(
796 : void * app_ctx,
797 : void const * protobuf,
798 : ulong protobuf_sz,
799 : ulong request_ctx
800 27 : ) {
801 27 : fd_bundle_tile_t * ctx = app_ctx;
802 27 : pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz );
803 27 : switch( request_ctx ) {
804 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
805 0 : if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
806 0 : ctx->metrics.decode_fail_cnt++;
807 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
808 0 : }
809 0 : break;
810 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
811 0 : if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
812 0 : ctx->metrics.decode_fail_cnt++;
813 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
814 0 : }
815 0 : break;
816 12 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
817 12 : fd_bundle_client_handle_bundle_batch( ctx, &istream );
818 12 : break;
819 6 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
820 6 : fd_bundle_client_handle_packet_batch( ctx, &istream );
821 6 : break;
822 9 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
823 9 : fd_bundle_client_handle_builder_fee_info( ctx, &istream );
824 9 : break;
825 0 : default:
826 0 : FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
827 27 : }
828 27 : }
829 :
830 : static void
831 : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
832 0 : ulong request_ctx ) {
833 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
834 0 : switch( request_ctx ) {
835 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
836 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
837 0 : fd_bundle_auther_handle_request_fail( &ctx->auther );
838 0 : break;
839 0 : }
840 0 : }
841 :
842 : void
843 : fd_bundle_client_grpc_rx_end(
844 : void * app_ctx,
845 : ulong request_ctx,
846 : fd_grpc_resp_hdrs_t * resp
847 12 : ) {
848 12 : fd_bundle_tile_t * ctx = app_ctx;
849 12 : if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
850 0 : FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
851 0 : fd_bundle_client_request_failed( ctx, request_ctx );
852 0 : return;
853 0 : }
854 :
855 12 : resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
856 12 : if( !resp->grpc_msg_len ) {
857 12 : fd_memcpy( resp->grpc_msg, "unknown error", 13 );
858 12 : resp->grpc_msg_len = 13;
859 12 : }
860 :
861 12 : switch( request_ctx ) {
862 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
863 0 : ctx->packet_subscription_live = 0;
864 0 : ctx->packet_subscription_wait = 0;
865 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
866 0 : ctx->defer_reset = 1;
867 0 : FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
868 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
869 0 : return;
870 9 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
871 9 : ctx->bundle_subscription_live = 0;
872 9 : ctx->bundle_subscription_wait = 0;
873 9 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
874 9 : ctx->defer_reset = 1;
875 9 : FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
876 9 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
877 9 : return;
878 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
879 3 : ctx->builder_info_wait = 0;
880 3 : break;
881 0 : default:
882 0 : break;
883 12 : }
884 :
885 3 : if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
886 0 : FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
887 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
888 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
889 0 : fd_bundle_client_request_failed( ctx, request_ctx );
890 0 : if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
891 0 : resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
892 0 : fd_bundle_auther_reset( &ctx->auther );
893 0 : }
894 0 : return;
895 0 : }
896 3 : }
897 :
898 : void
899 : fd_bundle_client_grpc_rx_timeout(
900 : void * app_ctx,
901 : ulong request_ctx, /* FD_BUNDLE_CLIENT_REQ_{...} */
902 : int deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
903 6 : ) {
904 6 : (void)deadline_kind;
905 6 : FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
906 6 : fd_bundle_tile_t * ctx = app_ctx;
907 6 : ctx->defer_reset = 1;
908 6 : }
909 :
910 : static void
911 3 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
912 3 : fd_bundle_tile_t * ctx = app_ctx;
913 3 : long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
914 3 : if( FD_LIKELY( rtt_sample ) ) {
915 3 : fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
916 3 : FD_LOG_DEBUG(( "Keepalive ACK" ));
917 3 : }
918 3 : ctx->metrics.ping_ack_cnt++;
919 3 : }
920 :
921 : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
922 : .conn_established = fd_bundle_client_grpc_conn_established,
923 : .conn_dead = fd_bundle_client_grpc_conn_dead,
924 : .tx_complete = fd_bundle_client_grpc_tx_complete,
925 : .rx_start = fd_bundle_client_grpc_rx_start,
926 : .rx_msg = fd_bundle_client_grpc_rx_msg,
927 : .rx_end = fd_bundle_client_grpc_rx_end,
928 : .rx_timeout = fd_bundle_client_grpc_rx_timeout,
929 : .ping_ack = fd_bundle_client_grpc_ping_ack,
930 : };
931 :
932 : /* Decrease verbosity */
933 12 : #define DISCONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_DISCONNECTED
934 54 : #define CONNECTING FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTING
935 108 : #define CONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED
936 :
937 : int
938 174 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
939 174 : if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
940 174 : ( !ctx->grpc_client ) ) ) {
941 3 : return DISCONNECTED;
942 3 : }
943 :
944 171 : fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
945 171 : if( FD_UNLIKELY( !conn ) ) {
946 0 : return DISCONNECTED; /* no conn */
947 0 : }
948 171 : if( FD_UNLIKELY( conn->flags &
949 171 : ( FD_H2_CONN_FLAGS_DEAD |
950 171 : FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
951 6 : return DISCONNECTED;
952 6 : }
953 :
954 165 : if( FD_UNLIKELY( conn->flags &
955 165 : ( FD_H2_CONN_FLAGS_CLIENT_INITIAL |
956 165 : FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
957 165 : FD_H2_CONN_FLAGS_WAIT_SETTINGS_0 |
958 165 : FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
959 12 : return CONNECTING; /* connection is not ready */
960 12 : }
961 :
962 153 : if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
963 12 : return CONNECTING; /* not authenticated */
964 12 : }
965 :
966 141 : if( FD_UNLIKELY( ( !ctx->builder_info_avail ) |
967 141 : ( !ctx->packet_subscription_live ) |
968 141 : ( !ctx->bundle_subscription_live ) ) ) {
969 27 : return CONNECTING; /* not fully connected */
970 27 : }
971 :
972 114 : if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
973 3 : return DISCONNECTED; /* possible timeout */
974 3 : }
975 :
976 111 : if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
977 3 : return CONNECTING;
978 3 : }
979 :
980 : /* As far as we know, the bundle connection is alive and well. */
981 108 : return CONNECTED;
982 111 : }
983 :
984 : #undef DISCONNECTED
985 : #undef CONNECTING
986 : #undef CONNECTED
987 :
988 : FD_FN_CONST char const *
989 6 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
990 6 : switch( request_ctx ) {
991 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
992 0 : return "GenerateAuthChallenge";
993 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
994 0 : return "GenerateAuthTokens";
995 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
996 0 : return "SubscribePackets";
997 6 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
998 6 : return "SubscribeBundles";
999 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
1000 0 : return "GetBlockBuilderFeeInfo";
1001 0 : default:
1002 0 : return "unknown";
1003 6 : }
1004 6 : }
|