Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_event_client.h"
3 :
4 : #include "../../waltz/resolv/fd_netdb.h"
5 : #include "../../waltz/http/fd_url.h"
6 : #include "../../waltz/grpc/fd_grpc_client.h"
7 : #include "../../waltz/grpc/fd_grpc_client_private.h"
8 : #include "../../ballet/pb/fd_pb_tokenize.h"
9 : #include "../../ballet/pb/fd_pb_encode.h"
10 : #include "../../util/net/fd_ip4.h"
11 : #include "../../util/log/fd_log.h"
12 : #include "../keyguard/fd_keyguard.h"
13 :
14 : #include <netinet/tcp.h>
15 : #include <unistd.h>
16 : #include <errno.h>
17 : #include <sys/socket.h>
18 : #include <netinet/in.h>
19 :
/* Reason codes accepted by disconnect().  Each one selects the log
   message emitted when a connection is torn down.  They are also
   stored in fd_event_client_t::defer_disconnect by gRPC callbacks to
   request a teardown on the next fd_event_client_poll. */
#define DISCONNECT_REASON_IDENTITY_CHANGED (0)
#define DISCONNECT_REASON_CONNECT_FAILED (1)
#define DISCONNECT_REASON_DNS_RESOLVE_FAILED (2)
#define DISCONNECT_REASON_TIMEOUT (3)
#define DISCONNECT_REASON_TRANSPORT_FAILED (4)
#define DISCONNECT_REASON_PEER_CLOSED (5)
#define DISCONNECT_REASON_INVALID_CURSOR (6)
#define DISCONNECT_REASON_AUTH_FAILED (7)

/* Request context values handed to the gRPC client when a request is
   started.  The gRPC callbacks receive the value back as request_ctx
   and use it to tell the three request kinds apart. */
#define FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE (1UL)
#define FD_EVENT_CLIENT_REQ_CTX_CONFIRM_AUTH (2UL)
#define FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS (3UL)
32 :
/* fd_event_client holds all state of one event client: identity and
   version info sent during authentication, the connection state
   machine, the TCP socket, and handles to the objects it borrows
   (rng, circq, keyguard client). */
struct fd_event_client {
  fd_grpc_client_t * grpc_client;          /* created by fd_grpc_client_new in fd_event_client_new */
  fd_grpc_client_metrics_t grpc_metrics[1]; /* handed to the gRPC client; read by fd_event_client_metrics */
  fd_grpc_h2_stream_t * event_stream;      /* StreamEvents stream; NULL until the first event is sent after auth */

  char client_version[ 10UL ];             /* cstr, always NUL terminated (truncated if needed) */
  uchar identity_pubkey[ 32UL ];

  int has_genesis_hash;                    /* set by fd_event_client_init_genesis_hash */
  uchar genesis_hash[ 32UL ];

  ushort has_shred_version;                /* set by fd_event_client_init_shred_version */
  ushort shred_version;

  ulong event_id;                          /* next ID returned by fd_event_client_id_reserve */

  ulong instance_id;
  ulong boot_id;
  ulong machine_id;

  int defer_disconnect;                    /* INT_MAX if nothing pending, else a DISCONNECT_REASON_* to act on in poll */
  ulong consecutive_failure_count;         /* drives the backoff exponent; cleared after 10s in CONNECTED state */

  ulong state;                             /* FD_EVENT_CLIENT_STATE_* */
  /* Per-state data.  Only the member matching the current state is
     meaningful.  Values are fd_log_wallclock() timestamps. */
  union {
    struct {
      long reconnect_deadline;             /* reconnect() does nothing before this time */
    } disconnected;

    struct {
      long connect_deadline;               /* poll gives up with DISCONNECT_REASON_TIMEOUT after this time */
    } connecting;

    struct {
      long connected_timestamp;            /* time at which authentication was confirmed */
    } connected;
  };

  int so_sndbuf;                           /* value passed to setsockopt(SO_SNDBUF) on every new socket */
  int sockfd;                              /* -1 when no socket is open */

  char server_fqdn[ 256 ]; /* cstr */
  ulong server_fqdn_len;
  uint server_ip4_addr;                    /* sin_addr.s_addr of the most recently resolved address */
  ushort server_tcp_port;                  /* byte swapped into sin_port when connecting */

  fd_rng_t * rng;                          /* source of backoff jitter */
  fd_circq_t * circq;                      /* queue of events to transmit */
  fd_keyguard_client_t * keyguard_client;  /* signs the auth challenge */

