Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_bundle_tile_private.h"
3 : #include "fd_bundle_tile.h"
4 : #include "../fd_txn_m.h"
5 : #include "../metrics/fd_metrics.h"
6 : #include "../topo/fd_topo.h"
7 : #include "../keyguard/fd_keyload.h"
8 : #include "../../waltz/http/fd_url.h"
9 : #include "../../waltz/openssl/fd_openssl_tile.h"
10 :
11 : #if FD_HAS_OPENSSL
12 : #include <errno.h>
13 : #endif
14 :
15 : #include <dirent.h> /* opendir */
16 : #include <stdio.h> /* snprintf */
17 : #include <fcntl.h> /* F_SETFL */
18 : #include <unistd.h> /* close */
19 : #include <sys/mman.h> /* PROT_READ (seccomp) */
20 : #include <sys/uio.h> /* writev */
21 : #include <netinet/in.h> /* AF_INET */
22 : #include <netinet/tcp.h> /* TCP_FASTOPEN_CONNECT (seccomp) */
23 : #include "../../waltz/resolv/fd_netdb.h"
24 : #include "../../discof/replay/fd_replay_tile.h"
25 :
26 : #include "generated/fd_bundle_tile_seccomp.h"
27 :
28 6 : #define IN_KIND_REPLAY_OUT (1)
29 :
30 0 : #define STEM_BURST (5UL)
31 : FD_STATIC_ASSERT( FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE<=STEM_BURST, stem_burst );
32 :
33 : /* hysteresis thresholds to avoid bouncing (e.g. during forks) */
34 36 : #define FD_BUNDLE_SLEEP_THRESHOLD_SLOTS (450UL)
35 30 : #define FD_BUNDLE_WAKE_THRESHOLD_SLOTS (400UL)
36 69 : #define FD_BUNDLE_SLEEP_CHECK_INTERVAL_NS ((long)5e9)
37 :
38 : FD_FN_CONST static ulong
39 0 : scratch_align( void ) {
40 0 : return fd_ulong_max( fd_ulong_max( fd_ulong_max( alignof(fd_bundle_tile_t), fd_grpc_client_align() ), fd_alloc_align() ), pending_txn_align() );
41 0 : }
42 :
43 : FD_FN_CONST static ulong
44 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
45 0 : ulong pending_max = tile->bundle.out_depth;
46 0 : ulong l = FD_LAYOUT_INIT;
47 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_bundle_tile_t), sizeof(fd_bundle_tile_t) );
48 0 : l = FD_LAYOUT_APPEND( l, fd_grpc_client_align(), fd_grpc_client_footprint( tile->bundle.buf_sz ) );
49 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
50 0 : l = FD_LAYOUT_APPEND( l, pending_txn_align(), pending_txn_footprint( pending_max ) );
51 0 : return FD_LAYOUT_FINI( l, scratch_align() );
52 0 : }
53 :
54 : FD_FN_CONST static inline ulong
55 0 : loose_footprint( fd_topo_tile_t const * tile ) {
56 : /* Leftover space for OpenSSL allocations */
57 0 : return tile->bundle.ssl_heap_sz;
58 0 : }
59 :
60 : static void
61 78 : fd_bundle_tile_maybe_sleep( fd_bundle_tile_t * ctx, long now_ns ) {
62 78 : if( FD_UNLIKELY( !ctx->replay_in.mem ) ) return;
63 69 : if( FD_LIKELY( now_ns < ctx->sleep_check_ns ) ) return;
64 66 : ctx->sleep_check_ns = now_ns + FD_BUNDLE_SLEEP_CHECK_INTERVAL_NS;
65 :
66 66 : ulong next_leader_slot = ctx->next_leader_slot;
67 66 : ulong reset_slot = ctx->reset_slot;
68 :
69 : /* Either don't know the leader schedule yet or have no upcoming
70 : leader slots. Sleep. */
71 66 : if( FD_UNLIKELY( next_leader_slot==ULONG_MAX || reset_slot==ULONG_MAX ) ) {
72 12 : if( !ctx->sleep_mode ) {
73 9 : ctx->sleep_mode = 1;
74 9 : FD_LOG_INFO(( "Bundle tile entering sleep mode: no upcoming leader slots" ));
75 9 : }
76 12 : return;
77 12 : }
78 :
79 54 : ulong slots_until_leader = fd_ulong_sat_sub( next_leader_slot, reset_slot );
80 :
81 54 : if( ctx->sleep_mode ) {
82 24 : if( slots_until_leader <= FD_BUNDLE_WAKE_THRESHOLD_SLOTS ) {
83 15 : ctx->sleep_mode = 0;
84 15 : ctx->last_bundle_status_log_nanos = now_ns;
85 15 : FD_LOG_INFO(( "Bundle tile waking up: next leader slot %lu (~%lu slots away)", next_leader_slot, slots_until_leader ));
86 15 : }
87 30 : } else {
88 : /* reset_slot stays at/near next_leader_slot throughout a leader
89 : rotation and only jumps to the next rotation after leadership
90 : ends, so this cannot trigger mid-rotation. */
91 30 : if( slots_until_leader > FD_BUNDLE_SLEEP_THRESHOLD_SLOTS ) {
92 18 : ctx->sleep_mode = 1;
93 18 : FD_LOG_INFO(( "Bundle tile entering sleep mode: next leader slot %lu (~%lu slots away)", next_leader_slot, slots_until_leader ));
94 18 : }
95 30 : }
96 54 : }
97 :
98 : static inline void
99 0 : metrics_write( fd_bundle_tile_t * ctx ) {
100 0 : FD_MCNT_SET( BUNDLE, TXN_RX, ctx->metrics.txn_received_cnt );
101 0 : FD_MCNT_SET( BUNDLE, BUNDLE_RX, ctx->metrics.bundle_received_cnt );
102 0 : FD_MCNT_SET( BUNDLE, PKT_RX, ctx->metrics.packet_received_cnt );
103 0 : FD_MCNT_SET( BUNDLE, PROTOBUF_RX_BYTES, ctx->metrics.proto_received_bytes );
104 0 : FD_MCNT_SET( BUNDLE, SHREDSTREAM_HEARTBEAT_SENT, ctx->metrics.shredstream_heartbeat_cnt );
105 0 : FD_MCNT_SET( BUNDLE, PING_ACKED, ctx->metrics.ping_ack_cnt );
106 0 : FD_MCNT_SET( BUNDLE, CONN_ERROR_PROTOBUF, ctx->metrics.decode_fail_cnt );
107 0 : FD_MCNT_SET( BUNDLE, CONN_ERROR_TRANSPORT, ctx->metrics.transport_fail_cnt );
108 0 : FD_MCNT_SET( BUNDLE, CONN_ERROR_NO_FEE_INFO, ctx->metrics.missing_builder_info_fail_cnt );
109 0 : FD_MGAUGE_SET( BUNDLE, TXN_PENDING, pending_txn_cnt( ctx->pending_txns ) );
110 0 : FD_MCNT_SET ( BUNDLE, TXN_BUFFER_FULL, ctx->metrics.backpressure_drop_cnt );
111 0 : #if FD_HAS_OPENSSL
112 0 : FD_MCNT_SET( BUNDLE, CONN_ERROR_SSL_ALLOC, fd_ossl_alloc_errors );
113 0 : #endif
114 :
115 0 : FD_MGAUGE_SET( BUNDLE, RTT_SAMPLE_NANOS, (ulong)ctx->rtt->latest_rtt );
116 0 : FD_MGAUGE_SET( BUNDLE, RTT_SMOOTHED_NANOS, (ulong)ctx->rtt->smoothed_rtt );
117 0 : FD_MGAUGE_SET( BUNDLE, RTT_VARIANCE_NANOS, (ulong)ctx->rtt->var_rtt );
118 :
119 0 : FD_MHIST_COPY( BUNDLE, MESSAGE_RX_DELAY_NANOS, ctx->metrics.msg_rx_delay );
120 :
121 0 : fd_wksp_t * wksp = fd_wksp_containing( ctx );
122 0 : fd_wksp_usage_t usage[1];
123 0 : ulong const free_tag = 0UL;
124 0 : if( FD_UNLIKELY( !fd_wksp_usage( wksp, &free_tag, 1UL, usage ) ) ) {
125 0 : FD_LOG_ERR(( "fd_wksp_usage failed" )); /* unreachable */
126 0 : }
127 0 : FD_MGAUGE_SET( BUNDLE, HEAP_SIZE_BYTES, usage->total_sz );
128 0 : FD_MGAUGE_SET( BUNDLE, HEAP_FREE_BYTES, usage->free_sz );
129 :
130 0 : int status = fd_bundle_client_status( ctx );
131 0 : ulong state = (ulong)status;
132 0 : if( FD_UNLIKELY( ctx->sleep_mode ) ) state = FD_BUNDLE_STATE_SLEEPING;
133 :
134 0 : FD_MGAUGE_SET( BUNDLE, STATE, state );
135 0 : ctx->bundle_status_recent = (uchar)state;
136 0 : }
137 :
138 : void
139 6 : fd_bundle_tile_housekeeping( fd_bundle_tile_t * ctx ) {
140 6 : long log_interval_ns = (long)30e9;
141 6 : int status = fd_bundle_client_status( ctx );
142 6 : long log_next_ns = ctx->last_bundle_status_log_nanos + log_interval_ns;
143 6 : long now_ns = fd_log_wallclock();
144 :
145 6 : if( FD_UNLIKELY( !ctx->sleep_mode && status!=FD_BUNDLE_STATE_CONNECTED && now_ns>log_next_ns ) ) {
146 0 : FD_LOG_WARNING(( "No bundle server connection in the last %ld seconds", log_interval_ns/(long)1e9 ) );
147 0 : ctx->last_bundle_status_log_nanos = now_ns;
148 0 : }
149 :
150 6 : if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
151 3 : fd_memcpy( ctx->auther.pubkey, ctx->keyswitch->bytes, 32UL );
152 3 : fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
153 3 : ctx->defer_reset = 1;
154 3 : ctx->sleep_check_ns = 0;
155 3 : }
156 :
157 6 : fd_bundle_tile_maybe_sleep( ctx, now_ns );
158 6 : }
159 :
160 : static void
161 : fd_bundle_tile_publish_block_engine_update(
162 : fd_bundle_tile_t * ctx,
163 : fd_stem_context_t * stem
164 0 : ) {
165 0 : fd_bundle_block_engine_update_t * update =
166 0 : fd_chunk_to_laddr( ctx->plugin_out.mem, ctx->plugin_out.chunk );
167 0 : memset( update, 0, sizeof(fd_bundle_block_engine_update_t) );
168 :
169 0 : strncpy( update->name, "jito", sizeof(update->name) );
170 :
171 : /* Deliberately silently truncates */
172 0 : snprintf( update->url, sizeof(update->url), "%s://%.*s:%u",
173 0 : ctx->is_ssl ? "https" : "http",
174 0 : (int)ctx->server_fqdn_len,
175 0 : ctx->server_fqdn,
176 0 : ctx->server_tcp_port );
177 :
178 : /* Format IPv4 string */
179 0 : snprintf( update->ip_cstr, sizeof(update->ip_cstr),
180 0 : FD_IP4_ADDR_FMT,
181 0 : FD_IP4_ADDR_FMT_ARGS( ctx->server_ip4_addr ) );
182 :
183 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
184 0 : fd_stem_publish(
185 0 : stem,
186 0 : ctx->plugin_out.idx,
187 0 : (ulong)ctx->bundle_status_recent,
188 0 : ctx->plugin_out.chunk,
189 0 : sizeof(fd_bundle_block_engine_update_t),
190 0 : 0UL, /* ctl */
191 0 : 0UL, /* seq */
192 0 : tspub
193 0 : );
194 0 : ctx->plugin_out.chunk = fd_dcache_compact_next( ctx->plugin_out.chunk, sizeof(fd_bundle_block_engine_update_t), ctx->plugin_out.chunk0, ctx->plugin_out.wmark );
195 0 : }
196 :
197 : static void
198 : during_frag( fd_bundle_tile_t * ctx,
199 : ulong in_idx,
200 : ulong seq FD_PARAM_UNUSED,
201 : ulong sig,
202 : ulong chunk,
203 : ulong sz FD_PARAM_UNUSED,
204 21 : ulong ctl FD_PARAM_UNUSED ) {
205 :
206 21 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPLAY_OUT ) ) {
207 18 : if( FD_LIKELY( sig!=REPLAY_SIG_RESET ) ) return;
208 15 : if( FD_UNLIKELY( chunk<ctx->replay_in.chunk0 || chunk>ctx->replay_in.wmark || sz!=sizeof(fd_poh_reset_t) ) )
209 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->replay_in.chunk0, ctx->replay_in.wmark ));
210 :
211 15 : fd_poh_reset_t const * reset = fd_chunk_to_laddr_const( ctx->replay_in.mem, chunk );
212 15 : ctx->next_leader_slot_staged = reset->next_leader_slot;
213 15 : ctx->reset_slot_staged = reset->completed_slot;
214 15 : }
215 21 : }
216 :
217 : static void
218 : after_frag( fd_bundle_tile_t * ctx,
219 : ulong in_idx,
220 : ulong seq FD_PARAM_UNUSED,
221 : ulong sig,
222 : ulong sz FD_PARAM_UNUSED,
223 : ulong tsorig FD_PARAM_UNUSED,
224 : ulong tspub FD_PARAM_UNUSED,
225 21 : fd_stem_context_t * stem FD_PARAM_UNUSED ) {
226 :
227 21 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPLAY_OUT ) ) {
228 18 : if( FD_LIKELY( sig!=REPLAY_SIG_RESET ) ) return;
229 15 : ctx->next_leader_slot = ctx->next_leader_slot_staged;
230 15 : ctx->reset_slot = ctx->reset_slot_staged;
231 15 : }
232 21 : }
233 :
234 : static void
235 : before_credit( fd_bundle_tile_t * ctx,
236 : fd_stem_context_t * stem,
237 0 : int * charge_busy ) {
238 0 : if( FD_UNLIKELY( !ctx->stem ) ) {
239 0 : ctx->stem = stem;
240 0 : }
241 :
242 0 : if( FD_UNLIKELY( ctx->sleep_mode ) ) {
243 0 : if( ctx->tcp_sock>=0 ) {
244 0 : fd_bundle_client_reset( ctx );
245 : /* Override backoff so we don't treat this as an error */
246 0 : ctx->backoff_until = 0;
247 0 : ctx->backoff_iter = 0;
248 0 : }
249 0 : return;
250 0 : }
251 :
252 0 : if( pending_txn_empty( ctx->pending_txns ) ) {
253 0 : fd_bundle_client_step( ctx, charge_busy );
254 0 : }
255 0 : }
256 :
257 : static void
258 : after_credit( fd_bundle_tile_t * ctx,
259 : fd_stem_context_t * stem,
260 : int * opt_poll_in,
261 0 : int * charge_busy ) {
262 0 : if( !pending_txn_empty( ctx->pending_txns ) ) {
263 0 : fd_bundle_pending_txn_t * head = pending_txn_peek_head( ctx->pending_txns );
264 0 : ulong drain_seq = head->bundle_seq;
265 0 : ulong drain_sig = head->sig;
266 0 : ulong drain_cnt = 0UL;
267 :
268 0 : do {
269 0 : fd_bundle_pending_txn_t const * txn = pending_txn_peek_head( ctx->pending_txns );
270 :
271 0 : fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
272 0 : *txnm = (fd_txn_m_t) {
273 0 : .reference_slot = 0UL,
274 0 : .payload_sz = txn->payload_sz,
275 0 : .txn_t_sz = 0U,
276 0 : .source_ipv4 = txn->source_ipv4,
277 0 : .source_tpu = FD_TXN_M_TPU_SOURCE_BUNDLE,
278 0 : .block_engine = {
279 0 : .bundle_id = txn->bundle_seq,
280 0 : .bundle_txn_cnt = txn->bundle_txn_cnt,
281 0 : .commission = txn->commission,
282 0 : },
283 0 : };
284 0 : fd_memcpy( txnm->block_engine.commission_pubkey, txn->commission_pubkey, 32UL );
285 0 : fd_memcpy( fd_txn_m_payload( txnm ), txn->payload, txn->payload_sz );
286 :
287 0 : ulong sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
288 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
289 0 : fd_stem_publish( stem, ctx->verify_out.idx, txn->sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
290 0 : ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
291 :
292 0 : pending_txn_remove_head( ctx->pending_txns );
293 0 : drain_cnt++;
294 0 : } while( fd_bundle_drain_continue( ctx->pending_txns, drain_sig, drain_seq, drain_cnt, STEM_BURST ) );
295 :
296 0 : *charge_busy = 1;
297 0 : *opt_poll_in = 0;
298 0 : }
299 :
300 0 : if( ctx->plugin_out.mem ) {
301 0 : if( FD_UNLIKELY( ctx->bundle_status_recent != ctx->bundle_status_plugin ) ) {
302 0 : fd_bundle_tile_publish_block_engine_update( ctx, stem );
303 0 : ctx->bundle_status_plugin = (uchar)ctx->bundle_status_recent;
304 0 : *charge_busy = 1;
305 0 : }
306 0 : }
307 0 : }
308 :
309 : #ifndef FD_TILE_TEST
310 : static void
311 : fd_bundle_tile_parse_endpoint( fd_bundle_tile_t * ctx,
312 0 : fd_topo_tile_t const * tile ) {
313 0 : fd_url_t url[1];
314 0 : _Bool is_ssl = 0;
315 0 : if( FD_UNLIKELY( fd_url_parse_endpoint( url,
316 0 : tile->bundle.url,
317 0 : tile->bundle.url_len,
318 0 : &ctx->server_tcp_port,
319 0 : &is_ssl,
320 0 : "[tiles.bundle.url]" ) ) ) {
321 0 : FD_LOG_ERR(( "Could not parse [tiles.bundle.url]" ));
322 0 : }
323 0 : if( FD_UNLIKELY( url->host_len > 255 ) ) {
324 0 : FD_LOG_CRIT(( "Invalid url->host_len" )); /* unreachable */
325 0 : }
326 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( ctx->server_fqdn ), url->host, url->host_len ) );
327 0 : ctx->server_fqdn_len = url->host_len;
328 :
329 0 : if( FD_UNLIKELY( tile->bundle.sni_len ) ) {
330 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( ctx->server_sni ), tile->bundle.sni, tile->bundle.sni_len ) );
331 0 : ctx->server_sni_len = tile->bundle.sni_len;
332 0 : } else {
333 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( ctx->server_sni ), url->host, url->host_len ) );
334 0 : ctx->server_sni_len = url->host_len;
335 0 : }
336 :
337 0 : ctx->is_ssl = !!is_ssl;
338 : #if !FD_HAS_OPENSSL
339 : if( FD_UNLIKELY( is_ssl ) ) {
340 : FD_LOG_ERR(( "This build does not include OpenSSL. To install OpenSSL, re-run ./deps.sh and do a clean re build." ));
341 : }
342 : #endif
343 0 : }
344 :
345 : #if FD_HAS_OPENSSL
346 :
347 : static void
348 : fd_ossl_keylog_callback( SSL const * ssl,
349 0 : char const * line ) {
350 0 : SSL_CTX * ssl_ctx = SSL_get_SSL_CTX( ssl );
351 0 : fd_bundle_tile_t * ctx = SSL_CTX_get_ex_data( ssl_ctx, 0 );
352 0 : ulong line_len = strlen( line );
353 0 : struct iovec iovs[2] = {
354 0 : { .iov_base=(void *)line, .iov_len=line_len },
355 0 : { .iov_base=(void *)"\n", .iov_len=1UL }
356 0 : };
357 0 : if( FD_UNLIKELY( writev( ctx->keylog_fd, iovs, 2 )!=(long)line_len+1 ) ) {
358 0 : FD_LOG_WARNING(( "write(keylog) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
359 0 : }
360 0 : }
361 :
362 : static void
363 : fd_bundle_tile_init_openssl( fd_bundle_tile_t * ctx,
364 : void * alloc_mem,
365 0 : int tls_cert_verify ) {
366 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( alloc_mem, 1UL ), 1UL );
367 0 : if( FD_UNLIKELY( !alloc ) ) {
368 0 : FD_LOG_ERR(( "fd_alloc_new failed" ));
369 0 : }
370 0 : ctx->ssl_alloc = alloc;
371 0 : fd_ossl_tile_init( alloc );
372 :
373 0 : SSL_CTX * ssl_ctx = SSL_CTX_new( TLS_client_method() );
374 0 : if( FD_UNLIKELY( !ssl_ctx ) ) {
375 0 : FD_LOG_ERR(( "SSL_CTX_new failed" ));
376 0 : }
377 :
378 0 : if( FD_UNLIKELY( !SSL_CTX_set_ex_data( ssl_ctx, 0, ctx ) ) ) {
379 0 : FD_LOG_ERR(( "SSL_CTX_set_ex_data failed" ));
380 0 : }
381 :
382 0 : if( FD_UNLIKELY( !SSL_CTX_set_mode( ssl_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE|SSL_MODE_AUTO_RETRY ) ) ) {
383 0 : FD_LOG_ERR(( "SSL_CTX_set_mode failed" ));
384 0 : }
385 :
386 0 : if( FD_UNLIKELY( !SSL_CTX_set_min_proto_version( ssl_ctx, TLS1_3_VERSION ) ) ) {
387 0 : FD_LOG_ERR(( "SSL_CTX_set_min_proto_version(ssl_ctx,TLS1_3_VERSION) failed" ));
388 0 : }
389 :
390 0 : if( FD_UNLIKELY( 0!=SSL_CTX_set_alpn_protos( ssl_ctx, (const unsigned char *)"\x02h2", 3 ) ) ) {
391 0 : FD_LOG_ERR(( "SSL_CTX_set_alpn_protos failed" ));
392 0 : }
393 :
394 0 : if( tls_cert_verify ) {
395 0 : fd_ossl_load_certs( ssl_ctx );
396 0 : }
397 :
398 0 : if( FD_LIKELY( ctx->keylog_fd >= 0 ) ) {
399 0 : SSL_CTX_set_keylog_callback( ssl_ctx, fd_ossl_keylog_callback );
400 0 : }
401 :
402 0 : ctx->ssl_ctx = ssl_ctx;
403 0 : }
404 :
405 : #endif /* FD_HAS_OPENSSL */
406 :
407 : static void
408 : privileged_init( fd_topo_t const * topo,
409 0 : fd_topo_tile_t const * tile ) {
410 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
411 :
412 0 : ulong const pending_max = tile->bundle.out_depth;
413 :
414 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
415 0 : fd_bundle_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_bundle_tile_t), sizeof(fd_bundle_tile_t) );
416 0 : void * grpc_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_client_align(), fd_grpc_client_footprint( tile->bundle.buf_sz ) );
417 0 : void * alloc_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
418 0 : void * deque_mem = FD_SCRATCH_ALLOC_APPEND( l, pending_txn_align(), pending_txn_footprint( pending_max ) );
419 0 : (void)alloc_mem; /* potentially unused */
420 :
421 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
422 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
423 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
424 :
425 0 : memset( ctx, 0, sizeof(fd_bundle_tile_t) );
426 0 : ctx->grpc_client_mem = grpc_mem;
427 0 : ctx->grpc_buf_max = tile->bundle.buf_sz;
428 0 : ctx->tcp_sock = -1;
429 0 : ctx->pending_txns = pending_txn_join( pending_txn_new( deque_mem, pending_max ) );
430 :
431 0 : fd_bundle_auther_init( &ctx->auther );
432 0 : uchar const * public_key = fd_keyload_load( tile->bundle.identity_key_path, 1 /* public key only */ );
433 0 : fd_memcpy( ctx->auther.pubkey, public_key, 32UL );
434 :
435 0 : ctx->keylog_fd = -1;
436 :
437 0 : # if FD_HAS_OPENSSL
438 :
439 0 : if( FD_UNLIKELY( tile->bundle.key_log_path[0] ) ) {
440 0 : ctx->keylog_fd = open( tile->bundle.key_log_path, O_WRONLY|O_APPEND|O_CREAT, 0644 );
441 0 : if( FD_UNLIKELY( ctx->keylog_fd < 0 ) ) {
442 0 : FD_LOG_ERR(( "open(%s) failed (%i-%s)", tile->bundle.key_log_path, errno, fd_io_strerror( errno ) ));
443 0 : }
444 0 : }
445 :
446 : /* OpenSSL goes and tries to read files and allocate memory and
447 : other dumb things on a thread local basis, so we need a special
448 : initializer to do it before seccomp happens in the process. */
449 0 : fd_bundle_tile_init_openssl( ctx, alloc_mem, tile->bundle.tls_cert_verify );
450 :
451 0 : # endif /* FD_HAS_OPENSSL */
452 :
453 : /* Init resolver */
454 0 : if( FD_UNLIKELY( !fd_netdb_open_fds( ctx->netdb_fds ) ) ) {
455 0 : FD_LOG_ERR(( "fd_netdb_open_fds failed" ));
456 0 : }
457 :
458 : /* Random seed for header hashmap */
459 0 : if( FD_UNLIKELY( !fd_rng_secure( &ctx->map_seed, sizeof(ulong) ) ) ) {
460 0 : FD_LOG_CRIT(( "fd_rng_secure failed" ));
461 0 : }
462 :
463 : /* Random seed for timing RNG */
464 0 : uint rng_seed;
465 0 : if( FD_UNLIKELY( !fd_rng_secure( &rng_seed, sizeof(uint) ) ) ) {
466 0 : FD_LOG_CRIT(( "fd_rng_secure failed" ));
467 0 : }
468 0 : if( FD_UNLIKELY( !fd_rng_join( fd_rng_new( &ctx->rng, rng_seed, 0UL ) ) ) ) {
469 0 : FD_LOG_CRIT(( "fd_rng_join failed" )); /* unreachable */
470 0 : }
471 0 : }
472 :
473 : static fd_bundle_out_ctx_t
474 : bundle_out_link( fd_topo_t const * topo,
475 : fd_topo_link_t const * link,
476 0 : ulong out_link_idx ) {
477 0 : fd_bundle_out_ctx_t out = {0};
478 0 : out.idx = out_link_idx;
479 0 : out.mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
480 0 : out.chunk0 = fd_dcache_compact_chunk0( out.mem, link->dcache );
481 0 : out.wmark = fd_dcache_compact_wmark ( out.mem, link->dcache, link->mtu );
482 0 : out.chunk = out.chunk0;
483 0 : return out;
484 0 : }
485 :
486 : static void
487 : unprivileged_init( fd_topo_t const * topo,
488 0 : fd_topo_tile_t const * tile ) {
489 0 : fd_bundle_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
490 0 : if( FD_UNLIKELY( tile->kind_id!=0 ) ) {
491 0 : FD_LOG_ERR(( "There can only be one bundle tile" ));
492 0 : }
493 :
494 0 : ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_bundle", tile->kind_id );
495 0 : if( FD_UNLIKELY( sign_in_idx==ULONG_MAX ) ) FD_LOG_ERR(( "Missing sign_bundle link" ));
496 0 : fd_topo_link_t const * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
497 :
498 0 : ulong sign_out_idx = fd_topo_find_tile_out_link( topo, tile, "bundle_sign", tile->kind_id );
499 0 : if( FD_UNLIKELY( sign_out_idx==ULONG_MAX ) ) FD_LOG_ERR(( "Missing bundle_sign link" ));
500 0 : fd_topo_link_t const * sign_out = &topo->links[ tile->out_link_id[ sign_out_idx ] ];
501 :
502 0 : if( FD_UNLIKELY( !fd_keyguard_client_join( fd_keyguard_client_new(
503 0 : ctx->keyguard_client,
504 0 : sign_out->mcache,
505 0 : sign_out->dcache,
506 0 : sign_in->mcache,
507 0 : sign_in->dcache,
508 0 : sign_out->mtu
509 0 : ) ) ) ) {
510 0 : FD_LOG_ERR(( "fd_keyguard_client_join failed" )); /* unreachable */
511 0 : }
512 :
513 0 : ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->id_keyswitch_obj_id ) );
514 0 : FD_TEST( ctx->keyswitch );
515 :
516 0 : ulong verify_out_idx = fd_topo_find_tile_out_link( topo, tile, "bundle_verif", tile->kind_id );
517 0 : if( FD_UNLIKELY( verify_out_idx==ULONG_MAX ) ) FD_LOG_ERR(( "Missing bundle_verif link" ));
518 0 : ctx->verify_out = bundle_out_link( topo, &topo->links[ tile->out_link_id[ verify_out_idx ] ], verify_out_idx );
519 :
520 0 : ulong plugin_out_idx = fd_topo_find_tile_out_link( topo, tile, "bundle_status", tile->kind_id );
521 0 : if( plugin_out_idx!=ULONG_MAX ) {
522 0 : ctx->plugin_out = bundle_out_link( topo, &topo->links[ tile->out_link_id[ plugin_out_idx ] ], plugin_out_idx );
523 0 : } else {
524 0 : ctx->plugin_out = (fd_bundle_out_ctx_t){ .idx=ULONG_MAX };
525 0 : }
526 :
527 : /* Set socket receive buffer size */
528 0 : ulong so_rcvbuf = tile->bundle.buf_sz;
529 0 : if( FD_UNLIKELY( so_rcvbuf < 2048UL ) ) FD_LOG_ERR(( "Invalid [development.bundle.buffer_size_kib]: too small" ));
530 0 : if( FD_UNLIKELY( so_rcvbuf > INT_MAX ) ) FD_LOG_ERR(( "Invalid [development.bundle.buffer_size_kib]: too large" ));
531 0 : ctx->so_rcvbuf = (int)so_rcvbuf;
532 :
533 : /* Set idle ping timer */
534 0 : ctx->keepalive_interval = (long)tile->bundle.keepalive_interval_nanos;
535 :
536 0 : ctx->bundle_status_plugin = 127;
537 0 : ctx->bundle_status_recent = (uchar)FD_BUNDLE_STATE_DISCONNECTED;
538 0 : ctx->last_bundle_status_log_nanos = fd_log_wallclock();
539 :
540 0 : FD_TEST( tile->in_cnt<=sizeof(ctx->in_kind)/sizeof(ctx->in_kind[0]) );
541 0 : int has_replay_in = 0;
542 0 : ulong polled_in_idx = 0UL;
543 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
544 0 : if( FD_UNLIKELY( !tile->in_link_poll[ i ] ) ) continue;
545 :
546 0 : fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
547 0 : if( !strcmp( link->name, "replay_out" ) ) {
548 0 : ctx->in_kind[ polled_in_idx ] = IN_KIND_REPLAY_OUT;
549 0 : fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
550 0 : ctx->replay_in.mem = link_wksp->wksp;
551 0 : ctx->replay_in.chunk0 = fd_dcache_compact_chunk0( ctx->replay_in.mem, link->dcache );
552 0 : ctx->replay_in.wmark = fd_dcache_compact_wmark ( ctx->replay_in.mem, link->dcache, link->mtu );
553 0 : has_replay_in = 1;
554 0 : }
555 0 : polled_in_idx++;
556 0 : }
557 :
558 0 : ctx->next_leader_slot = ULONG_MAX;
559 0 : ctx->reset_slot = ULONG_MAX;
560 0 : ctx->sleep_mode = has_replay_in; /* start asleep until we learn leader schedule */
561 0 : ctx->sleep_check_ns = 0;
562 0 : if( !has_replay_in ) memset( &ctx->replay_in, 0, sizeof(ctx->replay_in) );
563 :
564 0 : fd_bundle_tile_parse_endpoint( ctx, tile );
565 :
566 0 : ctx->grpc_client = fd_grpc_client_new( ctx->grpc_client_mem, &fd_bundle_client_grpc_callbacks, ctx->grpc_metrics, ctx, ctx->grpc_buf_max, ctx->map_seed );
567 0 : if( FD_UNLIKELY( !ctx->grpc_client ) ) {
568 0 : FD_LOG_CRIT(( "fd_grpc_client_new failed" )); /* unreachable */
569 0 : }
570 0 : fd_grpc_client_set_version( ctx->grpc_client, fd_version_cstr, strlen( fd_version_cstr ) );
571 0 : fd_grpc_client_set_authority( ctx->grpc_client, ctx->server_sni, ctx->server_sni_len, ctx->server_tcp_port );
572 :
573 0 : fd_histf_new( ctx->metrics.msg_rx_delay,
574 0 : FD_MHIST_MIN( BUNDLE, MESSAGE_RX_DELAY_NANOS ),
575 0 : FD_MHIST_MAX( BUNDLE, MESSAGE_RX_DELAY_NANOS ) );
576 0 : }
577 :
578 : static ulong
579 : populate_allowed_seccomp( fd_topo_t const * topo,
580 : fd_topo_tile_t const * tile,
581 : ulong out_cnt,
582 0 : struct sock_filter * out ) {
583 0 : fd_bundle_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
584 :
585 0 : populate_sock_filter_policy_fd_bundle_tile(
586 0 : out_cnt, out,
587 0 : (uint)fd_log_private_logfile_fd(),
588 0 : (uint)ctx->keylog_fd,
589 0 : (uint)ctx->netdb_fds->etc_hosts,
590 0 : (uint)ctx->netdb_fds->etc_resolv_conf
591 0 : );
592 0 : return sock_filter_policy_fd_bundle_tile_instr_cnt;
593 0 : }
594 :
595 : static ulong
596 : populate_allowed_fds( fd_topo_t const * topo,
597 : fd_topo_tile_t const * tile,
598 : ulong out_fds_cnt,
599 0 : int * out_fds ) {
600 0 : fd_bundle_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
601 :
602 0 : if( FD_UNLIKELY( out_fds_cnt<5UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
603 :
604 0 : ulong out_cnt = 0UL;
605 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
606 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
607 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
608 0 : if( FD_LIKELY( ctx->netdb_fds->etc_hosts >= 0 ) )
609 0 : out_fds[ out_cnt++ ] = ctx->netdb_fds->etc_hosts;
610 0 : out_fds[ out_cnt++ ] = ctx->netdb_fds->etc_resolv_conf;
611 0 : if( FD_UNLIKELY( ctx->keylog_fd>=0 ) )
612 0 : out_fds[ out_cnt++ ] = ctx->keylog_fd;
613 0 : return out_cnt;
614 0 : }
615 :
616 0 : #define STEM_LAZY ((long)10e6)
617 :
618 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_bundle_tile_t
619 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_bundle_tile_t)
620 :
621 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING fd_bundle_tile_housekeeping
622 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
623 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
624 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
625 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
626 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
627 :
628 : #include "../stem/fd_stem.c"
629 :
630 : fd_topo_run_tile_t fd_tile_bundle = {
631 : .name = "bundle",
632 : .populate_allowed_seccomp = populate_allowed_seccomp,
633 : .populate_allowed_fds = populate_allowed_fds,
634 : .scratch_align = scratch_align,
635 : .scratch_footprint = scratch_footprint,
636 : .loose_footprint = loose_footprint,
637 : .privileged_init = privileged_init,
638 : .unprivileged_init = unprivileged_init,
639 : .run = stem_run,
640 : .rlimit_file_cnt = 64,
641 : .keep_host_networking = 1,
642 : .allow_connect = 1
643 : };
644 : #endif
|