Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_circq.h"
3 : #include "fd_event_client.h"
4 :
5 : #include "../fd_txn_m.h"
6 : #include "../fd_clock_tile.h"
7 : #include "../metrics/fd_metrics.h"
8 : #include "../net/fd_net_tile.h"
9 : #include "../../discof/genesis/fd_genesi_tile.h"
10 : #include "generated/fd_event_gen.h"
11 : #include "../keyguard/fd_keyload.h"
12 : #include "../keyguard/fd_keyswitch.h"
13 : #include "../topo/fd_topo.h"
14 : #include "../../waltz/resolv/fd_netdb.h"
15 : #include "../../waltz/http/fd_url.h"
16 : #include "../../ballet/lthash/fd_lthash.h"
17 : #include "../../ballet/pb/fd_pb_encode.h"
18 : #include "../../tango/tempo/fd_tempo.h"
19 :
20 : #if FD_HAS_OPENSSL
21 : #include "../../util/alloc/fd_alloc.h"
22 : #include "../../waltz/openssl/fd_openssl.h"
23 : #include "../../waltz/openssl/fd_openssl_tile.h"
24 : #include <openssl/ssl.h>
25 : #endif
26 :
27 : #include <unistd.h>
28 : #include <fcntl.h>
29 : #include <errno.h>
30 : #include <unistd.h>
31 : #include <sys/socket.h>
32 : #include <sys/syscall.h>
33 : #include <netinet/in.h>
34 : #include <netinet/tcp.h>
35 :
36 : #include "generated/fd_event_tile_seccomp.h"
37 :
/* Buffer size in bytes passed to the event client footprint and
   constructor. */
#define GRPC_BUF_MAX (2UL<<20UL) /* 2 MiB */

/* Kinds of input link.  One of these is stored per polled input in
   fd_event_tile_t::in_kind and the frag callbacks dispatch on it. */
#define IN_KIND_SHRED  (0) /* "net_shred" link */
#define IN_KIND_DEDUP  (1) /* "dedup_resolv" link */
#define IN_KIND_SIGN   (2)
#define IN_KIND_GENESI (3) /* "genesi_out" link */
#define IN_KIND_IPECHO (4) /* "ipecho_out" link */
#define IN_KIND_EVENT  (5) /* any link registered as a tile's event_link_id */