  fd_event_client_metrics_t metrics;
};
85 :
86 : FD_FN_CONST ulong
87 0 : fd_event_client_align( void ) {
88 0 : return alignof( fd_event_client_t );
89 0 : }
90 :
91 : FD_FN_CONST ulong
92 0 : fd_event_client_footprint( ulong buf_max ) {
93 0 : ulong l;
94 0 : l = FD_LAYOUT_INIT;
95 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_event_client_t), sizeof(fd_event_client_t) );
96 0 : l = FD_LAYOUT_APPEND( l, fd_grpc_client_align(), fd_grpc_client_footprint( buf_max ) );
97 0 : return FD_LAYOUT_FINI( l, alignof(fd_event_client_t) );
98 0 : }
99 :
100 : static void
101 : parse_url( fd_url_t * url_,
102 : char const * url_str,
103 : ulong url_str_len,
104 0 : ushort * tcp_port ) {
105 :
106 : /* Parse URL */
107 :
108 0 : int url_err[1];
109 0 : fd_url_t * url = fd_url_parse_cstr( url_, url_str, url_str_len, url_err );
110 0 : if( FD_UNLIKELY( !url ) ) {
111 0 : switch( *url_err ) {
112 0 : scheme_err:
113 0 : case FD_URL_ERR_SCHEME:
114 0 : FD_LOG_ERR(( "Invalid [tiles.event.url] `%.*s`: must start with `http://`", (int)url_str_len, url_str ));
115 0 : break;
116 0 : case FD_URL_ERR_HOST_OVERSZ:
117 0 : FD_LOG_ERR(( "Invalid [tiles.event.url] `%.*s`: domain name is too long", (int)url_str_len, url_str ));
118 0 : break;
119 0 : default:
120 0 : FD_LOG_ERR(( "Invalid [tiles.event.url] `%.*s`", (int)url_str_len, url_str ));
121 0 : break;
122 0 : }
123 0 : }
124 :
125 : /* FIXME the URL scheme path technically shouldn't contain slashes */
126 0 : if( url->scheme_len==7UL && fd_memeq( url->scheme, "http://", 7UL ) ) {
127 0 : } else {
128 0 : goto scheme_err;
129 0 : }
130 :
131 : /* Parse port number */
132 :
133 0 : *tcp_port = 7878;
134 0 : if( url->port_len ) {
135 0 : if( FD_UNLIKELY( url->port_len > 5 ) ) {
136 0 : invalid_port:
137 0 : FD_LOG_ERR(( "Invalid [tiles.event.url] `%.*s`: invalid port number", (int)url_str_len, url_str ));
138 0 : }
139 :
140 0 : char port_cstr[6];
141 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( port_cstr ), url->port, url->port_len ) );
142 0 : ulong port_no = fd_cstr_to_ulong( port_cstr );
143 0 : if( FD_UNLIKELY( !port_no || port_no>USHORT_MAX ) ) goto invalid_port;
144 :
145 0 : *tcp_port = (ushort)port_no;
146 0 : }
147 :
148 : /* Resolve domain */
149 :
150 0 : if( FD_UNLIKELY( url->host_len > 255 ) ) {
151 0 : FD_LOG_CRIT(( "Invalid url->host_len" )); /* unreachable */
152 0 : }
153 0 : char host_cstr[ 256 ];
154 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( host_cstr ), url->host, url->host_len ) );
155 0 : }
156 :
157 : void *
158 : fd_event_client_new( void * shmem,
159 : fd_keyguard_client_t * keyguard_client,
160 : fd_rng_t * rng,
161 : fd_circq_t * circq,
162 : int so_sndbuf,
163 : char const * _url,
164 : uchar const * identity_pubkey,
165 : char const * client_version,
166 : ulong instance_id,
167 : ulong boot_id,
168 : ulong machine_id,
169 0 : ulong buf_max ) {
170 0 : if( FD_UNLIKELY( !shmem ) ) {
171 0 : FD_LOG_WARNING(( "NULL shmem" ));
172 0 : return NULL;
173 0 : }
174 :
175 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_event_client_align() ) ) ) {
176 0 : FD_LOG_WARNING(( "misaligned shmem" ));
177 0 : return NULL;
178 0 : }
179 :
180 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
181 0 : fd_event_client_t * client = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_event_client_t), sizeof(fd_event_client_t) );
182 0 : void * grpc_client_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_client_align(), fd_grpc_client_footprint( buf_max ) );
183 :
184 0 : fd_url_t url[1];
185 0 : parse_url(
186 0 : url,
187 0 : _url,
188 0 : strlen( _url ),
189 0 : &client->server_tcp_port );
190 0 : if( FD_UNLIKELY( url->host_len > 255 ) ) {
191 0 : FD_LOG_CRIT(( "Invalid url->host_len" )); /* unreachable */
192 0 : }
193 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->server_fqdn ), url->host, url->host_len ) );
194 0 : client->server_fqdn_len = url->host_len;
195 :
196 0 : fd_memcpy( client->identity_pubkey, identity_pubkey, 32UL );
197 0 : strncpy( client->client_version, client_version, sizeof( client->client_version ) );
198 0 : client->client_version[ sizeof( client->client_version ) - 1UL ] = '\0';
199 :
200 0 : client->event_id = 0UL;
201 :
202 0 : client->instance_id = instance_id;
203 0 : client->boot_id = boot_id;
204 0 : client->machine_id = machine_id;
205 :
206 0 : client->has_genesis_hash = 0;
207 0 : client->has_shred_version = 0;
208 :
209 0 : client->so_sndbuf = so_sndbuf;
210 0 : client->sockfd = -1;
211 0 : client->state = FD_EVENT_CLIENT_STATE_DISCONNECTED;
212 0 : client->disconnected.reconnect_deadline = 0L;
213 :
214 0 : client->defer_disconnect = INT_MAX;
215 0 : client->consecutive_failure_count = 7UL; /* Start high, so if server is down we don't keep retrying on boot */
216 :
217 0 : client->circq = circq;
218 0 : client->rng = rng;
219 0 : client->keyguard_client = keyguard_client;
220 :
221 0 : extern fd_grpc_client_callbacks_t fd_event_client_grpc_callbacks;
222 0 : client->grpc_client = fd_grpc_client_new( grpc_client_mem, &fd_event_client_grpc_callbacks, client->grpc_metrics, client, buf_max, fd_rng_ulong( rng ) );
223 0 : FD_TEST( client->grpc_client );
224 :
225 0 : memset( &client->metrics, 0, sizeof( client->metrics ) );
226 :
227 0 : fd_grpc_client_set_version( client->grpc_client, client->client_version, strlen( client->client_version ) );
228 0 : fd_grpc_client_set_authority( client->grpc_client, client->server_fqdn, client->server_fqdn_len, client->server_tcp_port );
229 :
230 0 : return (void *)client;
231 0 : }
232 :
233 : fd_event_client_t *
234 0 : fd_event_client_join( void * shec ) {
235 0 : if( FD_UNLIKELY( !shec ) ) {
236 0 : FD_LOG_WARNING(( "NULL shec" ));
237 0 : return NULL;
238 0 : }
239 :
240 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shec, fd_event_client_align() ) ) ) {
241 0 : FD_LOG_WARNING(( "misaligned shec" ));
242 0 : return NULL;
243 0 : }
244 :
245 0 : fd_event_client_t * client = (fd_event_client_t *)shec;
246 :
247 0 : return client;
248 0 : }
249 :
250 : fd_event_client_metrics_t const *
251 0 : fd_event_client_metrics( fd_event_client_t const * client ) {
252 : /* Update bytes from grpc metrics */
253 0 : ((fd_event_client_t *)client)->metrics.bytes_written = client->grpc_metrics->stream_chunks_tx_bytes;
254 0 : ((fd_event_client_t *)client)->metrics.bytes_read = client->grpc_metrics->stream_chunks_rx_bytes;
255 0 : return &client->metrics;
256 0 : }
257 :
258 : ulong
259 0 : fd_event_client_state( fd_event_client_t const * client ) {
260 0 : return client->state;
261 0 : }
262 :
263 : ulong
264 0 : fd_event_client_id_reserve( fd_event_client_t * client ) {
265 0 : return client->event_id++;
266 0 : }
267 :
268 : void
269 : fd_event_client_init_genesis_hash( fd_event_client_t * client,
270 0 : uchar const * genesis_hash ) {
271 0 : fd_memcpy( client->genesis_hash, genesis_hash, 32UL );
272 0 : client->has_genesis_hash = 1;
273 0 : }
274 :
275 : void
276 : fd_event_client_init_shred_version( fd_event_client_t * client,
277 0 : ushort shred_version ) {
278 0 : client->shred_version = shred_version;
279 0 : client->has_shred_version = 1;
280 0 : }
281 :
282 : static void
283 0 : backoff( fd_event_client_t * client ) {
284 0 : long now = fd_log_wallclock();
285 0 : ulong backoff_base = 1UL << fd_ulong_min( client->consecutive_failure_count, 7UL ); /* max 4 mins */
286 0 : ulong backoff_jitter = fd_rng_ulong_roll( client->rng, backoff_base );
287 0 : client->disconnected.reconnect_deadline = now + (long)( backoff_base + backoff_jitter )*(long)1e9;
288 0 : client->consecutive_failure_count++;
289 0 : }
290 :
291 : static void
292 : disconnect( fd_event_client_t * client,
293 : int reason,
294 : int err,
295 0 : int _backoff ) {
296 0 : if( FD_LIKELY( -1!=client->sockfd ) ) {
297 0 : if( FD_UNLIKELY( -1==close( client->sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
298 0 : client->sockfd = -1;
299 0 : client->state = FD_EVENT_CLIENT_STATE_DISCONNECTED;
300 0 : fd_circq_reset_cursor( client->circq );
301 0 : }
302 :
303 0 : switch( reason ) {
304 0 : case DISCONNECT_REASON_IDENTITY_CHANGED:
305 0 : FD_LOG_INFO(( "disconnected: identity changed" ));
306 0 : break;
307 0 : case DISCONNECT_REASON_CONNECT_FAILED:
308 0 : FD_LOG_WARNING(( "connecting to " FD_IP4_ADDR_FMT ":%u failed (%i-%s)", FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port, errno, fd_io_strerror( errno ) ));
309 0 : client->metrics.transport_fail_cnt++;
310 0 : break;
311 0 : case DISCONNECT_REASON_DNS_RESOLVE_FAILED:
312 0 : FD_LOG_WARNING(( "failed to resolve host `%.*s` (%d-%s)", (int)client->server_fqdn_len, client->server_fqdn, err, fd_gai_strerror( err ) ));
313 0 : client->metrics.transport_fail_cnt++;
314 0 : break;
315 0 : case DISCONNECT_REASON_TIMEOUT:
316 0 : FD_LOG_INFO(( "connection failed: timeout" ));
317 0 : client->metrics.transport_fail_cnt++;
318 0 : break;
319 0 : case DISCONNECT_REASON_TRANSPORT_FAILED:
320 0 : FD_LOG_WARNING(( "disconnected: transport failed (%d-%s)", err, fd_io_strerror( err ) ));
321 0 : client->metrics.transport_fail_cnt++;
322 0 : break;
323 0 : case DISCONNECT_REASON_PEER_CLOSED:
324 0 : FD_LOG_WARNING(( "disconnected: peer closed connection" ));
325 0 : client->metrics.transport_fail_cnt++;
326 0 : break;
327 0 : case DISCONNECT_REASON_INVALID_CURSOR:
328 0 : FD_LOG_WARNING(( "disconnected: invalid cursor" ));
329 0 : client->metrics.transport_fail_cnt++;
330 0 : break;
331 0 : case DISCONNECT_REASON_AUTH_FAILED:
332 0 : FD_LOG_WARNING(( "disconnected: authentication failed" ));
333 0 : client->metrics.transport_fail_cnt++;
334 0 : break;
335 0 : default:
336 0 : FD_LOG_WARNING(( "disconnected: unknown reason %d", reason ));
337 0 : client->metrics.transport_fail_cnt++;
338 0 : break;
339 0 : }
340 :
341 0 : if( FD_LIKELY( _backoff ) ) backoff( client );
342 0 : }
343 :
344 : void
345 : fd_event_client_set_identity( fd_event_client_t * client,
346 0 : uchar const * identity_pubkey ) {
347 0 : fd_memcpy( client->identity_pubkey, identity_pubkey, 32UL );
348 0 : disconnect( client, DISCONNECT_REASON_IDENTITY_CHANGED, 0, 0 );
349 0 : }
350 :
351 : static void
352 : reconnect( fd_event_client_t * client,
353 0 : int * charge_busy ) {
354 0 : FD_TEST( client->state==FD_EVENT_CLIENT_STATE_DISCONNECTED );
355 :
356 0 : long now = fd_log_wallclock();
357 0 : if( FD_UNLIKELY( now<client->disconnected.reconnect_deadline ) ) return;
358 :
359 0 : *charge_busy = 1;
360 :
361 0 : FD_LOG_INFO(( "connecting to event server http://%.*s:%u", (int)client->server_fqdn_len, client->server_fqdn, client->server_tcp_port ));
362 :
363 : /* FIXME IPv6 support */
364 0 : fd_addrinfo_t hints = {0};
365 0 : hints.ai_family = AF_INET;
366 0 : fd_addrinfo_t * res = NULL;
367 0 : uchar scratch[ 4096 ];
368 0 : void * pscratch = scratch;
369 0 : int err = fd_getaddrinfo( client->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
370 0 : if( FD_UNLIKELY( err ) ) {
371 0 : disconnect( client, DISCONNECT_REASON_DNS_RESOLVE_FAILED, err, 1 );
372 0 : return;
373 0 : }
374 :
375 0 : uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
376 0 : client->server_ip4_addr = ip4_addr;
377 :
378 0 : client->sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
379 0 : if( FD_UNLIKELY( -1==client->sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
380 :
381 0 : struct sockaddr_in addr;
382 0 : fd_memset( &addr, 0, sizeof( addr ) );
383 0 : addr.sin_family = AF_INET;
384 0 : addr.sin_port = fd_ushort_bswap( client->server_tcp_port );
385 0 : addr.sin_addr.s_addr = ip4_addr;
386 :
387 0 : int tcp_nodelay = 1;
388 0 : if( FD_UNLIKELY( -1==setsockopt( client->sockfd, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
389 0 : if( FD_UNLIKELY( -1==setsockopt( client->sockfd, SOL_SOCKET, SO_SNDBUF, &client->so_sndbuf, sizeof(int) ) ) ) FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_SNDBUF,%i) failed (%i-%s)", client->so_sndbuf, errno, fd_io_strerror( errno ) ));
390 :
391 0 : if( FD_UNLIKELY( -1==connect( client->sockfd, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) ) && errno!=EINPROGRESS ) ) {
392 0 : disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, errno, 1 );
393 0 : return;
394 0 : }
395 :
396 0 : fd_grpc_client_reset( client->grpc_client );
397 :
398 0 : client->state = FD_EVENT_CLIENT_STATE_CONNECTING;
399 0 : client->connecting.connect_deadline = now+(long)1L*(long)1e9; /* 1 second to connect */
400 0 : }
401 :
402 : static void
403 0 : fd_event_client_grpc_conn_established( void * app_ctx ) {
404 0 : fd_event_client_t * client = app_ctx;
405 :
406 0 : fd_pb_encoder_t auth_req[1];
407 0 : uchar buffer[ 256UL ];
408 0 : fd_pb_encoder_init( auth_req, buffer, sizeof(buffer) );
409 :
410 0 : fd_pb_push_bytes( auth_req, 1U, client->identity_pubkey, 32UL );
411 0 : fd_pb_push_string( auth_req, 2U, client->client_version, strlen( client->client_version ) );
412 0 : fd_pb_push_bytes( auth_req, 3U, client->genesis_hash, 32UL );
413 0 : fd_pb_push_uint64( auth_req, 4U, client->shred_version );
414 0 : fd_pb_push_uint64( auth_req, 5U, client->instance_id );
415 0 : fd_pb_push_uint64( auth_req, 6U, client->machine_id );
416 0 : fd_pb_push_uint64( auth_req, 7U, client->boot_id );
417 :
418 0 : fd_grpc_h2_stream_t * stream = fd_grpc_client_request_start1(
419 0 : client->grpc_client,
420 0 : "/events.v1.EventService/Authenticate", strlen("/events.v1.EventService/Authenticate"),
421 0 : FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE,
422 0 : buffer, fd_pb_encoder_out_sz( auth_req ),
423 0 : NULL, 0UL,
424 0 : 0 /* not streaming */ );
425 :
426 0 : if( FD_UNLIKELY( !stream ) ) {
427 0 : FD_LOG_WARNING(( "Failed to start Authenticate request" ));
428 0 : return;
429 0 : }
430 :
431 0 : client->state = FD_EVENT_CLIENT_STATE_AUTHENTICATING;
432 0 : FD_LOG_INFO(( "Requesting auth challenge from event server " FD_IP4_ADDR_FMT ":%u (%.*s)",
433 0 : FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port,
434 0 : (int)client->server_fqdn_len, client->server_fqdn ));
435 0 : }
436 :
437 : static void
438 : fd_event_client_handle_auth_challenge_resp( fd_event_client_t * client,
439 : void const * protobuf,
440 0 : ulong protobuf_sz ) {
441 0 : fd_pb_inbuf_t inbuf[1];
442 0 : fd_pb_inbuf_init( inbuf, protobuf, protobuf_sz );
443 :
444 0 : if( FD_UNLIKELY( protobuf_sz==0UL ) ) {
445 0 : FD_LOG_WARNING(( "Empty auth challenge response" ));
446 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
447 0 : return;
448 0 : }
449 :
450 0 : fd_pb_tlv_t challenge_tlv;
451 0 : if( FD_UNLIKELY( !fd_pb_read_tlv( inbuf, &challenge_tlv ) ) ) {
452 0 : FD_LOG_WARNING(( "Failed to parse auth challenge response" ));
453 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
454 0 : return;
455 0 : }
456 :
457 0 : if( FD_UNLIKELY( challenge_tlv.field_id!=1U || challenge_tlv.wire_type!=FD_PB_WIRE_TYPE_LEN ) ) {
458 0 : FD_LOG_WARNING(( "Unexpected field in auth challenge response" ));
459 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
460 0 : return;
461 0 : }
462 :
463 0 : ulong challenge_len = challenge_tlv.len;
464 0 : if( FD_UNLIKELY( challenge_len!=32UL ) ) {
465 0 : FD_LOG_WARNING(( "Invalid challenge size: %lu bytes", challenge_len ));
466 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
467 0 : return;
468 0 : }
469 :
470 0 : if( FD_UNLIKELY( fd_pb_inbuf_sz( inbuf )<challenge_len ) ) {
471 0 : FD_LOG_WARNING(( "Truncated auth challenge response" ));
472 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
473 0 : return;
474 0 : }
475 :
476 0 : uchar signed_challenge[ 64UL ];
477 0 : fd_keyguard_client_sign( client->keyguard_client,
478 0 : signed_challenge,
479 0 : inbuf->cur,
480 0 : 32UL,
481 0 : FD_KEYGUARD_SIGN_TYPE_FD_EVENTS_AUTH_CONCAT_ED25519 );
482 :
483 0 : fd_pb_encoder_t confirm_req[1];
484 0 : uchar buffer[ 128UL ];
485 0 : fd_pb_encoder_init( confirm_req, buffer, sizeof(buffer) );
486 0 : fd_pb_push_bytes( confirm_req, 1U, signed_challenge, 64UL );
487 :
488 0 : fd_grpc_h2_stream_t * stream = fd_grpc_client_request_start1(
489 0 : client->grpc_client,
490 0 : "/events.v1.EventService/ConfirmAuthChallenge", strlen("/events.v1.EventService/ConfirmAuthChallenge"),
491 0 : FD_EVENT_CLIENT_REQ_CTX_CONFIRM_AUTH,
492 0 : buffer, fd_pb_encoder_out_sz( confirm_req ),
493 0 : NULL, 0UL,
494 0 : 0 /* not streaming */ );
495 :
496 0 : if( FD_UNLIKELY( !stream ) ) {
497 0 : FD_LOG_WARNING(( "Failed to start ConfirmAuthChallenge request" ));
498 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
499 0 : return;
500 0 : }
501 :
502 0 : client->state = FD_EVENT_CLIENT_STATE_CONFIRMING_AUTH;
503 0 : FD_LOG_DEBUG(( "Sent signed auth challenge" ));
504 0 : }
505 :
506 : static void
507 : fd_event_client_handle_confirm_auth_resp( fd_event_client_t * client,
508 : void const * protobuf,
509 0 : ulong protobuf_sz ) {
510 0 : (void)protobuf;
511 0 : (void)protobuf_sz;
512 :
513 0 : client->event_stream = NULL;
514 0 : client->metrics.transport_success_cnt++;
515 0 : client->state = FD_EVENT_CLIENT_STATE_CONNECTED;
516 0 : client->connected.connected_timestamp = fd_log_wallclock();
517 0 : FD_LOG_NOTICE(( "connected to event server " FD_IP4_ADDR_FMT ":%u (%.*s)",
518 0 : FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port,
519 0 : (int)client->server_fqdn_len, client->server_fqdn ));
520 0 : }
521 :
522 : static void
523 : fd_event_client_grpc_conn_dead( void * app_ctx,
524 : uint h2_err,
525 0 : int closed_by ) {
526 0 : fd_event_client_t * client = app_ctx;
527 0 : FD_LOG_WARNING(( "Event gRPC connection closed %s (%u-%s)",
528 0 : closed_by ? "by peer" : "due to error",
529 0 : h2_err, fd_h2_strerror( h2_err ) ));
530 0 : disconnect( client, DISCONNECT_REASON_PEER_CLOSED, 0, 1 );
531 0 : }
532 :
533 : static void
534 : fd_event_client_grpc_tx_complete( void * app_ctx,
535 0 : ulong request_ctx ) {
536 0 : (void)app_ctx; (void)request_ctx;
537 0 : }
538 :
539 : void
540 : fd_event_client_grpc_rx_start( void * app_ctx,
541 0 : ulong request_ctx ) {
542 0 : (void)app_ctx; (void)request_ctx;
543 0 : }
544 :
545 : static void
546 : fd_event_client_handle_stream_events_resp( fd_event_client_t * client,
547 : void const * protobuf,
548 0 : ulong protobuf_sz ) {
549 0 : fd_pb_inbuf_t inbuf[1];
550 0 : fd_pb_inbuf_init( inbuf, protobuf, protobuf_sz );
551 :
552 0 : ulong nonce_ack;
553 0 : if( FD_UNLIKELY( protobuf_sz==0UL ) ) {
554 0 : nonce_ack = 0UL;
555 0 : } else {
556 0 : fd_pb_tlv_t event_id;
557 0 : FD_TEST( fd_pb_read_tlv( inbuf, &event_id ) );
558 0 : FD_TEST( event_id.field_id==1U ); /* event_id */
559 0 : FD_TEST( event_id.wire_type==FD_PB_WIRE_TYPE_VARINT );
560 0 : nonce_ack = event_id.varint;
561 0 : }
562 :
563 0 : client->metrics.events_acked++;
564 0 : if( FD_UNLIKELY( nonce_ack==ULONG_MAX ) ) return;
565 :
566 0 : int err = fd_circq_pop_until( client->circq, nonce_ack );
567 0 : if( FD_UNLIKELY( -1==err ) ) {
568 0 : FD_LOG_WARNING(( "Event gRPC rx msg: invalid cursor ack %lu", nonce_ack ));
569 0 : client->defer_disconnect = DISCONNECT_REASON_INVALID_CURSOR;
570 0 : }
571 0 : }
572 :
573 : void
574 : fd_event_client_grpc_rx_msg( void * app_ctx,
575 : void const * protobuf,
576 : ulong protobuf_sz,
577 0 : ulong request_ctx ) {
578 0 : fd_event_client_t * client = app_ctx;
579 :
580 0 : switch( request_ctx ) {
581 0 : case FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE:
582 0 : fd_event_client_handle_auth_challenge_resp( client, protobuf, protobuf_sz );
583 0 : break;
584 0 : case FD_EVENT_CLIENT_REQ_CTX_CONFIRM_AUTH:
585 0 : fd_event_client_handle_confirm_auth_resp( client, protobuf, protobuf_sz );
586 0 : break;
587 0 : case FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS:
588 0 : fd_event_client_handle_stream_events_resp( client, protobuf, protobuf_sz );
589 0 : break;
590 0 : default:
591 0 : FD_LOG_WARNING(( "Unknown request_ctx: %lu", request_ctx ));
592 0 : break;
593 0 : }
594 0 : }
595 :
596 : void
597 : fd_event_client_grpc_rx_end( void * app_ctx,
598 : ulong request_ctx,
599 0 : fd_grpc_resp_hdrs_t * resp ) {
600 0 : fd_event_client_t * client = app_ctx;
601 :
602 0 : if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
603 0 : FD_LOG_WARNING(( "Event gRPC request failed (HTTP status %u)", resp->h2_status ));
604 0 : client->defer_disconnect = DISCONNECT_REASON_TRANSPORT_FAILED;
605 0 : return;
606 0 : }
607 :
608 0 : resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
609 0 : if( !resp->grpc_msg_len ) {
610 0 : fd_memcpy( resp->grpc_msg, "unknown error", 13 );
611 0 : resp->grpc_msg_len = 13;
612 0 : }
613 :
614 0 : if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
615 0 : switch( request_ctx ) {
616 0 : case FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE:
617 0 : case FD_EVENT_CLIENT_REQ_CTX_CONFIRM_AUTH:
618 0 : FD_LOG_WARNING(( "Event authentication failed (gRPC status %u-%s): %.*s",
619 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
620 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
621 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
622 0 : return;
623 0 : case FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS:
624 0 : FD_LOG_WARNING(( "Event stream failed (gRPC status %u-%s): %.*s",
625 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
626 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
627 0 : client->defer_disconnect = DISCONNECT_REASON_PEER_CLOSED;
628 0 : return;
629 0 : default:
630 0 : FD_LOG_WARNING(( "Event gRPC request failed (gRPC status %u-%s): %.*s",
631 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
632 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
633 0 : client->defer_disconnect = DISCONNECT_REASON_TRANSPORT_FAILED;
634 0 : return;
635 0 : }
636 0 : }
637 :
638 0 : if( request_ctx==FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS ) {
639 0 : FD_LOG_INFO(( "Event gRPC stream ended gracefully" ));
640 0 : client->defer_disconnect = DISCONNECT_REASON_PEER_CLOSED;
641 0 : }
642 0 : }
643 :
644 : void
645 : fd_event_client_grpc_rx_timeout( void * app_ctx,
646 : ulong request_ctx,
647 0 : int deadline_kind ) {
648 0 : (void)app_ctx; (void)request_ctx; (void)deadline_kind;
649 0 : FD_LOG_WARNING(( "Event gRPC rx timeout" ));
650 0 : }
651 :
/* fd_event_client_grpc_ping_ack is the gRPC ping_ack callback.  It
   only logs a warning; app_ctx is ignored. */
static void
fd_event_client_grpc_ping_ack( void * app_ctx ) {
  (void)app_ctx;

  FD_LOG_WARNING(( "Event gRPC ping ack" ));
}
657 :
658 : static void
659 : tx( fd_event_client_t * client,
660 0 : int * charge_busy ) {
661 0 : FD_TEST( client->state==FD_EVENT_CLIENT_STATE_CONNECTED );
662 :
663 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client->grpc_client ) ) ) return;
664 0 : if( FD_UNLIKELY( client->event_stream && client->grpc_client->request_stream != NULL && client->grpc_client->request_stream!=client->event_stream ) ) return;
665 :
666 0 : ulong msg_sz;
667 0 : uchar const * msg = fd_circq_cursor_advance( client->circq, &msg_sz );
668 0 : if( FD_LIKELY( !msg ) ) return;
669 :
670 0 : if( FD_UNLIKELY( !client->event_stream ) ) {
671 0 : client->event_stream = fd_grpc_client_request_start1(
672 0 : client->grpc_client,
673 0 : "/events.v1.EventService/StreamEvents", strlen("/events.v1.EventService/StreamEvents"),
674 0 : FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS,
675 0 : msg, msg_sz,
676 0 : NULL, 0UL,
677 0 : 1 /* streaming */ );
678 0 : if( FD_UNLIKELY( !client->event_stream ) ) return; /* Only reason for failure is too big message, so just skip it */
679 0 : } else {
680 0 : int result = fd_grpc_client_stream_send_msg1( client->grpc_client, client->event_stream, msg, msg_sz );
681 0 : if( FD_UNLIKELY( !result ) ) return; /* Only reason for failure is too big message, so just skip it */
682 0 : }
683 :
684 0 : client->metrics.events_sent++;
685 0 : *charge_busy = 1;
686 0 : }
687 :
688 : void
689 : fd_event_client_poll( fd_event_client_t * client,
690 0 : int * charge_busy ) {
691 0 : if( FD_UNLIKELY( !client->has_genesis_hash || !client->has_shred_version ) ) return;
692 :
693 0 : long now = fd_log_wallclock();
694 :
695 0 : if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_DISCONNECTED ) ) reconnect( client, charge_busy );
696 0 : if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_CONNECTING ) ) {
697 0 : if( FD_UNLIKELY( now>client->connecting.connect_deadline ) ) {
698 0 : disconnect( client, DISCONNECT_REASON_TIMEOUT, 0, 1 );
699 0 : return;
700 0 : }
701 0 : }
702 0 : if( FD_LIKELY( client->state!=FD_EVENT_CLIENT_STATE_DISCONNECTED ) ) {
703 0 : if( FD_UNLIKELY( -1==fd_grpc_client_rxtx_socket( client->grpc_client, client->sockfd, charge_busy ) ) ) {
704 0 : disconnect( client, DISCONNECT_REASON_TRANSPORT_FAILED, errno, 1 );
705 0 : return;
706 0 : }
707 0 : }
708 :
709 0 : if( FD_UNLIKELY( client->defer_disconnect!=INT_MAX ) ) {
710 0 : int reason = client->defer_disconnect;
711 0 : client->defer_disconnect = INT_MAX;
712 0 : disconnect( client, reason, 0, 1 );
713 0 : return;
714 0 : }
715 :
716 0 : if( FD_LIKELY( client->state==FD_EVENT_CLIENT_STATE_CONNECTED ) ) {
717 0 : if( FD_UNLIKELY( client->consecutive_failure_count && (now-client->connected.connected_timestamp>10L*(long)1e9 ) ) ) client->consecutive_failure_count = 0UL;
718 0 : tx( client, charge_busy );
719 0 : }
720 0 : }
721 :
/* Callback table handed to fd_grpc_client_new by fd_event_client_new
   (which refers to it through an extern declaration).  The app_ctx
   passed to each callback is the fd_event_client_t. */
fd_grpc_client_callbacks_t fd_event_client_grpc_callbacks = {
  .conn_established = fd_event_client_grpc_conn_established,
  .conn_dead = fd_event_client_grpc_conn_dead,
  .tx_complete = fd_event_client_grpc_tx_complete,
  .rx_start = fd_event_client_grpc_rx_start,
  .rx_msg = fd_event_client_grpc_rx_msg,
  .rx_end = fd_event_client_grpc_rx_end,
  .rx_timeout = fd_event_client_grpc_rx_timeout,
  .ping_ack = fd_event_client_grpc_ping_ack,
};
|