Line data Source code
1 : /* fd_bundle_client.c steps gRPC related tasks. */
2 :
3 : #define _GNU_SOURCE /* SOL_TCP */
4 : #include "fd_bundle_auth.h"
5 : #include "fd_bundle_tile_private.h"
6 : #include "proto/block_engine.pb.h"
7 : #include "proto/bundle.pb.h"
8 : #include "proto/packet.pb.h"
9 : #include "../fd_txn_m_t.h"
10 : #include "../plugin/fd_plugin.h"
11 : #include "../../waltz/h2/fd_h2_conn.h"
12 : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
13 : #include "../../ballet/base58/fd_base58.h"
14 : #include "../../ballet/nanopb/pb_decode.h"
15 : #include "../../util/net/fd_ip4.h"
16 :
17 : #include <fcntl.h>
18 : #include <errno.h>
19 : #include <unistd.h> /* close */
20 : #include <poll.h> /* poll */
21 : #include <sys/socket.h> /* socket */
22 : #include <netinet/in.h>
23 : #include <netinet/ip.h>
24 : #include <netinet/tcp.h>
25 :
26 9 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
27 :
/* fd_bundle_now returns the timestamp reported by fd_log_wallclock.
   The symbol is weak, so another definition may replace it at link
   time. */

__attribute__((weak)) long
fd_bundle_now( void ) {
  long const ts = fd_log_wallclock();
  return ts;
}
32 :
33 : void
34 3 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
35 3 : if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
36 3 : if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
37 0 : FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
38 0 : }
39 3 : ctx->tcp_sock = -1;
40 3 : ctx->tcp_sock_connected = 0;
41 3 : }
42 3 : ctx->defer_reset = 0;
43 :
44 3 : ctx->builder_info_avail = 0;
45 3 : ctx->builder_info_wait = 0;
46 3 : ctx->packet_subscription_live = 0;
47 3 : ctx->packet_subscription_wait = 0;
48 3 : ctx->bundle_subscription_live = 0;
49 3 : ctx->bundle_subscription_wait = 0;
50 :
51 3 : memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
52 :
53 3 : # if FD_HAS_OPENSSL
54 3 : if( FD_UNLIKELY( ctx->ssl ) ) {
55 0 : SSL_free( ctx->ssl );
56 0 : ctx->ssl = NULL;
57 0 : }
58 3 : # endif
59 :
60 3 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
61 :
62 3 : fd_bundle_auther_reset( &ctx->auther );
63 3 : fd_grpc_client_reset( ctx->grpc_client );
64 3 : }
65 :
66 : static int
67 : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
68 0 : uint ip4_addr ) {
69 0 : struct sockaddr_in addr = {
70 0 : .sin_family = AF_INET,
71 0 : .sin_addr.s_addr = ip4_addr,
72 0 : .sin_port = fd_ushort_bswap( ctx->server_tcp_port )
73 0 : };
74 0 : errno = 0;
75 0 : connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
76 0 : return errno;
77 0 : }
78 :
79 : static void
80 0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
81 0 : fd_bundle_client_reset( ctx );
82 :
83 : /* FIXME IPv6 support */
84 0 : fd_addrinfo_t hints = {0};
85 0 : hints.ai_family = AF_INET;
86 0 : fd_addrinfo_t * res = NULL;
87 0 : uchar scratch[ 4096 ];
88 0 : void * pscratch = scratch;
89 0 : int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
90 0 : if( FD_UNLIKELY( err ) ) {
91 0 : FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
92 0 : fd_bundle_client_reset( ctx );
93 0 : ctx->metrics.transport_fail_cnt++;
94 0 : return;
95 0 : }
96 0 : uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
97 0 : ctx->server_ip4_addr = ip4_addr;
98 :
99 0 : int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
100 0 : if( FD_UNLIKELY( tcp_sock<0 ) ) {
101 0 : FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
102 0 : }
103 0 : ctx->tcp_sock = tcp_sock;
104 :
105 0 : if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
106 0 : FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
107 0 : }
108 :
109 0 : int tcp_nodelay = 1;
110 0 : if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
111 0 : FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
112 0 : }
113 :
114 0 : if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
115 0 : FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
116 0 : }
117 :
118 0 : char const * scheme = "http";
119 0 : # if FD_HAS_OPENSSL
120 0 : if( ctx->is_ssl ) scheme = "https";
121 0 : # endif
122 :
123 0 : FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
124 0 : scheme,
125 0 : FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
126 0 : (int)ctx->server_sni_len, ctx->server_sni ));
127 :
128 0 : int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
129 0 : if( FD_UNLIKELY( connect_err ) ) {
130 0 : if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
131 0 : FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
132 0 : FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
133 0 : connect_err, fd_io_strerror( connect_err ) ));
134 0 : fd_bundle_client_reset( ctx );
135 0 : ctx->metrics.transport_fail_cnt++;
136 0 : return;
137 0 : }
138 0 : }
139 :
140 0 : # if FD_HAS_OPENSSL
141 0 : if( ctx->is_ssl ) {
142 0 : BIO * bio = BIO_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
143 0 : if( FD_UNLIKELY( !bio ) ) {
144 0 : FD_LOG_ERR(( "BIO_new_socket failed" ));
145 0 : }
146 :
147 0 : SSL * ssl = SSL_new( ctx->ssl_ctx );
148 0 : if( FD_UNLIKELY( !ssl ) ) {
149 0 : FD_LOG_ERR(( "SSL_new failed" ));
150 0 : }
151 :
152 0 : SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
153 0 : SSL_set_connect_state( ssl );
154 :
155 : /* Indicate to endpoint which server name we want */
156 0 : if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
157 0 : FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
158 0 : }
159 :
160 : /* Enable hostname verification */
161 0 : if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
162 0 : FD_LOG_ERR(( "SSL_set1_host failed" ));
163 0 : }
164 :
165 0 : ctx->ssl = ssl;
166 0 : }
167 0 : # endif /* FD_HAS_OPENSSL */
168 :
169 0 : fd_grpc_client_reset( ctx->grpc_client );
170 0 : fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
171 0 : }
172 :
173 : static int
174 : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
175 27 : int * charge_busy ) {
176 27 : # if FD_HAS_OPENSSL
177 27 : if( ctx->is_ssl ) {
178 0 : return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
179 0 : }
180 27 : # endif /* FD_HAS_OPENSSL */
181 :
182 27 : return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
183 27 : }
184 :
185 : static void
186 3 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
187 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
188 :
189 3 : block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
190 3 : static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
191 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
192 3 : ctx->grpc_client,
193 3 : path, sizeof(path)-1,
194 3 : FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
195 3 : &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
196 3 : ctx->auther.access_token, ctx->auther.access_token_sz
197 3 : );
198 3 : if( FD_UNLIKELY( !request ) ) return;
199 3 : fd_grpc_client_deadline_set(
200 3 : request,
201 3 : FD_GRPC_DEADLINE_RX_END,
202 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
203 :
204 3 : ctx->builder_info_wait = 1;
205 3 : }
206 :
207 : static void
208 3 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
209 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
210 :
211 3 : block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
212 3 : static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
213 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
214 3 : ctx->grpc_client,
215 3 : path, sizeof(path)-1,
216 3 : FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
217 3 : &block_engine_SubscribePacketsRequest_msg, &req,
218 3 : ctx->auther.access_token, ctx->auther.access_token_sz
219 3 : );
220 3 : if( FD_UNLIKELY( !request ) ) return;
221 3 : fd_grpc_client_deadline_set(
222 3 : request,
223 3 : FD_GRPC_DEADLINE_HEADER,
224 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
225 :
226 3 : ctx->packet_subscription_wait = 1;
227 3 : }
228 :
229 : static void
230 3 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
231 3 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
232 :
233 3 : block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
234 3 : static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
235 3 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
236 3 : ctx->grpc_client,
237 3 : path, sizeof(path)-1,
238 3 : FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
239 3 : &block_engine_SubscribeBundlesRequest_msg, &req,
240 3 : ctx->auther.access_token, ctx->auther.access_token_sz
241 3 : );
242 3 : if( FD_UNLIKELY( !request ) ) return;
243 3 : fd_grpc_client_deadline_set(
244 3 : request,
245 3 : FD_GRPC_DEADLINE_HEADER,
246 3 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
247 :
248 3 : ctx->bundle_subscription_wait = 1;
249 3 : }
250 :
251 : void
252 3 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
253 3 : if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
254 3 : fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
255 3 : if( FD_UNLIKELY( !conn ) ) return; /* no conn */
256 3 : if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
257 3 : fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
258 :
259 3 : if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
260 3 : long now = fd_bundle_now();
261 3 : fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
262 3 : FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
263 3 : }
264 3 : }
265 :
266 : int
267 : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
268 18 : long now ) {
269 : /* Drive auth */
270 18 : if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
271 0 : fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
272 0 : return 1;
273 0 : }
274 18 : if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
275 :
276 : /* Request block builder info */
277 18 : int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
278 18 : if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
279 18 : ( !builder_info_expired ) ) &
280 18 : ( !ctx->builder_info_wait ) ) ) {
281 3 : fd_bundle_client_request_builder_info( ctx );
282 3 : return 1;
283 3 : }
284 :
285 : /* Subscribe to packets */
286 15 : if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
287 3 : fd_bundle_client_subscribe_packets( ctx );
288 3 : return 1;
289 3 : }
290 :
291 : /* Subscribe to bundles */
292 12 : if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
293 3 : fd_bundle_client_subscribe_bundles( ctx );
294 3 : return 1;
295 3 : }
296 :
297 : /* Send a PING */
298 9 : if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
299 3 : fd_bundle_client_send_ping( ctx );
300 3 : return 1;
301 3 : }
302 :
303 6 : return 0;
304 9 : }
305 :
306 : static void
307 : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
308 30 : int * charge_busy ) {
309 :
310 : /* Wait for TCP socket to connect */
311 30 : if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
312 0 : if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
313 :
314 0 : struct pollfd pfds[1] = {
315 0 : { .fd = ctx->tcp_sock, .events = POLLOUT }
316 0 : };
317 0 : int poll_res = fd_syscall_poll( pfds, 1, 0 );
318 0 : if( FD_UNLIKELY( poll_res<0 ) ) {
319 0 : FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
320 0 : }
321 0 : if( poll_res==0 ) return;
322 :
323 0 : if( pfds[0].revents & (POLLERR|POLLHUP) ) {
324 0 : int connect_err = fd_bundle_client_do_connect( ctx, 0 );
325 0 : FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_err, fd_io_strerror( connect_err ) ));
326 0 : fd_bundle_client_reset( ctx );
327 0 : ctx->metrics.transport_fail_cnt++;
328 0 : *charge_busy = 1;
329 0 : return;
330 0 : }
331 0 : if( pfds[0].revents & POLLOUT ) {
332 0 : FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
333 0 : ctx->tcp_sock_connected = 1;
334 0 : *charge_busy = 1;
335 0 : return;
336 0 : }
337 0 : return;
338 0 : }
339 :
340 : /* gRPC conn died? */
341 30 : if( FD_UNLIKELY( !ctx->grpc_client ) ) {
342 0 : reconnect:
343 0 : if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, fd_bundle_now() ) ) ) {
344 0 : return;
345 0 : }
346 0 : fd_bundle_client_create_conn( ctx );
347 0 : *charge_busy = 1;
348 0 : return;
349 0 : }
350 :
351 : /* Did a HTTP/2 PING time out */
352 30 : long check_ts = ctx->cached_ts = fd_bundle_now();
353 30 : if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
354 3 : FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
355 3 : (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
356 3 : ctx->keepalive->inflight = 0;
357 3 : ctx->defer_reset = 1;
358 3 : *charge_busy = 1;
359 3 : return;
360 3 : }
361 :
362 : /* Drive I/O, SSL handshake, and any inflight requests */
363 27 : if( FD_UNLIKELY( !fd_bundle_client_drive_io( ctx, charge_busy ) ||
364 27 : ctx->defer_reset /* new error? */ ) ) {
365 0 : fd_bundle_client_reset( ctx );
366 0 : ctx->metrics.transport_fail_cnt++;
367 0 : *charge_busy = 1;
368 0 : return;
369 0 : }
370 :
371 : /* Are we ready to issue a new request? */
372 27 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
373 18 : long io_ts = fd_bundle_now();
374 18 : if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
375 :
376 18 : *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
377 18 : }
378 :
379 : static void
380 30 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
381 30 : int status = fd_bundle_client_status( ctx );
382 :
383 30 : int const connected_now = ( status==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
384 30 : int const connected_before = ( ctx->bundle_status_logged==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
385 :
386 30 : if( FD_UNLIKELY( connected_now!=connected_before ) ) {
387 3 : long ts = fd_log_wallclock();
388 3 : if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
389 3 : if( connected_now ) {
390 3 : FD_LOG_NOTICE(( "Connected to bundle server" ));
391 3 : } else {
392 0 : FD_LOG_WARNING(( "Disconnected from bundle server" ));
393 0 : }
394 3 : ctx->last_bundle_status_log_nanos = ts;
395 3 : ctx->bundle_status_logged = (uchar)status;
396 3 : }
397 3 : }
398 30 : }
399 :
400 : void
401 : fd_bundle_client_step( fd_bundle_tile_t * ctx,
402 30 : int * charge_busy ) {
403 : /* Edge-trigger logging with rate limiting */
404 30 : fd_bundle_client_step1( ctx, charge_busy );
405 30 : fd_bundle_client_log_status( ctx );
406 30 : }
407 :
408 : void
409 : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
410 12 : long now ) {
411 12 : uint iter = ctx->backoff_iter;
412 12 : if( now < ctx->backoff_reset ) iter = 0U;
413 12 : iter++;
414 :
415 : /* FIXME proper backoff */
416 12 : long wait_ns = (long)2e9;
417 12 : wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
418 :
419 12 : ctx->backoff_until = now + wait_ns;
420 12 : ctx->backoff_reset = now + 2*wait_ns;
421 :
422 12 : ctx->backoff_iter = iter;
423 12 : }
424 :
/* gRPC client callback: the connection is established.  Only logs;
   app_ctx is unused. */

