Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_event_client.h"
3 :
4 : #include "../../waltz/resolv/fd_netdb.h"
5 : #include "../../waltz/http/fd_url.h"
6 : #include "../../waltz/grpc/fd_grpc_client.h"
7 : #include "../../waltz/grpc/fd_grpc_client_private.h"
8 : #include "../../ballet/pb/fd_pb_tokenize.h"
9 : #include "../../ballet/pb/fd_pb_encode.h"
10 : #include "../../ballet/hex/fd_hex.h"
11 : #include "../../util/net/fd_ip4.h"
12 : #include "../../util/log/fd_log.h"
13 : #include "../keyguard/fd_keyguard.h"
14 :
15 : #if FD_HAS_OPENSSL
16 : #include "../../waltz/openssl/fd_openssl.h"
17 : #include <openssl/ssl.h>
18 : #include <openssl/err.h>
19 : #endif
20 :
21 : #include <netinet/tcp.h>
22 : #include <unistd.h>
23 : #include <errno.h>
24 : #include <sys/socket.h>
25 : #include <netinet/in.h>
26 :
27 0 : #define DISCONNECT_REASON_IDENTITY_CHANGED (0)
28 0 : #define DISCONNECT_REASON_CONNECT_FAILED (1)
29 0 : #define DISCONNECT_REASON_DNS_RESOLVE_FAILED (2)
30 0 : #define DISCONNECT_REASON_TIMEOUT (3)
31 0 : #define DISCONNECT_REASON_TRANSPORT_FAILED (4)
32 0 : #define DISCONNECT_REASON_PEER_CLOSED (5)
33 0 : #define DISCONNECT_REASON_INVALID_CURSOR (6)
34 0 : #define DISCONNECT_REASON_AUTH_FAILED (7)
35 0 : #define DISCONNECT_REASON_INVALID_PROTOBUF (8)
36 :
37 0 : #define FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE (1UL)
38 : #define FD_EVENT_CLIENT_REQ_CTX_CONFIRM_AUTH (2UL)
39 0 : #define FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS (3UL)
40 :
41 0 : #define FD_EVENT_CLIENT_TOKEN_SZ (217UL)
42 :
43 : struct fd_event_client {
44 : fd_grpc_client_t * grpc_client;
45 : fd_grpc_client_metrics_t grpc_metrics[1];
46 : fd_grpc_h2_stream_t * event_stream;
47 :
48 : char client_version[ 10UL ];
49 : char commit_hash[ 41UL ];
50 : char action[ 16UL ];
51 : uchar identity_pubkey[ 32UL ];
52 :
53 : int has_genesis_hash;
54 : fd_hash_t genesis_hash[1];
55 :
56 : ushort has_shred_version;
57 : ushort shred_version;
58 :
59 : ulong event_id;
60 :
61 : ulong instance_id;
62 : ulong boot_id;
63 : ulong machine_id;
64 :
65 : int defer_disconnect;
66 : ulong consecutive_failure_count;
67 :
68 : int auth_send_pending;
69 :
70 : ulong state;
71 : union {
72 : struct {
73 : long reconnect_deadline;
74 : } disconnected;
75 :
76 : struct {
77 : long connect_deadline;
78 : } connecting;
79 :
80 : struct {
81 : long connected_timestamp;
82 : } connected;
83 : };
84 :
85 : int so_sndbuf;
86 : int sockfd;
87 :
88 : int use_tls;
89 : #if FD_HAS_OPENSSL
90 : SSL_CTX * ssl_ctx;
91 : SSL * ssl;
92 : #endif
93 :
94 : /* wallclock deadline for auth handshake, LONG_MAX if not
95 : authenticating. */
96 : long auth_deadline;
97 :
98 : char server_fqdn[ 256 ]; /* cstr */
99 : ulong server_fqdn_len;
100 : uint server_ip4_addr;
101 : ushort server_tcp_port;
102 :
103 : fd_rng_t * rng;
104 : fd_circq_t * circq;
105 : fd_keyguard_client_t * keyguard_client;
106 :
107 : /* Stateless-auth bearer value: "hex(challenge_token).hex(signature)",
108 : built from the challenge token returned by Authenticate and our
109 : ed25519 signature over it. Presented as the `authorization: Bearer
110 : <...>` header on the StreamEvents request. */
111 : char auth_bearer[ 2UL*FD_EVENT_CLIENT_TOKEN_SZ + 1UL + 2UL*64UL + 1UL ];
112 : ulong auth_bearer_len;
113 :
114 : fd_event_client_metrics_t metrics;
115 : };
116 :
117 : FD_FN_CONST ulong
118 0 : fd_event_client_align( void ) {
119 0 : return alignof( fd_event_client_t );
120 0 : }
121 :
122 : FD_FN_CONST ulong
123 0 : fd_event_client_footprint( ulong buf_max ) {
124 0 : ulong l;
125 0 : l = FD_LAYOUT_INIT;
126 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_event_client_t), sizeof(fd_event_client_t) );
127 0 : l = FD_LAYOUT_APPEND( l, fd_grpc_client_align(), fd_grpc_client_footprint( buf_max ) );
128 0 : return FD_LAYOUT_FINI( l, alignof(fd_event_client_t) );
129 0 : }
130 :
131 : void *
132 : fd_event_client_new( void * shmem,
133 : fd_keyguard_client_t * keyguard_client,
134 : fd_rng_t * rng,
135 : fd_circq_t * circq,
136 : int so_sndbuf,
137 : char const * _url,
138 : uchar const * identity_pubkey,
139 : char const * client_version,
140 : char const * commit_hash,
141 : char const * action,
142 : ulong instance_id,
143 : ulong boot_id,
144 : ulong machine_id,
145 : ulong buf_max,
146 : int use_tls,
147 0 : void * ssl_ctx ) {
148 0 : if( FD_UNLIKELY( !shmem ) ) {
149 0 : FD_LOG_WARNING(( "NULL shmem" ));
150 0 : return NULL;
151 0 : }
152 :
153 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_event_client_align() ) ) ) {
154 0 : FD_LOG_WARNING(( "misaligned shmem" ));
155 0 : return NULL;
156 0 : }
157 :
158 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
159 0 : fd_event_client_t * client = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_event_client_t), sizeof(fd_event_client_t) );
160 0 : void * grpc_client_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_client_align(), fd_grpc_client_footprint( buf_max ) );
161 :
162 0 : fd_url_t url[1];
163 0 : _Bool _is_ssl = 0;
164 0 : if( FD_UNLIKELY( fd_url_parse_endpoint( url,
165 0 : _url,
166 0 : strlen( _url ),
167 0 : &client->server_tcp_port,
168 0 : &_is_ssl,
169 0 : "[tiles.event.url]" ) ) ) {
170 0 : FD_LOG_ERR(( "Could not parse [tiles.event.url]" ));
171 0 : }
172 0 : if( FD_UNLIKELY( url->host_len > 255 ) ) {
173 0 : FD_LOG_CRIT(( "Invalid url->host_len" )); /* unreachable */
174 0 : }
175 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->server_fqdn ), url->host, url->host_len ) );
176 0 : client->server_fqdn_len = url->host_len;
177 :
178 0 : fd_memcpy( client->identity_pubkey, identity_pubkey, 32UL );
179 0 : strncpy( client->client_version, client_version, sizeof( client->client_version ) );
180 0 : client->client_version[ sizeof( client->client_version ) - 1UL ] = '\0';
181 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->commit_hash ), commit_hash, fd_ulong_min( strlen( commit_hash ), sizeof( client->commit_hash )-1UL ) ) );
182 0 : fd_cstr_ncpy( client->action, action, sizeof( client->action ) );
183 :
184 0 : client->event_id = 0UL;
185 :
186 0 : client->instance_id = instance_id;
187 0 : client->boot_id = boot_id;
188 0 : client->machine_id = machine_id;
189 :
190 0 : client->has_genesis_hash = 0;
191 0 : client->has_shred_version = 0;
192 :
193 0 : client->so_sndbuf = so_sndbuf;
194 0 : client->sockfd = -1;
195 0 : client->use_tls = use_tls;
196 0 : #if FD_HAS_OPENSSL
197 0 : client->ssl_ctx = (SSL_CTX *)ssl_ctx;
198 0 : client->ssl = NULL;
199 : #else
200 : (void)ssl_ctx;
201 : if( FD_UNLIKELY( use_tls ) ) {
202 : FD_LOG_ERR(( "TLS requested for event service but this build does not include OpenSSL. "
203 : "To install OpenSSL, re-run ./deps.sh and do a clean rebuild." ));
204 : }
205 : #endif
206 0 : client->auth_deadline = LONG_MAX;
207 0 : client->auth_send_pending = 0;
208 0 : client->state = FD_EVENT_CLIENT_STATE_DISCONNECTED;
209 0 : client->disconnected.reconnect_deadline = 0L;
210 :
211 0 : client->defer_disconnect = INT_MAX;
212 0 : client->consecutive_failure_count = 7UL; /* Start high, so if server is down we don't keep retrying on boot */
213 :
214 0 : client->circq = circq;
215 0 : client->rng = rng;
216 0 : client->keyguard_client = keyguard_client;
217 :
218 0 : extern fd_grpc_client_callbacks_t fd_event_client_grpc_callbacks;
219 0 : client->grpc_client = fd_grpc_client_new( grpc_client_mem, &fd_event_client_grpc_callbacks, client->grpc_metrics, client, buf_max, fd_rng_ulong( rng ) );
220 0 : FD_TEST( client->grpc_client );
221 :
222 0 : memset( &client->metrics, 0, sizeof(client->metrics) );
223 0 : memset( client->grpc_metrics, 0, sizeof(fd_grpc_client_metrics_t) );
224 :
225 0 : fd_grpc_client_set_version( client->grpc_client, client->client_version, strlen( client->client_version ) );
226 0 : fd_grpc_client_set_authority( client->grpc_client, client->server_fqdn, client->server_fqdn_len, client->server_tcp_port );
227 :
228 0 : return (void *)client;
229 0 : }
230 :
231 : fd_event_client_t *
232 0 : fd_event_client_join( void * shec ) {
233 0 : if( FD_UNLIKELY( !shec ) ) {
234 0 : FD_LOG_WARNING(( "NULL shec" ));
235 0 : return NULL;
236 0 : }
237 :
238 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shec, fd_event_client_align() ) ) ) {
239 0 : FD_LOG_WARNING(( "misaligned shec" ));
240 0 : return NULL;
241 0 : }
242 :
243 0 : fd_event_client_t * client = (fd_event_client_t *)shec;
244 :
245 0 : return client;
246 0 : }
247 :
248 : fd_event_client_metrics_t const *
249 0 : fd_event_client_metrics( fd_event_client_t const * client ) {
250 : /* Update bytes from grpc metrics */
251 0 : ((fd_event_client_t *)client)->metrics.bytes_written = client->grpc_metrics->stream_chunks_tx_bytes;
252 0 : ((fd_event_client_t *)client)->metrics.bytes_read = client->grpc_metrics->stream_chunks_rx_bytes;
253 0 : return &client->metrics;
254 0 : }
255 :
256 : ulong
257 0 : fd_event_client_state( fd_event_client_t const * client ) {
258 0 : return client->state;
259 0 : }
260 :
261 : ulong
262 0 : fd_event_client_id_reserve( fd_event_client_t * client ) {
263 0 : return client->event_id++;
264 0 : }
265 :
266 : void
267 : fd_event_client_init_genesis( fd_event_client_t * client,
268 0 : fd_genesis_meta_t const * meta ) {
269 0 : *client->genesis_hash = meta->genesis_hash;
270 0 : client->has_genesis_hash = 1;
271 0 : }
272 :
273 : void
274 : fd_event_client_init_shred_version( fd_event_client_t * client,
275 0 : ushort shred_version ) {
276 0 : client->shred_version = shred_version;
277 0 : client->has_shred_version = 1;
278 0 : }
279 :
280 : static void
281 0 : backoff( fd_event_client_t * client ) {
282 0 : long now = fd_log_wallclock();
283 0 : ulong backoff_base = 1UL << fd_ulong_min( client->consecutive_failure_count, 7UL ); /* max 4 mins */
284 0 : ulong backoff_jitter = fd_rng_ulong_roll( client->rng, backoff_base );
285 0 : client->disconnected.reconnect_deadline = now + (long)( backoff_base + backoff_jitter )*(long)1e9;
286 0 : if( FD_UNLIKELY( client->consecutive_failure_count < 8UL ) ) client->consecutive_failure_count++;
287 0 : }
288 :
289 : static void
290 : disconnect( fd_event_client_t * client,
291 : int reason,
292 : int err,
293 0 : int _backoff ) {
294 0 : #if FD_HAS_OPENSSL
295 0 : if( FD_UNLIKELY( client->ssl ) ) {
296 0 : SSL_free( client->ssl );
297 0 : client->ssl = NULL;
298 0 : }
299 0 : #endif
300 0 : if( FD_LIKELY( -1!=client->sockfd ) ) {
301 0 : if( FD_UNLIKELY( -1==close( client->sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
302 0 : client->sockfd = -1;
303 0 : client->state = FD_EVENT_CLIENT_STATE_DISCONNECTED;
304 0 : fd_circq_reset_cursor( client->circq );
305 0 : }
306 :
307 0 : client->event_stream = NULL;
308 0 : client->auth_deadline = LONG_MAX;
309 0 : client->auth_send_pending = 0;
310 :
311 0 : client->auth_bearer[ 0 ] = '\0';
312 0 : client->auth_bearer_len = 0UL;
313 :
314 0 : switch( reason ) {
315 0 : case DISCONNECT_REASON_IDENTITY_CHANGED:
316 0 : FD_LOG_INFO(( "disconnected: identity changed" ));
317 0 : break;
318 0 : case DISCONNECT_REASON_CONNECT_FAILED:
319 0 : FD_LOG_WARNING(( "connecting to " FD_IP4_ADDR_FMT ":%u failed (%i-%s)", FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port, errno, fd_io_strerror( errno ) ));
320 0 : client->metrics.transport_fail_cnt++;
321 0 : break;
322 0 : case DISCONNECT_REASON_DNS_RESOLVE_FAILED:
323 0 : FD_LOG_WARNING(( "failed to resolve host `%.*s` (%d-%s)", (int)client->server_fqdn_len, client->server_fqdn, err, fd_gai_strerror( err ) ));
324 0 : client->metrics.transport_fail_cnt++;
325 0 : break;
326 0 : case DISCONNECT_REASON_TIMEOUT:
327 0 : FD_LOG_INFO(( "connection failed: timeout" ));
328 0 : client->metrics.transport_fail_cnt++;
329 0 : break;
330 0 : case DISCONNECT_REASON_TRANSPORT_FAILED:
331 0 : FD_LOG_WARNING(( "disconnected: transport failed (%d-%s)", err, fd_io_strerror( err ) ));
332 0 : client->metrics.transport_fail_cnt++;
333 0 : break;
334 0 : case DISCONNECT_REASON_PEER_CLOSED:
335 0 : FD_LOG_WARNING(( "disconnected: peer closed connection" ));
336 0 : client->metrics.transport_fail_cnt++;
337 0 : break;
338 0 : case DISCONNECT_REASON_INVALID_CURSOR:
339 0 : FD_LOG_WARNING(( "disconnected: invalid cursor" ));
340 0 : client->metrics.transport_fail_cnt++;
341 0 : break;
342 0 : case DISCONNECT_REASON_AUTH_FAILED:
343 0 : FD_LOG_WARNING(( "disconnected: authentication failed" ));
344 0 : client->metrics.transport_fail_cnt++;
345 0 : break;
346 0 : case DISCONNECT_REASON_INVALID_PROTOBUF:
347 0 : FD_LOG_WARNING(( "disconnected: invalid protobuf message received" ));
348 0 : client->metrics.transport_fail_cnt++;
349 0 : break;
350 0 : default:
351 0 : FD_LOG_WARNING(( "disconnected: unknown reason %d", reason ));
352 0 : client->metrics.transport_fail_cnt++;
353 0 : break;
354 0 : }
355 :
356 0 : if( FD_LIKELY( _backoff ) ) backoff( client );
357 0 : }
358 :
359 : void
360 : fd_event_client_set_identity( fd_event_client_t * client,
361 0 : uchar const * identity_pubkey ) {
362 0 : fd_memcpy( client->identity_pubkey, identity_pubkey, 32UL );
363 0 : disconnect( client, DISCONNECT_REASON_IDENTITY_CHANGED, 0, 0 );
364 0 : }
365 :
366 : static void
367 : reconnect( fd_event_client_t * client,
368 0 : int * charge_busy ) {
369 0 : FD_TEST( client->state==FD_EVENT_CLIENT_STATE_DISCONNECTED );
370 :
371 0 : long now = fd_log_wallclock();
372 0 : if( FD_UNLIKELY( now<client->disconnected.reconnect_deadline ) ) return;
373 :
374 0 : *charge_busy = 1;
375 0 : client->metrics.connect_attempt_cnt++;
376 :
377 0 : FD_LOG_INFO(( "connecting to event server %s://%.*s:%u", client->use_tls ? "https" : "http", (int)client->server_fqdn_len, client->server_fqdn, client->server_tcp_port ));
378 :
379 : /* FIXME IPv6 support */
380 0 : fd_addrinfo_t hints = {0};
381 0 : hints.ai_family = AF_INET;
382 0 : fd_addrinfo_t * res = NULL;
383 0 : uchar scratch[ 4096 ];
384 0 : void * pscratch = scratch;
385 0 : int err = fd_getaddrinfo( client->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
386 0 : if( FD_UNLIKELY( err ) ) {
387 0 : disconnect( client, DISCONNECT_REASON_DNS_RESOLVE_FAILED, err, 1 );
388 0 : return;
389 0 : }
390 :
391 0 : if( FD_UNLIKELY( !res || !res->ai_addr ) ) {
392 0 : disconnect( client, DISCONNECT_REASON_DNS_RESOLVE_FAILED, 0, 1 );
393 0 : return;
394 0 : }
395 :
396 0 : uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
397 0 : client->server_ip4_addr = ip4_addr;
398 :
399 0 : client->sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
400 0 : if( FD_UNLIKELY( -1==client->sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
401 :
402 0 : struct sockaddr_in addr;
403 0 : fd_memset( &addr, 0, sizeof( addr ) );
404 0 : addr.sin_family = AF_INET;
405 0 : addr.sin_port = fd_ushort_bswap( client->server_tcp_port );
406 0 : addr.sin_addr.s_addr = ip4_addr;
407 :
408 0 : int tcp_nodelay = 1;
409 0 : if( FD_UNLIKELY( -1==setsockopt( client->sockfd, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
410 0 : if( FD_UNLIKELY( -1==setsockopt( client->sockfd, SOL_SOCKET, SO_SNDBUF, &client->so_sndbuf, sizeof(int) ) ) ) FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_SNDBUF,%i) failed (%i-%s)", client->so_sndbuf, errno, fd_io_strerror( errno ) ));
411 :
412 0 : if( FD_UNLIKELY( -1==connect( client->sockfd, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) ) && errno!=EINPROGRESS ) ) {
413 0 : disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, errno, 1 );
414 0 : return;
415 0 : }
416 :
417 0 : # if FD_HAS_OPENSSL
418 0 : if( client->use_tls ) {
419 0 : BIO * bio = fd_openssl_bio_new_socket( client->sockfd, BIO_NOCLOSE );
420 0 : if( FD_UNLIKELY( !bio ) ) {
421 0 : FD_LOG_WARNING(( "fd_openssl_bio_new_socket failed" ));
422 0 : disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, 0, 1 );
423 0 : return;
424 0 : }
425 :
426 0 : SSL * ssl = SSL_new( client->ssl_ctx );
427 0 : if( FD_UNLIKELY( !ssl ) ) {
428 0 : FD_LOG_WARNING(( "SSL_new failed" ));
429 0 : BIO_free( bio );
430 0 : disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, 0, 1 );
431 0 : return;
432 0 : }
433 :
434 0 : SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
435 0 : SSL_set_connect_state( ssl );
436 :
437 : /* SNI and hostname verification */
438 0 : if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, client->server_fqdn ) ) ) {
439 0 : FD_LOG_WARNING(( "SSL_set_tlsext_host_name failed" ));
440 0 : SSL_free( ssl );
441 0 : disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, 0, 1 );
442 0 : return;
443 0 : }
444 0 : if( FD_UNLIKELY( !SSL_set1_host( ssl, client->server_fqdn ) ) ) {
445 0 : FD_LOG_WARNING(( "SSL_set1_host failed" ));
446 0 : SSL_free( ssl );
447 0 : disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, 0, 1 );
448 0 : return;
449 0 : }
450 :
451 0 : client->ssl = ssl;
452 0 : }
453 0 : # endif /* FD_HAS_OPENSSL */
454 :
455 0 : fd_grpc_client_reset( client->grpc_client );
456 :
457 0 : client->state = FD_EVENT_CLIENT_STATE_CONNECTING;
458 0 : client->connecting.connect_deadline = now+(long)1L*(long)1e9; /* 1 second to connect */
459 0 : }
460 :
461 : static int
462 0 : fd_event_client_try_send_authenticate( fd_event_client_t * client ) {
463 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client->grpc_client ) ) ) return 0;
464 0 : if( FD_UNLIKELY( fd_grpc_client_request_stream_busy( client->grpc_client ) ) ) return 0;
465 :
466 0 : fd_pb_encoder_t auth_req[1];
467 0 : uchar buffer[ 256UL ];
468 0 : fd_pb_encoder_init( auth_req, buffer, sizeof(buffer) );
469 :
470 0 : fd_pb_push_bytes( auth_req, 1U, client->identity_pubkey, 32UL );
471 0 : fd_pb_push_string( auth_req, 2U, client->client_version, strlen( client->client_version ) );
472 0 : fd_pb_push_string( auth_req, 3U, client->commit_hash, strlen( client->commit_hash ) );
473 0 : fd_pb_push_bytes( auth_req, 4U, client->genesis_hash, 32UL );
474 0 : fd_pb_push_uint64( auth_req, 5U, client->shred_version );
475 0 : fd_pb_push_uint64( auth_req, 6U, client->instance_id );
476 0 : fd_pb_push_uint64( auth_req, 7U, client->machine_id );
477 0 : fd_pb_push_uint64( auth_req, 8U, client->boot_id );
478 0 : fd_pb_push_string( auth_req, 9U, client->action, strlen( client->action ) );
479 :
480 0 : fd_grpc_h2_stream_t * stream = fd_grpc_client_request_start1(
481 0 : client->grpc_client,
482 0 : "/events.v1.EventService/Authenticate", strlen("/events.v1.EventService/Authenticate"),
483 0 : FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE,
484 0 : buffer, fd_pb_encoder_out_sz( auth_req ),
485 0 : NULL, 0UL,
486 0 : 0 /* not streaming */ );
487 :
488 0 : if( FD_UNLIKELY( !stream ) ) return 0;
489 :
490 0 : long now = fd_log_wallclock();
491 0 : fd_grpc_client_deadline_set( stream, FD_GRPC_DEADLINE_HEADER, now+(long)2e9 );
492 0 : fd_grpc_client_deadline_set( stream, FD_GRPC_DEADLINE_RX_END, now+(long)2e9 );
493 :
494 0 : client->auth_send_pending = 0;
495 0 : FD_LOG_INFO(( "Requesting auth challenge from event server " FD_IP4_ADDR_FMT ":%u (%.*s)",
496 0 : FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port,
497 0 : (int)client->server_fqdn_len, client->server_fqdn ));
498 0 : return 1;
499 0 : }
500 :
501 : static void
502 0 : fd_event_client_grpc_conn_established( void * app_ctx ) {
503 0 : fd_event_client_t * client = app_ctx;
504 :
505 0 : long now = fd_log_wallclock();
506 0 : client->state = FD_EVENT_CLIENT_STATE_AUTHENTICATING;
507 0 : client->auth_deadline = now + (long)2e9;
508 0 : client->auth_send_pending = 1;
509 :
510 0 : fd_event_client_try_send_authenticate( client );
511 0 : }
512 :
513 : static void
514 : fd_event_client_handle_auth_challenge_resp( fd_event_client_t * client,
515 : void const * protobuf,
516 0 : ulong protobuf_sz ) {
517 0 : fd_pb_inbuf_t inbuf[1];
518 0 : fd_pb_inbuf_init( inbuf, protobuf, protobuf_sz );
519 :
520 0 : if( FD_UNLIKELY( protobuf_sz==0UL ) ) {
521 0 : FD_LOG_WARNING(( "Empty auth challenge response" ));
522 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
523 0 : return;
524 0 : }
525 :
526 0 : fd_pb_tlv_t challenge_tlv;
527 0 : if( FD_UNLIKELY( !fd_pb_read_tlv( inbuf, &challenge_tlv ) ) ) {
528 0 : FD_LOG_WARNING(( "Failed to parse auth challenge response" ));
529 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
530 0 : return;
531 0 : }
532 :
533 0 : if( FD_UNLIKELY( challenge_tlv.field_id!=1U || challenge_tlv.wire_type!=FD_PB_WIRE_TYPE_LEN ) ) {
534 0 : FD_LOG_WARNING(( "Unexpected field in auth challenge response" ));
535 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
536 0 : return;
537 0 : }
538 :
539 0 : ulong challenge_len = challenge_tlv.len;
540 0 : if( FD_UNLIKELY( challenge_len!=FD_EVENT_CLIENT_TOKEN_SZ ) ) {
541 0 : FD_LOG_WARNING(( "Invalid challenge token size: %lu bytes (expected %lu)", challenge_len, FD_EVENT_CLIENT_TOKEN_SZ ));
542 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
543 0 : return;
544 0 : }
545 :
546 0 : if( FD_UNLIKELY( fd_pb_inbuf_sz( inbuf )<challenge_len ) ) {
547 0 : FD_LOG_WARNING(( "Truncated auth challenge response" ));
548 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
549 0 : return;
550 0 : }
551 :
552 0 : uchar challenge_token[ FD_EVENT_CLIENT_TOKEN_SZ ];
553 0 : memcpy( challenge_token, inbuf->cur, challenge_len );
554 0 : inbuf->cur += challenge_len;
555 :
556 0 : if( FD_UNLIKELY( fd_pb_inbuf_sz( inbuf ) ) ) {
557 0 : FD_LOG_WARNING(( "Trailing data in auth challenge response" ));
558 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
559 0 : return;
560 0 : }
561 :
562 0 : uchar sign_request[ 100UL + FD_EVENT_CLIENT_TOKEN_SZ ];
563 0 : static char const sign_prefix[ 100 ] =
564 0 : " " /* 32 spaces */
565 0 : " " /* 32 spaces */
566 0 : "Firedancer event challenge-response";
567 0 : memcpy( sign_request, sign_prefix, sizeof(sign_prefix) );
568 0 : memcpy( sign_request+100, challenge_token, challenge_len );
569 :
570 0 : uchar signature[ 64UL ];
571 0 : fd_keyguard_client_sign( client->keyguard_client,
572 0 : signature,
573 0 : sign_request, 100UL+challenge_len,
574 0 : FD_KEYGUARD_SIGN_TYPE_ED25519 );
575 :
576 : /* Build "hex(challenge_token).hex(signature)" for the bearer token. */
577 0 : fd_hex_encode( client->auth_bearer, challenge_token, FD_EVENT_CLIENT_TOKEN_SZ );
578 0 : client->auth_bearer[ 2UL*FD_EVENT_CLIENT_TOKEN_SZ ] = '.';
579 0 : fd_hex_encode( client->auth_bearer + 2UL*FD_EVENT_CLIENT_TOKEN_SZ+1UL, signature, 64UL );
580 0 : client->auth_bearer_len = 2UL*FD_EVENT_CLIENT_TOKEN_SZ + 1UL + 2UL*64UL;
581 0 : client->auth_bearer[ client->auth_bearer_len ] = '\0';
582 :
583 0 : client->event_stream = NULL;
584 0 : client->metrics.transport_success_cnt++;
585 0 : client->state = FD_EVENT_CLIENT_STATE_CONNECTED;
586 0 : client->connected.connected_timestamp = fd_log_wallclock();
587 0 : FD_LOG_NOTICE(( "connected to event server " FD_IP4_ADDR_FMT ":%u (%.*s)",
588 0 : FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port,
589 0 : (int)client->server_fqdn_len, client->server_fqdn ));
590 0 : }
591 :
592 : static void
593 : fd_event_client_grpc_conn_dead( void * app_ctx,
594 : uint h2_err,
595 0 : int closed_by ) {
596 0 : fd_event_client_t * client = app_ctx;
597 0 : FD_LOG_WARNING(( "Event gRPC connection closed %s (%u-%s)",
598 0 : closed_by ? "by peer" : "due to error",
599 0 : h2_err, fd_h2_strerror( h2_err ) ));
600 0 : disconnect( client, DISCONNECT_REASON_PEER_CLOSED, 0, 1 );
601 0 : }
602 :
603 : static void
604 : fd_event_client_grpc_tx_complete( void * app_ctx,
605 0 : ulong request_ctx ) {
606 0 : (void)app_ctx; (void)request_ctx;
607 0 : }
608 :
609 : void
610 : fd_event_client_grpc_rx_start( void * app_ctx,
611 0 : ulong request_ctx ) {
612 0 : (void)app_ctx; (void)request_ctx;
613 0 : }
614 :
615 : static void
616 : fd_event_client_handle_stream_events_resp( fd_event_client_t * client,
617 : void const * protobuf,
618 0 : ulong protobuf_sz ) {
619 0 : fd_pb_inbuf_t inbuf[1];
620 0 : fd_pb_inbuf_init( inbuf, protobuf, protobuf_sz );
621 :
622 0 : ulong nonce_ack = 0UL;
623 0 : if( FD_LIKELY( protobuf_sz ) ) {
624 0 : fd_pb_tlv_t event_id;
625 0 : if( FD_UNLIKELY( !fd_pb_read_tlv( inbuf, &event_id ) ||
626 0 : event_id.field_id!=1U /* event_id */ ||
627 0 : event_id.wire_type!=FD_PB_WIRE_TYPE_VARINT ) ) {
628 0 : FD_LOG_WARNING(( "Event gRPC rx msg: invalid Protobuf" ));
629 0 : client->defer_disconnect = DISCONNECT_REASON_INVALID_PROTOBUF;
630 0 : return;
631 0 : }
632 0 : nonce_ack = event_id.varint;
633 :
634 0 : if( FD_UNLIKELY( fd_pb_inbuf_sz( inbuf ) ) ) {
635 0 : FD_LOG_WARNING(( "Event gRPC rx msg: trailing data in StreamEventsResponse" ));
636 0 : client->defer_disconnect = DISCONNECT_REASON_INVALID_PROTOBUF;
637 0 : return;
638 0 : }
639 0 : }
640 :
641 0 : client->metrics.events_acked++;
642 0 : if( FD_UNLIKELY( nonce_ack==ULONG_MAX ) ) return;
643 :
644 0 : client->metrics.last_acked_id = nonce_ack;
645 :
646 0 : int err = fd_circq_pop_until( client->circq, nonce_ack );
647 0 : if( FD_UNLIKELY( -1==err ) ) {
648 0 : FD_LOG_WARNING(( "Event gRPC rx msg: invalid cursor ack %lu", nonce_ack ));
649 0 : client->defer_disconnect = DISCONNECT_REASON_INVALID_CURSOR;
650 0 : }
651 0 : }
652 :
653 : void
654 : fd_event_client_grpc_rx_msg( void * app_ctx,
655 : void const * protobuf,
656 : ulong protobuf_sz,
657 0 : ulong request_ctx ) {
658 0 : fd_event_client_t * client = app_ctx;
659 :
660 0 : switch( request_ctx ) {
661 0 : case FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE:
662 0 : fd_event_client_handle_auth_challenge_resp( client, protobuf, protobuf_sz );
663 0 : break;
664 0 : case FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS:
665 0 : fd_event_client_handle_stream_events_resp( client, protobuf, protobuf_sz );
666 0 : break;
667 0 : default:
668 0 : FD_LOG_WARNING(( "Unknown request_ctx: %lu, disconnecting", request_ctx ));
669 0 : client->defer_disconnect = DISCONNECT_REASON_INVALID_PROTOBUF;
670 0 : break;
671 0 : }
672 0 : }
673 :
674 : void
675 : fd_event_client_grpc_rx_end( void * app_ctx,
676 : ulong request_ctx,
677 0 : fd_grpc_resp_hdrs_t * resp ) {
678 0 : fd_event_client_t * client = app_ctx;
679 :
680 0 : if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
681 0 : FD_LOG_WARNING(( "Event gRPC request failed (HTTP status %u)", resp->h2_status ));
682 0 : client->defer_disconnect = DISCONNECT_REASON_TRANSPORT_FAILED;
683 0 : return;
684 0 : }
685 :
686 0 : resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
687 0 : if( !resp->grpc_msg_len ) {
688 0 : fd_memcpy( resp->grpc_msg, "unknown error", 13 );
689 0 : resp->grpc_msg_len = 13;
690 0 : }
691 :
692 0 : if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
693 0 : switch( request_ctx ) {
694 0 : case FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE:
695 0 : FD_LOG_WARNING(( "Event authentication failed (gRPC status %u-%s): %.*s",
696 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
697 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
698 0 : client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
699 0 : return;
700 0 : case FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS:
701 0 : FD_LOG_WARNING(( "Event stream failed (gRPC status %u-%s): %.*s",
702 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
703 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
704 0 : client->defer_disconnect = DISCONNECT_REASON_PEER_CLOSED;
705 0 : return;
706 0 : default:
707 0 : FD_LOG_WARNING(( "Event gRPC request failed (gRPC status %u-%s): %.*s",
708 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
709 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
710 0 : client->defer_disconnect = DISCONNECT_REASON_TRANSPORT_FAILED;
711 0 : return;
712 0 : }
713 0 : }
714 :
715 0 : if( request_ctx==FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS ) {
716 0 : FD_LOG_INFO(( "Event gRPC stream ended gracefully" ));
717 0 : client->defer_disconnect = DISCONNECT_REASON_PEER_CLOSED;
718 0 : }
719 0 : }
720 :
721 : void
722 : fd_event_client_grpc_rx_timeout( void * app_ctx,
723 : ulong request_ctx FD_PARAM_UNUSED,
724 0 : int deadline_kind FD_PARAM_UNUSED ) {
725 0 : FD_LOG_WARNING(( "Event gRPC rx timeout" ));
726 0 : fd_event_client_t * client = (fd_event_client_t *)app_ctx;
727 0 : client->defer_disconnect = DISCONNECT_REASON_TRANSPORT_FAILED;
728 0 : client->event_stream = NULL;
729 0 : }
730 :
731 : static void
732 0 : fd_event_client_grpc_ping_ack( void * app_ctx ) {
733 0 : (void)app_ctx;
734 0 : FD_LOG_WARNING(( "Event gRPC ping ack" ));
735 0 : }
736 :
737 : static void
738 : tx( fd_event_client_t * client,
739 0 : int * charge_busy ) {
740 0 : FD_TEST( client->state==FD_EVENT_CLIENT_STATE_CONNECTED );
741 :
742 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client->grpc_client ) ) ) return;
743 0 : if( FD_UNLIKELY( client->event_stream && client->grpc_client->request_stream != NULL && client->grpc_client->request_stream!=client->event_stream ) ) return;
744 :
745 0 : if( FD_UNLIKELY( !client->event_stream ) ) {
746 0 : client->event_stream = fd_grpc_client_request_start1(
747 0 : client->grpc_client,
748 0 : "/events.v1.EventService/StreamEvents", strlen("/events.v1.EventService/StreamEvents"),
749 0 : FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS,
750 0 : NULL, 0UL, /* headers only; first message sent later */
751 0 : client->auth_bearer, client->auth_bearer_len,
752 0 : 1 /* streaming */ );
753 0 : if( FD_UNLIKELY( !client->event_stream ) ) return; /* transient; retry next poll */
754 0 : fd_grpc_client_deadline_set( client->event_stream, FD_GRPC_DEADLINE_HEADER, fd_log_wallclock()+(long)10e9 /* 10s */ );
755 0 : *charge_busy = 1;
756 0 : return;
757 0 : }
758 :
759 0 : ulong msg_sz;
760 0 : uchar const * msg = fd_circq_cursor_advance( client->circq, &msg_sz );
761 0 : if( FD_LIKELY( !msg ) ) return;
762 :
763 0 : int result = fd_grpc_client_stream_send_msg1( client->grpc_client, client->event_stream, msg, msg_sz );
764 0 : if( FD_UNLIKELY( !result ) ) return; /* Only reason for failure is too big message, so just skip it */
765 :
766 0 : client->metrics.events_sent++;
767 0 : *charge_busy = 1;
768 0 : }
769 :
770 : void
771 : fd_event_client_poll( fd_event_client_t * client,
772 0 : int * charge_busy ) {
773 0 : if( FD_UNLIKELY( !client->has_genesis_hash || !client->has_shred_version ) ) return;
774 :
775 0 : long now = fd_log_wallclock();
776 :
777 0 : if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_DISCONNECTED ) ) reconnect( client, charge_busy );
778 0 : if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_CONNECTING ) ) {
779 0 : if( FD_UNLIKELY( now>client->connecting.connect_deadline ) ) {
780 0 : disconnect( client, DISCONNECT_REASON_TIMEOUT, 0, 1 );
781 0 : return;
782 0 : }
783 0 : }
784 : /* Check auth handshake timeout */
785 0 : if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_AUTHENTICATING && now>client->auth_deadline ) ) {
786 0 : FD_LOG_WARNING(( "auth handshake timed out" ));
787 0 : client->metrics.handshake_timeout_cnt++;
788 0 : disconnect( client, DISCONNECT_REASON_TIMEOUT, 0, 1 );
789 0 : return;
790 0 : }
791 0 : if( FD_LIKELY( client->state!=FD_EVENT_CLIENT_STATE_DISCONNECTED ) ) {
792 0 : int rxtx_err;
793 0 : # if FD_HAS_OPENSSL
794 0 : if( client->use_tls )
795 0 : rxtx_err = fd_grpc_client_rxtx_ossl( client->grpc_client, client->ssl, charge_busy );
796 0 : else
797 0 : # endif
798 0 : rxtx_err = fd_grpc_client_rxtx_socket( client->grpc_client, client->sockfd, charge_busy );
799 0 : if( FD_UNLIKELY( -1==rxtx_err ) ) {
800 0 : disconnect( client, DISCONNECT_REASON_TRANSPORT_FAILED, errno, 1 );
801 0 : return;
802 0 : }
803 0 : }
804 :
805 0 : if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_AUTHENTICATING && client->auth_send_pending ) ) {
806 0 : fd_event_client_try_send_authenticate( client );
807 0 : }
808 :
809 0 : if( FD_UNLIKELY( client->defer_disconnect!=INT_MAX ) ) {
810 0 : int reason = client->defer_disconnect;
811 0 : client->defer_disconnect = INT_MAX;
812 0 : if( reason==DISCONNECT_REASON_AUTH_FAILED ) client->metrics.auth_fail_cnt++;
813 0 : if( reason==DISCONNECT_REASON_INVALID_PROTOBUF ) client->metrics.invalid_msg_cnt++;
814 0 : disconnect( client, reason, 0, 1 );
815 0 : return;
816 0 : }
817 :
818 0 : if( FD_LIKELY( client->state==FD_EVENT_CLIENT_STATE_CONNECTED ) ) {
819 0 : if( FD_UNLIKELY( client->consecutive_failure_count && (now-client->connected.connected_timestamp>10L*(long)1e9 ) ) ) client->consecutive_failure_count = 0UL;
820 0 : tx( client, charge_busy );
821 0 : }
822 0 : }
823 :
824 : fd_grpc_client_callbacks_t fd_event_client_grpc_callbacks = {
825 : .conn_established = fd_event_client_grpc_conn_established,
826 : .conn_dead = fd_event_client_grpc_conn_dead,
827 : .tx_complete = fd_event_client_grpc_tx_complete,
828 : .rx_start = fd_event_client_grpc_rx_start,
829 : .rx_msg = fd_event_client_grpc_rx_msg,
830 : .rx_end = fd_event_client_grpc_rx_end,
831 : .rx_timeout = fd_event_client_grpc_rx_timeout,
832 : .ping_ack = fd_event_client_grpc_ping_ack,
833 : };
|