/* Event type identifiers. */
#define FD_EVENT_TYPE_TXN         1
#define FD_EVENT_TYPE_SHRED       2
#define FD_EVENT_TYPE_SIGNED_VOTE 3
50 :
51 : union fd_event_tile_in {
52 : struct {
53 : fd_wksp_t * mem;
54 : ulong mtu;
55 : ulong chunk0;
56 : ulong wmark;
57 : };
58 : fd_net_rx_bounds_t net_rx;
59 : };
60 :
61 : typedef union fd_event_tile_in fd_event_tile_in_t;
62 :
63 : struct fd_event_tile {
64 : fd_circq_t * circq;
65 : fd_event_client_t * client;
66 :
67 : fd_topo_t const * topo;
68 :
69 : int tile_shutdown_rendered[ FD_TOPO_MAX_TILES ];
70 :
71 : fd_keyswitch_t * keyswitch;
72 :
73 : ulong idle_cnt;
74 :
75 : ulong boot_id;
76 : ulong machine_id;
77 : ulong instance_id;
78 : ulong seed;
79 :
80 : ulong chunk;
81 :
82 : ushort shred_source_port;
83 : ulong shred_buf_sz;
84 : uchar shred_buf[ FD_NET_MTU ];
85 :
86 : ulong event_type;
87 : ulong event_sz;
88 : uchar event_buf[ FD_EVENT_GEN_STRUCT_MAX ];
89 :
90 : uchar identity_pubkey[ 32UL ];
91 :
92 : int use_tls;
93 : #if FD_HAS_OPENSSL
94 : SSL_CTX * ssl_ctx;
95 : #endif
96 :
97 : fd_keyguard_client_t keyguard_client[1];
98 : fd_rng_t rng[1];
99 :
100 : fd_netdb_fds_t netdb_fds[1];
101 :
102 : fd_clock_tile_t clock[1];
103 :
104 : ulong in_cnt;
105 : int in_kind[ 64UL ];
106 : fd_event_tile_in_t in[ 64UL ];
107 : };
108 :
109 : typedef struct fd_event_tile fd_event_tile_t;
110 :
111 : FD_FN_CONST static inline ulong
112 0 : scratch_align( void ) {
113 0 : ulong a = alignof( fd_event_tile_t );
114 0 : a = fd_ulong_max( a, fd_event_client_align() );
115 0 : a = fd_ulong_max( a, fd_circq_align() );
116 0 : # if FD_HAS_OPENSSL
117 0 : a = fd_ulong_max( a, fd_alloc_align() );
118 0 : # endif
119 0 : return a;
120 0 : }
121 :
122 : FD_FN_PURE static inline ulong
123 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
124 0 : (void)tile;
125 :
126 0 : ulong l = FD_LAYOUT_INIT;
127 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_event_tile_t), sizeof(fd_event_tile_t) );
128 0 : l = FD_LAYOUT_APPEND( l, fd_event_client_align(), fd_event_client_footprint( GRPC_BUF_MAX ) );
129 0 : l = FD_LAYOUT_APPEND( l, fd_circq_align(), fd_circq_footprint( 1UL<<30UL ) ); /* 1GiB circq for events */
130 0 : # if FD_HAS_OPENSSL
131 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
132 0 : # endif
133 0 : return FD_LAYOUT_FINI( l, scratch_align() );
134 0 : }
135 :
136 : # if FD_HAS_OPENSSL
137 : FD_FN_CONST static inline ulong
138 0 : loose_footprint( fd_topo_tile_t const * tile ) {
139 0 : (void)tile;
140 : /* Extra workspace memory for OpenSSL dynamic allocations */
141 0 : return 1UL<<26UL; /* 64 MiB */
142 0 : }
143 : # endif
144 :
145 : static inline void
146 0 : metrics_write( fd_event_tile_t * ctx ) {
147 0 : FD_MGAUGE_SET( EVENT, QUEUE_DEPTH, ctx->circq->cnt );
148 0 : FD_MGAUGE_SET( EVENT, QUEUE_UNSENT, fd_circq_unsent_cnt( ctx->circq ) );
149 0 : FD_MCNT_SET( EVENT, QUEUE_DROPPED, ctx->circq->metrics.drop_cnt );
150 0 : FD_MGAUGE_SET( EVENT, QUEUE_BYTES_USED, fd_circq_bytes_used( ctx->circq ) );
151 0 : FD_MGAUGE_SET( EVENT, QUEUE_BYTES_CAPACITY, ctx->circq->size );
152 :
153 0 : fd_event_client_metrics_t const * metrics = fd_event_client_metrics( ctx->client );
154 0 : FD_MCNT_SET( EVENT, SENT, metrics->events_sent );
155 0 : FD_MCNT_SET( EVENT, ACKED, metrics->events_acked );
156 0 : FD_MGAUGE_SET( EVENT, LAST_ACKED_ID, metrics->last_acked_id );
157 0 : FD_MCNT_SET( EVENT, BYTES_WRITTEN, metrics->bytes_written );
158 0 : FD_MCNT_SET( EVENT, BYTES_READ, metrics->bytes_read );
159 0 : FD_MCNT_SET( EVENT, AUTH_FAILED, metrics->auth_fail_cnt );
160 0 : FD_MCNT_SET( EVENT, INVALID_MESSAGE, metrics->invalid_msg_cnt );
161 0 : FD_MCNT_SET( EVENT, CONN_ATTEMPT, metrics->connect_attempt_cnt );
162 0 : FD_MCNT_SET( EVENT, HANDSHAKE_TIMEOUT, metrics->handshake_timeout_cnt );
163 :
164 0 : FD_MGAUGE_SET( EVENT, CONN_STATE, fd_event_client_state( ctx->client ) );
165 0 : }
166 :
167 : static void
168 : before_credit( fd_event_tile_t * ctx,
169 : fd_stem_context_t * stem,
170 0 : int * charge_busy ) {
171 0 : (void)stem;
172 :
173 0 : ctx->idle_cnt++;
174 0 : if( FD_LIKELY( ctx->idle_cnt<2UL*ctx->in_cnt ) ) return;
175 0 : ctx->idle_cnt = 0UL;
176 :
177 0 : fd_event_client_poll( ctx->client, charge_busy );
178 0 : }
179 :
180 : static void
181 : during_frag( fd_event_tile_t * ctx,
182 : ulong in_idx,
183 : ulong seq,
184 : ulong sig,
185 : ulong chunk,
186 : ulong sz,
187 0 : ulong ctl ) {
188 0 : (void)seq; (void)ctl;
189 :
190 0 : fd_event_tile_in_t const * in = &ctx->in[ in_idx ];
191 0 : switch( ctx->in_kind[ in_idx ] ) {
192 0 : case IN_KIND_SHRED: {
193 0 : uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in[ in_idx ].net_rx, chunk, ctl, sz );
194 0 : ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
195 0 : FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
196 0 : fd_udp_hdr_t const * udp_hdr = (fd_udp_hdr_t const *)( dcache_entry + hdr_sz - sizeof(fd_udp_hdr_t) );
197 0 : ctx->shred_source_port = fd_ushort_bswap( udp_hdr->net_sport );
198 : // TODO: SHOULD BE RELIABLE. MAKE XDP TILE RELIABLE FIRST.
199 0 : fd_memcpy( ctx->shred_buf, dcache_entry+hdr_sz, sz-hdr_sz );
200 0 : ctx->shred_buf_sz = sz-hdr_sz;
201 0 : break;
202 0 : }
203 0 : case IN_KIND_DEDUP:
204 0 : case IN_KIND_GENESI:
205 0 : case IN_KIND_IPECHO:
206 0 : if( FD_UNLIKELY( chunk<in->chunk0 || chunk>in->wmark || sz>in->mtu ) )
207 0 : FD_LOG_CRIT(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in->chunk0, in->wmark ));
208 0 : ctx->chunk = chunk;
209 0 : break;
210 0 : case IN_KIND_EVENT: {
211 0 : if( FD_UNLIKELY( chunk<in->chunk0 || chunk>in->wmark || sz>in->mtu ) )
212 0 : FD_LOG_CRIT(( "chunk %lu corrupt, not in range [%lu,%lu]", chunk, in->chunk0, in->wmark ));
213 0 : fd_memcpy( ctx->event_buf, fd_chunk_to_laddr_const( in->mem, chunk ), sz );
214 0 : ctx->event_type = sig;
215 0 : ctx->event_sz = sz;
216 0 : break;
217 0 : }
218 0 : default:
219 0 : FD_LOG_CRIT(( "unexpected in_kind %d %lu", ctx->in_kind[ in_idx ], in_idx ));
220 0 : }
221 0 : }
222 :
223 : static void
224 : after_frag( fd_event_tile_t * ctx,
225 : ulong in_idx,
226 : ulong seq,
227 : ulong sig,
228 : ulong sz,
229 : ulong tsorig,
230 : ulong tspub,
231 0 : fd_stem_context_t * stem ) {
232 0 : (void)sz; (void)tsorig; (void)stem;
233 :
234 0 : switch( ctx->in_kind[ in_idx ] ) {
235 0 : case IN_KIND_SHRED: {
236 0 : FD_TEST( ctx->shred_buf_sz<=FD_NET_MTU );
237 : /* TODO: Currently no way to find a tight bound for the buffer
238 : size here, but 4096 is guaranteed to fit since sz<=FD_NET_MTU.
239 : Need to have the schema generator spit out max sizes for
240 : messages. */
241 0 : uchar * buffer = fd_circq_push_back( ctx->circq, 1UL, 4096UL );
242 0 : FD_TEST( buffer );
243 :
244 0 : uint ip_addr = fd_disco_netmux_sig_ip( sig );
245 0 : uint source_port = ctx->shred_source_port;
246 0 : int protocol;
247 0 : switch( fd_disco_netmux_sig_proto( sig ) ) {
248 0 : case DST_PROTO_SHRED:
249 0 : protocol = 1;
250 0 : break;
251 0 : case DST_PROTO_REPAIR:
252 0 : protocol = 2;
253 0 : break;
254 0 : default:
255 : // TODO: Leader shreds?
256 0 : FD_LOG_ERR(( "unexpected proto %lu in sig %lu", fd_disco_netmux_sig_proto( sig ), sig ));
257 0 : }
258 :
259 0 : ulong event_id = fd_event_client_id_reserve( ctx->client );
260 0 : long timestamp_nanos = fd_clock_tile_tickcount_to_wallclock( ctx->clock,
261 0 : fd_clock_tile_tickcount_decomp( ctx->clock, tspub ) );
262 :
263 0 : fd_pb_encoder_t encoder[1];
264 0 : fd_pb_encoder_init( encoder, buffer, 4096UL );
265 :
266 0 : FD_TEST( ctx->circq->cursor_push_seq );
267 0 : fd_pb_push_uint64( encoder, 1U, ctx->circq->cursor_push_seq-1UL );
268 0 : fd_pb_push_uint64( encoder, 2U, event_id );
269 0 : fd_pb_push_uint64( encoder, 3U, seq ); /* link_seq */
270 0 : fd_pb_push_uint64( encoder, 4U, (ulong)timestamp_nanos );
271 :
272 0 : fd_pb_submsg_open( encoder, 5U ); /* Event */
273 0 : fd_pb_submsg_open( encoder, 2U ); /* Shred */
274 0 : fd_pb_push_bytes( encoder, 1U, &ip_addr, 4UL );
275 0 : fd_pb_push_uint32( encoder, 2U, source_port );
276 0 : fd_pb_push_int32( encoder, 3U, protocol );
277 0 : fd_pb_push_bytes( encoder, 4U, ctx->shred_buf, ctx->shred_buf_sz );
278 0 : fd_pb_submsg_close( encoder );
279 0 : fd_pb_submsg_close( encoder );
280 0 : fd_circq_resize_back( ctx->circq, fd_pb_encoder_out_sz( encoder ) );
281 0 : break;
282 0 : }
283 0 : case IN_KIND_DEDUP:
284 0 : FD_TEST( sz<=FD_TPU_PARSED_MTU );
285 : /* See comment above about buffer size. */
286 0 : uchar * buffer = fd_circq_push_back( ctx->circq, 1UL, 4096UL );
287 0 : FD_TEST( buffer );
288 :
289 0 : fd_txn_m_t * txnm = (fd_txn_m_t *)fd_chunk_to_laddr( ctx->in[ in_idx ].mem, ctx->chunk );
290 0 : FD_TEST( txnm->payload_sz<=FD_TPU_MTU );
291 :
292 0 : int protocol = 0;
293 0 : switch( txnm->source_tpu ) {
294 0 : case FD_TXN_M_TPU_SOURCE_QUIC: protocol = 1; break;
295 0 : case FD_TXN_M_TPU_SOURCE_UDP: protocol = 2; break;
296 0 : case FD_TXN_M_TPU_SOURCE_GOSSIP: protocol = 3; break;
297 0 : case FD_TXN_M_TPU_SOURCE_BUNDLE: protocol = 4; break;
298 0 : case FD_TXN_M_TPU_SOURCE_TXSEND: protocol = 5; break;
299 0 : default:
300 0 : FD_LOG_ERR(( "unexpected source_tpu %u", txnm->source_tpu ));
301 0 : }
302 :
303 0 : ulong event_id = fd_event_client_id_reserve( ctx->client );
304 0 : long timestamp_nanos = fd_clock_tile_tickcount_to_wallclock( ctx->clock,
305 0 : fd_clock_tile_tickcount_decomp( ctx->clock, tspub ) );
306 :
307 0 : fd_pb_encoder_t encoder[1];
308 0 : fd_pb_encoder_init( encoder, buffer, 4096UL );
309 :
310 0 : FD_TEST( ctx->circq->cursor_push_seq );
311 0 : fd_pb_push_uint64( encoder, 1U, ctx->circq->cursor_push_seq-1UL );
312 0 : fd_pb_push_uint64( encoder, 2U, event_id );
313 0 : fd_pb_push_uint64( encoder, 3U, seq ); /* link_seq */
314 0 : fd_pb_push_uint64( encoder, 4U, (ulong)timestamp_nanos );
315 :
316 0 : fd_pb_submsg_open( encoder, 5U ); /* Event */
317 0 : fd_pb_submsg_open( encoder, 1U ); /* Txn */
318 0 : fd_pb_push_bytes( encoder, 1U, &txnm->source_ipv4, 4UL );
319 0 : fd_pb_push_uint32( encoder, 2U, 0U ); /* TODO: source port .. */
320 0 : fd_pb_push_int32( encoder, 3U, protocol );
321 0 : fd_pb_push_uint64( encoder, 4U, txnm->block_engine.bundle_id );
322 0 : if( FD_UNLIKELY( txnm->block_engine.bundle_id ) ) {
323 0 : fd_pb_push_uint32( encoder, 5U, (uint)txnm->block_engine.bundle_txn_cnt );
324 0 : fd_pb_push_uint32( encoder, 6U, txnm->block_engine.commission );
325 0 : fd_pb_push_bytes( encoder, 7U, txnm->block_engine.commission_pubkey, 32UL );
326 0 : } else {
327 0 : fd_pb_push_uint32( encoder, 5U, 0U );
328 0 : fd_pb_push_uint32( encoder, 6U, 0U );
329 0 : uchar zero_pubkey[32UL] = {0};
330 0 : fd_pb_push_bytes( encoder, 7U, zero_pubkey, 32UL );
331 0 : }
332 0 : fd_pb_push_bytes( encoder, 8U, fd_txn_m_payload( txnm ), txnm->payload_sz );
333 :
334 0 : fd_pb_submsg_close( encoder );
335 0 : fd_pb_submsg_close( encoder );
336 0 : fd_circq_resize_back( ctx->circq, fd_pb_encoder_out_sz( encoder ) );
337 :
338 0 : break;
339 0 : case IN_KIND_GENESI: {
340 0 : fd_genesis_meta_t const * genesis_meta = fd_chunk_to_laddr( ctx->in[ in_idx ].mem, ctx->chunk );
341 0 : fd_event_client_init_genesis( ctx->client, genesis_meta );
342 0 : break;
343 0 : }
344 0 : case IN_KIND_IPECHO:
345 0 : FD_TEST( sig && sig<=USHORT_MAX );
346 0 : fd_event_client_init_shred_version( ctx->client, (ushort)sig );
347 0 : break;
348 0 : case IN_KIND_EVENT: {
349 0 : long timestamp_nanos = fd_clock_tile_tickcount_to_wallclock( ctx->clock, fd_clock_tile_tickcount_decomp( ctx->clock, tspub ) );
350 0 : fd_event_serialize_by_type( ctx->event_type, ctx->circq, ctx->client, timestamp_nanos, seq, ctx->event_buf, ctx->event_sz );
351 0 : break;
352 0 : }
353 0 : default:
354 0 : FD_LOG_ERR(( "unexpected in_kind %d", ctx->in_kind[ in_idx ] ));
355 0 : }
356 0 : }
357 :
358 : static void
359 : privileged_init( fd_topo_t const * topo,
360 0 : fd_topo_tile_t const * tile ) {
361 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
362 :
363 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
364 0 : fd_event_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_event_tile_t), sizeof(fd_event_tile_t) );
365 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_event_client_align(), fd_event_client_footprint( GRPC_BUF_MAX ) );
366 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_circq_align(), fd_circq_footprint( 1UL<<30UL ) );
367 0 : # if FD_HAS_OPENSSL
368 0 : void * alloc_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
369 0 : (void)alloc_mem;
370 0 : # endif
371 :
372 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
373 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
374 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
375 :
376 0 : if( FD_UNLIKELY( !strcmp( tile->event.identity_key_path, "" ) ) ) FD_LOG_ERR(( "identity_key_path not set" ));
377 0 : const uchar * identity_key = fd_keyload_load( tile->event.identity_key_path, /* pubkey only: */ 1 );
378 0 : fd_memcpy( ctx->identity_pubkey, identity_key, 32UL );
379 :
380 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
381 0 : FD_TEST( fd_rng_secure( &ctx->instance_id, 8UL ) );
382 :
383 0 : #define FD_EVENT_ID_SEED 0x812CAFEBABEFEEE0UL
384 :
385 0 : char _boot_id[ 36 ];
386 0 : int boot_id_fd = open( "/proc/sys/kernel/random/boot_id", O_RDONLY );
387 0 : if( FD_UNLIKELY( -1==boot_id_fd ) ) FD_LOG_ERR(( "open(/proc/sys/kernel/random/boot_id) failed (%d-%s)", errno, fd_io_strerror( errno ) ));
388 0 : if( FD_UNLIKELY( 36UL!=read( boot_id_fd, _boot_id, 36UL ) ) ) FD_LOG_ERR(( "read(/proc/sys/kernel/random/boot_id) failed (%d-%s)", errno, fd_io_strerror( errno ) ));
389 0 : if( FD_UNLIKELY( -1==close( boot_id_fd ) ) ) FD_LOG_ERR(( "close(/proc/sys/kernel/random/boot_id) failed (%d-%s)", errno, fd_io_strerror( errno ) ));
390 :
391 0 : ctx->boot_id = fd_hash( FD_EVENT_ID_SEED, _boot_id, 36UL );
392 :
393 0 : char _machine_id[ 32 ];
394 0 : int machine_id_fd = open( "/etc/machine-id", O_RDONLY );
395 0 : if( FD_UNLIKELY( -1==machine_id_fd ) ) FD_LOG_ERR(( "open(/etc/machine-id) failed (%d-%s)", errno, fd_io_strerror( errno ) ));
396 0 : if( FD_UNLIKELY( 32UL!=read( machine_id_fd, _machine_id, 32UL ) ) ) FD_LOG_ERR(( "read(/etc/machine-id) failed (%d-%s)", errno, fd_io_strerror( errno ) ));
397 0 : if( FD_UNLIKELY( -1==close( machine_id_fd ) ) ) FD_LOG_ERR(( "close(/etc/machine-id) failed (%d-%s)", errno, fd_io_strerror( errno ) ));
398 :
399 0 : ctx->machine_id = fd_hash( FD_EVENT_ID_SEED, _machine_id, 32UL );
400 :
401 0 : if( FD_UNLIKELY( !fd_netdb_open_fds( ctx->netdb_fds ) ) ) {
402 0 : FD_LOG_ERR(( "fd_netdb_open_fds failed" ));
403 0 : }
404 :
405 : /* Detect TLS from the URL scheme */
406 0 : fd_url_t url[ 1UL ];
407 0 : ushort port;
408 0 : _Bool is_ssl = 0;
409 0 : if( FD_UNLIKELY( fd_url_parse_endpoint( url, tile->event.url, strlen( tile->event.url ), &port, &is_ssl, "[tiles.event.url]" ) ) ) {
410 0 : FD_LOG_ERR(( "Could not parse [tiles.event.url]" ));
411 0 : }
412 0 : ctx->use_tls = is_ssl;
413 :
414 0 : # if FD_HAS_OPENSSL
415 0 : ctx->ssl_ctx = NULL;
416 0 : if( ctx->use_tls ) {
417 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( alloc_mem, 1UL ), tile->kind_id );
418 0 : if( FD_UNLIKELY( !alloc ) ) FD_LOG_ERR(( "fd_alloc_new failed" ));
419 0 : fd_ossl_tile_init( alloc );
420 :
421 0 : SSL_CTX * ssl_ctx = SSL_CTX_new( TLS_client_method() );
422 0 : if( FD_UNLIKELY( !ssl_ctx ) ) FD_LOG_ERR(( "SSL_CTX_new failed" ));
423 :
424 0 : if( FD_UNLIKELY( !SSL_CTX_set_mode( ssl_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE|SSL_MODE_AUTO_RETRY ) ) )
425 0 : FD_LOG_ERR(( "SSL_CTX_set_mode failed" ));
426 :
427 0 : if( FD_UNLIKELY( !SSL_CTX_set_min_proto_version( ssl_ctx, TLS1_3_VERSION ) ) )
428 0 : FD_LOG_ERR(( "SSL_CTX_set_min_proto_version(ssl_ctx,TLS1_3_VERSION) failed" ));
429 :
430 0 : if( FD_UNLIKELY( 0!=SSL_CTX_set_alpn_protos( ssl_ctx, (uchar const *)"\x02h2", 3 ) ) )
431 0 : FD_LOG_ERR(( "SSL_CTX_set_alpn_protos failed" ));
432 :
433 0 : fd_ossl_load_certs( ssl_ctx ); /* also sets SSL_VERIFY_PEER */
434 :
435 0 : ctx->ssl_ctx = ssl_ctx;
436 0 : }
437 : # else
438 : if( FD_UNLIKELY( ctx->use_tls ) ) {
439 : FD_LOG_ERR(( "TLS requested for event service (https:// URL) but this build "
440 : "does not include OpenSSL. Re-run ./deps.sh and do a clean rebuild." ));
441 : }
442 : # endif
443 0 : }
444 :
445 : static int
446 : link_is_event_report( fd_topo_t const * topo,
447 0 : ulong link_id ) {
448 0 : for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
449 0 : if( FD_UNLIKELY( topo->tiles[ i ].event_link_id==link_id ) ) return 1;
450 0 : }
451 0 : return 0;
452 0 : }
453 :
454 : static void
455 : unprivileged_init( fd_topo_t const * topo,
456 0 : fd_topo_tile_t const * tile ) {
457 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
458 :
459 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
460 0 : fd_event_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_event_tile_t), sizeof(fd_event_tile_t) );
461 0 : void * _event_client = FD_SCRATCH_ALLOC_APPEND( l, fd_event_client_align(), fd_event_client_footprint( GRPC_BUF_MAX ) );
462 0 : void * _circq = FD_SCRATCH_ALLOC_APPEND( l, fd_circq_align(), fd_circq_footprint( 1UL<<30UL ) );
463 0 : # if FD_HAS_OPENSSL
464 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
465 0 : # endif
466 :
467 0 : ulong sign_in_idx = fd_topo_find_tile_in_link ( topo, tile, "sign_event", tile->kind_id );
468 0 : ulong sign_out_idx = fd_topo_find_tile_out_link( topo, tile, "event_sign", tile->kind_id );
469 0 : FD_TEST( sign_in_idx!=ULONG_MAX );
470 0 : fd_topo_link_t const * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
471 0 : fd_topo_link_t const * sign_out = &topo->links[ tile->out_link_id[ sign_out_idx ] ];
472 0 : if( FD_UNLIKELY( !fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
473 0 : sign_out->mcache,
474 0 : sign_out->dcache,
475 0 : sign_in->mcache,
476 0 : sign_in->dcache,
477 0 : sign_out->mtu ) ) ) ) {
478 0 : FD_LOG_ERR(( "failed to construct keyguard" ));
479 0 : }
480 :
481 0 : FD_TEST( fd_rng_join( fd_rng_new( ctx->rng, 0U, ctx->seed ) ) );
482 :
483 0 : ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->id_keyswitch_obj_id ) );
484 0 : FD_TEST( ctx->keyswitch );
485 :
486 0 : ctx->circq = fd_circq_join( fd_circq_new( _circq, 1UL<<30UL /* 1GiB */ ) );
487 0 : FD_TEST( ctx->circq );
488 :
489 0 : void * ssl_ctx_ptr = NULL;
490 0 : # if FD_HAS_OPENSSL
491 0 : ssl_ctx_ptr = ctx->ssl_ctx;
492 0 : # endif
493 :
494 0 : ctx->client = fd_event_client_join( fd_event_client_new( _event_client,
495 0 : ctx->keyguard_client,
496 0 : ctx->rng,
497 0 : ctx->circq,
498 0 : 2*(1UL<<20UL) /* 2 MiB */,
499 0 : tile->event.url,
500 0 : ctx->identity_pubkey,
501 0 : fd_version_cstr,
502 0 : fd_commit_ref_cstr,
503 0 : tile->event.action,
504 0 : ctx->instance_id,
505 0 : ctx->boot_id,
506 0 : ctx->machine_id,
507 0 : GRPC_BUF_MAX,
508 0 : ctx->use_tls,
509 0 : ssl_ctx_ptr ) );
510 0 : FD_TEST( ctx->client );
511 :
512 0 : ctx->topo = topo;
513 0 : fd_memset( ctx->tile_shutdown_rendered, 0, sizeof(ctx->tile_shutdown_rendered) );
514 :
515 0 : ctx->idle_cnt = 0UL;
516 :
517 0 : FD_TEST( tile->in_cnt<=sizeof(ctx->in_kind)/sizeof(ctx->in_kind[0]) );
518 0 : ulong polled_in_idx = 0UL;
519 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
520 0 : if( FD_UNLIKELY( !tile->in_link_poll[ i ] ) ) continue;
521 :
522 0 : fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
523 0 : fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
524 :
525 0 : if( FD_LIKELY( !strcmp( link->name, "net_shred" ) ) ) {
526 0 : fd_net_rx_bounds_init( &ctx->in[ polled_in_idx ].net_rx, link->dcache );
527 0 : ctx->in_kind[ polled_in_idx ] = IN_KIND_SHRED;
528 0 : polled_in_idx++;
529 0 : continue; /* only net_rx needs to be set in this case. */
530 0 : }
531 0 : else if( FD_LIKELY( !strcmp( link->name, "dedup_resolv" ) ) ) ctx->in_kind[ polled_in_idx ] = IN_KIND_DEDUP;
532 0 : else if( FD_LIKELY( !strcmp( link->name, "genesi_out" ) ) ) ctx->in_kind[ polled_in_idx ] = IN_KIND_GENESI;
533 0 : else if( FD_LIKELY( !strcmp( link->name, "ipecho_out" ) ) ) ctx->in_kind[ polled_in_idx ] = IN_KIND_IPECHO;
534 0 : else if( FD_LIKELY( link_is_event_report( topo, link->id ) ) ) {
535 0 : ctx->in_kind[ polled_in_idx ] = IN_KIND_EVENT;
536 0 : FD_TEST( link->mtu<=sizeof(ctx->event_buf) );
537 0 : }
538 0 : else FD_LOG_ERR(( "event tile has unexpected input link %lu %s", i, link->name ));
539 :
540 0 : ctx->in[ polled_in_idx ].mem = link_wksp->wksp;
541 0 : ctx->in[ polled_in_idx ].mtu = link->mtu;
542 0 : if( FD_UNLIKELY( ctx->in[ polled_in_idx ].mtu ) ) {
543 0 : ctx->in[ polled_in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ polled_in_idx ].mem, link->dcache );
544 0 : ctx->in[ polled_in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in[ polled_in_idx ].mem, link->dcache, link->mtu );
545 0 : } else {
546 0 : ctx->in[ polled_in_idx ].chunk0 = 0UL;
547 0 : ctx->in[ polled_in_idx ].wmark = 0UL;
548 0 : }
549 0 : polled_in_idx++;
550 0 : }
551 0 : ctx->in_cnt = polled_in_idx;
552 :
553 0 : fd_clock_tile_init( ctx->clock );
554 :
555 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
556 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
557 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
558 0 : }
559 :
560 : static ulong
561 : populate_allowed_seccomp( fd_topo_t const * topo,
562 : fd_topo_tile_t const * tile,
563 : ulong out_cnt,
564 0 : struct sock_filter * out ) {
565 0 : fd_event_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
566 :
567 0 : populate_sock_filter_policy_fd_event_tile(
568 0 : out_cnt, out,
569 0 : (uint)fd_log_private_logfile_fd(),
570 0 : (uint)ctx->netdb_fds->etc_hosts,
571 0 : (uint)ctx->netdb_fds->etc_resolv_conf );
572 0 : return sock_filter_policy_fd_event_tile_instr_cnt;
573 0 : }
574 :
575 : static ulong
576 : populate_allowed_fds( fd_topo_t const * topo,
577 : fd_topo_tile_t const * tile,
578 : ulong out_fds_cnt,
579 0 : int * out_fds ) {
580 0 : fd_event_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
581 :
582 0 : if( FD_UNLIKELY( out_fds_cnt<4UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
583 :
584 0 : ulong out_cnt = 0;
585 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
586 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
587 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
588 0 : if( FD_LIKELY( ctx->netdb_fds->etc_hosts >= 0 ) )
589 0 : out_fds[ out_cnt++ ] = ctx->netdb_fds->etc_hosts;
590 0 : out_fds[ out_cnt++ ] = ctx->netdb_fds->etc_resolv_conf;
591 0 : return out_cnt;
592 0 : }
593 :
594 : static void
595 0 : during_housekeeping( fd_event_tile_t * ctx ) {
596 0 : if( FD_UNLIKELY( fd_clock_tile_recal_due( ctx->clock ) ) ) {
597 0 : fd_clock_tile_recal( ctx->clock );
598 0 : }
599 :
600 0 : if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
601 0 : FD_LOG_DEBUG(( "keyswitch: switching identity" ));
602 0 : memcpy( ctx->identity_pubkey, ctx->keyswitch->bytes, 32UL );
603 0 : fd_event_client_set_identity( ctx->client, ctx->identity_pubkey );
604 0 : fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
605 0 : }
606 0 : }
607 :
/* Configuration for the stem run loop template included right after
   these definitions: tuning parameters, the context type, and the
   callbacks this tile implements. */

#define STEM_BURST (1UL)
#define STEM_LAZY  ((long)10e6) /* 10ms */

#define STEM_CALLBACK_CONTEXT_TYPE  fd_event_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_event_tile_t)

#define STEM_CALLBACK_METRICS_WRITE       metrics_write
#define STEM_CALLBACK_BEFORE_CREDIT       before_credit
#define STEM_CALLBACK_DURING_FRAG         during_frag
#define STEM_CALLBACK_AFTER_FRAG          after_frag
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
619 :
620 : #include "../stem/fd_stem.c"
621 :
622 : fd_topo_run_tile_t fd_tile_event = {
623 : .name = "event",
624 : .rlimit_file_cnt = 5UL, /* stderr, logfile, /etc/hosts, /etc/resolv.conf, and socket to the server */
625 : .populate_allowed_seccomp = populate_allowed_seccomp,
626 : .populate_allowed_fds = populate_allowed_fds,
627 : .scratch_align = scratch_align,
628 : .scratch_footprint = scratch_footprint,
629 : # if FD_HAS_OPENSSL
630 : .loose_footprint = loose_footprint,
631 : # endif
632 : .privileged_init = privileged_init,
633 : .unprivileged_init = unprivileged_init,
634 : .run = stem_run,
635 : .keep_host_networking = 1,
636 : .allow_connect = 1,
637 : };
|