static void
fd_bundle_client_grpc_conn_established( void * app_ctx ) {
  FD_LOG_INFO(( "Bundle gRPC connection established" ));
  (void)app_ctx;
}
430 :
431 : static void
432 : fd_bundle_client_grpc_conn_dead( void * app_ctx,
433 : uint h2_err,
434 0 : int closed_by ) {
435 0 : fd_bundle_tile_t * ctx = app_ctx;
436 0 : FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
437 0 : closed_by ? "by peer" : "due to error",
438 0 : h2_err, fd_h2_strerror( h2_err ) ));
439 0 : ctx->defer_reset = 1;
440 0 : }
441 :
442 : /* Forwards a bundle transaction to the tango message bus. */
443 :
444 : static void
445 : fd_bundle_tile_publish_bundle_txn(
446 : fd_bundle_tile_t * ctx,
447 : void const * txn,
448 : ulong txn_sz, /* <=FD_TXN_MTU */
449 : ulong bundle_txn_cnt
450 15 : ) {
451 15 : if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
452 0 : ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
453 0 : return;
454 0 : }
455 :
456 15 : fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
457 15 : *txnm = (fd_txn_m_t) {
458 15 : .reference_slot = 0UL,
459 15 : .payload_sz = (ushort)txn_sz,
460 15 : .txn_t_sz = 0,
461 15 : .block_engine = {
462 15 : .bundle_id = ctx->bundle_seq,
463 15 : .bundle_txn_cnt = bundle_txn_cnt,
464 15 : .commission = (uchar)ctx->builder_commission
465 15 : },
466 15 : };
467 15 : memcpy( txnm->block_engine.commission_pubkey, ctx->builder_pubkey, 32UL );
468 15 : fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
469 :
470 15 : ulong sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
471 15 : ulong sig = 1UL;
472 :
473 15 : if( FD_UNLIKELY( !ctx->stem ) ) {
474 0 : FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
475 0 : }
476 :
477 15 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
478 15 : fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
479 15 : ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
480 15 : ctx->metrics.txn_received_cnt++;
481 15 : }
482 :
483 : /* Forwards a regular transaction to the tango message bus. */
484 :
485 : static void
486 : fd_bundle_tile_publish_txn(
487 : fd_bundle_tile_t * ctx,
488 : void const * txn,
489 : ulong txn_sz /* <=FD_TXN_MTU */
490 9 : ) {
491 9 : fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
492 9 : *txnm = (fd_txn_m_t) {
493 9 : .reference_slot = 0UL,
494 9 : .payload_sz = (ushort)txn_sz,
495 9 : .txn_t_sz = 0,
496 9 : .block_engine = {
497 9 : .bundle_id = 0UL,
498 9 : .bundle_txn_cnt = 1UL,
499 9 : .commission = 0,
500 9 : .commission_pubkey = {0}
501 9 : },
502 9 : };
503 9 : fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
504 :
505 9 : ulong sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
506 9 : ulong sig = 0UL;
507 :
508 9 : if( FD_UNLIKELY( !ctx->stem ) ) {
509 0 : FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
510 0 : }
511 :
512 9 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
513 9 : fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
514 9 : ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
515 9 : ctx->metrics.txn_received_cnt++;
516 9 : }
517 :
518 : /* Called for each transaction in a bundle. Simply counts up
519 : bundle_txn_cnt, but does not publish anything. */
520 :
521 : static bool
522 : fd_bundle_client_visit_pb_bundle_txn_preflight(
523 : pb_istream_t * istream,
524 : pb_field_t const * field,
525 : void ** arg
526 15 : ) {
527 15 : (void)istream; (void)field;
528 15 : fd_bundle_tile_t * ctx = *arg;
529 15 : ctx->bundle_txn_cnt++;
530 15 : return true;
531 15 : }
532 :
533 : /* Called for each transaction in a bundle. Publishes each transaction
534 : to the tango message bus. */
535 :
536 : static bool
537 : fd_bundle_client_visit_pb_bundle_txn(
538 : pb_istream_t * istream,
539 : pb_field_t const * field,
540 : void ** arg
541 15 : ) {
542 15 : (void)field;
543 15 : fd_bundle_tile_t * ctx = *arg;
544 :
545 15 : packet_Packet packet = packet_Packet_init_default;
546 15 : if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
547 0 : ctx->metrics.decode_fail_cnt++;
548 0 : FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
549 0 : return false;
550 0 : }
551 :
552 15 : if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
553 0 : FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
554 0 : return true;
555 0 : }
556 :
557 15 : fd_bundle_tile_publish_bundle_txn(
558 15 : ctx,
559 15 : packet.data.bytes, packet.data.size,
560 15 : ctx->bundle_txn_cnt
561 15 : );
562 :
563 15 : return true;
564 15 : }
565 :
566 : static void
567 : fd_bundle_client_sample_rx_delay(
568 : fd_bundle_tile_t * ctx,
569 : google_protobuf_Timestamp const * ts
570 9 : ) {
571 9 : ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos;
572 9 : fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) );
573 9 : }
574 :
575 : /* Called for each BundleUuid in a SubscribeBundlesResponse. */
576 :
577 : static bool
578 : fd_bundle_client_visit_pb_bundle_uuid(
579 : pb_istream_t * istream,
580 : pb_field_t const * field,
581 : void ** arg
582 3 : ) {
583 3 : (void)field;
584 3 : fd_bundle_tile_t * ctx = *arg;
585 :
586 : /* Reset bundle state */
587 :
588 3 : ctx->bundle_txn_cnt = 0UL;
589 3 : ctx->bundle_seq++;
590 :
591 : /* Do two decode passes. This is required because we need to know the
592 : number of transactions in a bundle ahead of time. However, due to
593 : the Protobuf wire encoding, we don't know the number of txns that
594 : will come until we've parsed everything.
595 :
596 : First pass: Count number of bundles. */
597 :
598 3 : pb_istream_t peek = *istream;
599 3 : bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
600 3 : bundle.bundle.packets = (pb_callback_t) {
601 3 : .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
602 3 : .arg = ctx
603 3 : };
604 3 : if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
605 0 : ctx->metrics.decode_fail_cnt++;
606 0 : FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
607 0 : return false;
608 0 : }
609 :
610 : /* At this opint, ctx->bundle_txn_cnt is correctly set.
611 : Second pass: Actually publish bundle packets */
612 :
613 3 : bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
614 3 : bundle.bundle.packets = (pb_callback_t) {
615 3 : .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
616 3 : .arg = ctx
617 3 : };
618 :
619 3 : ctx->metrics.bundle_received_cnt++;
620 :
621 3 : if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
622 0 : ctx->metrics.decode_fail_cnt++;
623 0 : FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
624 0 : return false;
625 0 : }
626 :
627 3 : fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
628 :
629 3 : return true;
630 3 : }
631 :
632 : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call. */
633 :
634 : static void
635 : fd_bundle_client_handle_bundle_batch(
636 : fd_bundle_tile_t * ctx,
637 : pb_istream_t * istream
638 6 : ) {
639 6 : if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
640 3 : ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
641 3 : return;
642 3 : }
643 :
644 3 : block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
645 3 : res.bundles = (pb_callback_t) {
646 3 : .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
647 3 : .arg = ctx
648 3 : };
649 3 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
650 0 : ctx->metrics.decode_fail_cnt++;
651 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
652 0 : return;
653 0 : }
654 3 : }
655 :
656 : /* Called for each 'Packet' (a regular transaction) of a
657 : SubscribePacketsResponse. */
658 :
659 : static bool
660 : fd_bundle_client_visit_pb_packet(
661 : pb_istream_t * istream,
662 : pb_field_t const * field,
663 : void ** arg
664 9 : ) {
665 9 : (void)field;
666 9 : fd_bundle_tile_t * ctx = *arg;
667 :
668 9 : packet_Packet packet = packet_Packet_init_default;
669 9 : if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
670 0 : ctx->metrics.decode_fail_cnt++;
671 0 : FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
672 0 : return false;
673 0 : }
674 :
675 9 : if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
676 0 : FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
677 0 : return true;
678 0 : }
679 :
680 9 : fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size );
681 9 : ctx->metrics.packet_received_cnt++;
682 :
683 9 : return true;
684 9 : }
685 :
686 : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call. */
687 :
688 : static void
689 : fd_bundle_client_handle_packet_batch(
690 : fd_bundle_tile_t * ctx,
691 : pb_istream_t * istream
692 6 : ) {
693 6 : block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
694 6 : res.batch.packets = (pb_callback_t) {
695 6 : .funcs.decode = fd_bundle_client_visit_pb_packet,
696 6 : .arg = ctx
697 6 : };
698 6 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
699 0 : ctx->metrics.decode_fail_cnt++;
700 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
701 0 : return;
702 0 : }
703 :
704 6 : fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
705 6 : }
706 :
707 : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
708 : gRPC call. */
709 :
710 : static void
711 : fd_bundle_client_handle_builder_fee_info(
712 : fd_bundle_tile_t * ctx,
713 : pb_istream_t * istream
714 9 : ) {
715 9 : block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
716 9 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
717 0 : ctx->metrics.decode_fail_cnt++;
718 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
719 0 : return;
720 0 : }
721 9 : if( FD_UNLIKELY( res.commission > 100 ) ) {
722 3 : ctx->metrics.decode_fail_cnt++;
723 3 : FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
724 3 : return;
725 3 : }
726 :
727 6 : ctx->builder_commission = (uchar)res.commission;
728 6 : if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, ctx->builder_pubkey ) ) ) {
729 3 : FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
730 3 : return;
731 3 : }
732 :
733 3 : long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
734 3 : ctx->builder_info_avail = 1;
735 3 : ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
736 3 : }
737 :
738 : static void
739 : fd_bundle_client_grpc_tx_complete(
740 : void * app_ctx,
741 : ulong request_ctx
742 9 : ) {
743 9 : (void)app_ctx; (void)request_ctx;
744 9 : }
745 :
746 : void
747 : fd_bundle_client_grpc_rx_start(
748 : void * app_ctx,
749 : ulong request_ctx
750 9 : ) {
751 9 : fd_bundle_tile_t * ctx = app_ctx;
752 9 : switch( request_ctx ) {
753 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
754 3 : ctx->packet_subscription_live = 1;
755 3 : ctx->packet_subscription_wait = 0;
756 3 : break;
757 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
758 3 : ctx->bundle_subscription_live = 1;
759 3 : ctx->bundle_subscription_wait = 0;
760 3 : break;
761 9 : }
762 9 : }
763 :
764 : void
765 : fd_bundle_client_grpc_rx_msg(
766 : void * app_ctx,
767 : void const * protobuf,
768 : ulong protobuf_sz,
769 : ulong request_ctx
770 21 : ) {
771 21 : fd_bundle_tile_t * ctx = app_ctx;
772 21 : pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz );
773 21 : switch( request_ctx ) {
774 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
775 0 : if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
776 0 : ctx->metrics.decode_fail_cnt++;
777 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
778 0 : }
779 0 : break;
780 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
781 0 : if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
782 0 : ctx->metrics.decode_fail_cnt++;
783 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
784 0 : }
785 0 : break;
786 6 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
787 6 : fd_bundle_client_handle_bundle_batch( ctx, &istream );
788 6 : break;
789 6 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
790 6 : fd_bundle_client_handle_packet_batch( ctx, &istream );
791 6 : break;
792 9 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
793 9 : fd_bundle_client_handle_builder_fee_info( ctx, &istream );
794 9 : break;
795 0 : default:
796 0 : FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
797 21 : }
798 21 : }
799 :
800 : static void
801 : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
802 0 : ulong request_ctx ) {
803 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
804 0 : switch( request_ctx ) {
805 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
806 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
807 0 : fd_bundle_auther_handle_request_fail( &ctx->auther );
808 0 : break;
809 0 : }
810 0 : }
811 :
812 : void
813 : fd_bundle_client_grpc_rx_end(
814 : void * app_ctx,
815 : ulong request_ctx,
816 : fd_grpc_resp_hdrs_t * resp
817 12 : ) {
818 12 : fd_bundle_tile_t * ctx = app_ctx;
819 12 : if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
820 0 : FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
821 0 : fd_bundle_client_request_failed( ctx, request_ctx );
822 0 : return;
823 0 : }
824 :
825 12 : resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
826 12 : if( !resp->grpc_msg_len ) {
827 12 : fd_memcpy( resp->grpc_msg, "unknown error", 13 );
828 12 : resp->grpc_msg_len = 13;
829 12 : }
830 :
831 12 : switch( request_ctx ) {
832 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
833 0 : ctx->packet_subscription_live = 0;
834 0 : ctx->packet_subscription_wait = 0;
835 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
836 0 : ctx->defer_reset = 1;
837 0 : FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
838 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
839 0 : return;
840 9 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
841 9 : ctx->bundle_subscription_live = 0;
842 9 : ctx->bundle_subscription_wait = 0;
843 9 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
844 9 : ctx->defer_reset = 1;
845 9 : FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
846 9 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
847 9 : return;
848 3 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
849 3 : ctx->builder_info_wait = 0;
850 3 : break;
851 0 : default:
852 0 : break;
853 12 : }
854 :
855 3 : if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
856 0 : FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
857 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
858 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
859 0 : fd_bundle_client_request_failed( ctx, request_ctx );
860 0 : if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
861 0 : resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
862 0 : fd_bundle_auther_reset( &ctx->auther );
863 0 : }
864 0 : return;
865 0 : }
866 3 : }
867 :
868 : void
869 : fd_bundle_client_grpc_rx_timeout(
870 : void * app_ctx,
871 : ulong request_ctx, /* FD_BUNDLE_CLIENT_REQ_{...} */
872 : int deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
873 6 : ) {
874 6 : (void)deadline_kind;
875 6 : FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
876 6 : fd_bundle_tile_t * ctx = app_ctx;
877 6 : ctx->defer_reset = 1;
878 6 : }
879 :
880 : static void
881 3 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
882 3 : fd_bundle_tile_t * ctx = app_ctx;
883 3 : long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
884 3 : if( FD_LIKELY( rtt_sample ) ) {
885 3 : fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
886 3 : FD_LOG_DEBUG(( "Keepalive ACK" ));
887 3 : }
888 3 : ctx->metrics.ping_ack_cnt++;
889 3 : }
890 :
891 : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
892 : .conn_established = fd_bundle_client_grpc_conn_established,
893 : .conn_dead = fd_bundle_client_grpc_conn_dead,
894 : .tx_complete = fd_bundle_client_grpc_tx_complete,
895 : .rx_start = fd_bundle_client_grpc_rx_start,
896 : .rx_msg = fd_bundle_client_grpc_rx_msg,
897 : .rx_end = fd_bundle_client_grpc_rx_end,
898 : .rx_timeout = fd_bundle_client_grpc_rx_timeout,
899 : .ping_ack = fd_bundle_client_grpc_ping_ack,
900 : };
901 :
902 : /* Decrease verbosity */
903 12 : #define DISCONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_DISCONNECTED
904 54 : #define CONNECTING FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTING
905 108 : #define CONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED
906 :
907 : int
908 174 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
909 174 : if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
910 174 : ( !ctx->grpc_client ) ) ) {
911 3 : return DISCONNECTED;
912 3 : }
913 :
914 171 : fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
915 171 : if( FD_UNLIKELY( !conn ) ) {
916 0 : return DISCONNECTED; /* no conn */
917 0 : }
918 171 : if( FD_UNLIKELY( conn->flags &
919 171 : ( FD_H2_CONN_FLAGS_DEAD |
920 171 : FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
921 6 : return DISCONNECTED;
922 6 : }
923 :
924 165 : if( FD_UNLIKELY( conn->flags &
925 165 : ( FD_H2_CONN_FLAGS_CLIENT_INITIAL |
926 165 : FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
927 165 : FD_H2_CONN_FLAGS_WAIT_SETTINGS_0 |
928 165 : FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
929 12 : return CONNECTING; /* connection is not ready */
930 12 : }
931 :
932 153 : if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
933 12 : return CONNECTING; /* not authenticated */
934 12 : }
935 :
936 141 : if( FD_UNLIKELY( ( !ctx->builder_info_avail ) |
937 141 : ( !ctx->packet_subscription_live ) |
938 141 : ( !ctx->bundle_subscription_live ) ) ) {
939 27 : return CONNECTING; /* not fully connected */
940 27 : }
941 :
942 114 : if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
943 3 : return DISCONNECTED; /* possible timeout */
944 3 : }
945 :
946 111 : if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
947 3 : return CONNECTING;
948 3 : }
949 :
950 : /* As far as we know, the bundle connection is alive and well. */
951 108 : return CONNECTED;
952 111 : }
953 :
954 : #undef DISCONNECTED
955 : #undef CONNECTING
956 : #undef CONNECTED
957 :
958 : FD_FN_CONST char const *
959 6 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
960 6 : switch( request_ctx ) {
961 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
962 0 : return "GenerateAuthChallenge";
963 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
964 0 : return "GenerateAuthTokens";
965 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
966 0 : return "SubscribePackets";
967 6 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
968 6 : return "SubscribeBundles";
969 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
970 0 : return "GetBlockBuilderFeeInfo";
971 0 : default:
972 0 : return "unknown";
973 6 : }
974 6 : }
|