Line data Source code
1 : /* The txsend tile relays new transactions to the current leader.
2 :
3 : To recap, every ~1.6 seconds, a new validator is elected by the
4 : protocol to receive all transactions, and pack them into blocks.
5 : To submit a transaction to Solana, one has to track the "leader
6 : schedule" and get the timing right to submit a transaction early
7 : enough.
8 :
9 : The txsend tile supports the TPU-UDP (connectionless) and TPU-QUIC
10 : transports. This tile has the following jobs:
11 : - decide who to connect to (continually monitor gossip, leader schedule)
12 : - manage a pool of QUIC connections
13 : - actually send transactions
14 :
15 : An important quirk is that txsend tolerates temporary gossip outages
16 : by indefinitely caching old endpoint info (until the gossip feed is
17 : revived). */
18 :
19 : #include "fd_txsend_tile.h"
20 :
21 : #include "../../disco/topo/fd_topo.h"
22 : #include "../../disco/fd_txn_m.h"
23 : #include "../../disco/metrics/fd_metrics.h"
24 : #include "../../disco/events/generated/fd_event_gen.h"
25 : #include "../../disco/events/fd_event_report.h"
26 : #include "../../choreo/tower/fd_tower_serdes.h"
27 : #include "../../flamenco/runtime/fd_system_ids.h"
28 : #include "../../disco/keyguard/fd_keyguard.h"
29 : #include "../../disco/keyguard/fd_keyload.h"
30 : #include "../fd_startup.h"
31 : #include "../tower/fd_tower_tile.h"
32 : #include "../../util/net/fd_net_headers.h"
33 : #include "../../waltz/quic/fd_quic.h"
34 :
35 : #include <time.h>
36 : #include "generated/fd_txsend_tile_seccomp.h"
37 :
/* Kinds of input link.  ctx->in_kind[ in_idx ] holds one of these and
   the frag callbacks dispatch on it. */

#define IN_KIND_SIGN   (0UL)
#define IN_KIND_GOSSIP (1UL)
#define IN_KIND_EPOCH  (2UL)
#define IN_KIND_TOWER  (3UL)
#define IN_KIND_NET    (4UL)
43 :
44 : fd_quic_limits_t quic_limits = {
45 : .conn_cnt = 128UL,
46 : .handshake_cnt = 128UL,
47 : .conn_id_cnt = FD_QUIC_MIN_CONN_ID_CNT,
48 : .inflight_frame_cnt = 16UL * 128UL,
49 : .min_inflight_frame_cnt_conn = 4UL,
50 : .stream_id_cnt = 64UL,
51 : .tx_buf_sz = FD_TXN_MTU,
52 : .stream_pool_cnt = 128UL,
53 : };
54 :
55 : #define MAP_NAME peer_map
56 0 : #define MAP_KEY pubkey
57 : #define MAP_ELE_T peer_entry_t
58 : #define MAP_KEY_T fd_pubkey_t
59 0 : #define MAP_NEXT map.next
60 : #define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
61 : #define MAP_KEY_HASH(key,seed) fd_progcache_rec_key_hash1( (key)->uc, (seed) )
62 : #define MAP_IMPL_STYLE 2
63 : #include "../../util/tmpl/fd_map_chain.c"
64 :
65 : FD_FN_CONST static inline ulong
66 0 : scratch_align( void ) {
67 0 : return fd_ulong_max( 128UL, fd_quic_align() );
68 0 : }
69 :
70 : FD_FN_PURE static inline ulong
71 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
72 0 : ulong l = FD_LAYOUT_INIT;
73 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_txsend_tile_t), sizeof(fd_txsend_tile_t) );
74 0 : l = FD_LAYOUT_APPEND( l, fd_quic_align(), fd_quic_footprint( &quic_limits ) );
75 0 : l = FD_LAYOUT_APPEND( l, peer_map_align(), peer_map_footprint( 2UL*FD_CONTACT_INFO_TABLE_SIZE ) );
76 0 : return FD_LAYOUT_FINI( l, scratch_align() );
77 0 : }
78 :
79 : static void
80 0 : during_housekeeping( fd_txsend_tile_t * ctx ) {
81 0 : if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_UNHALT_PENDING ) ) {
82 0 : FD_LOG_DEBUG(( "keyswitch: unhalting" ));
83 0 : ctx->halt_net_frags = 0;
84 0 : fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
85 0 : }
86 :
87 0 : if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
88 0 : FD_LOG_DEBUG(( "keyswitch: switching identity" ));
89 0 : ulong seq_must_complete = ctx->keyswitch->param;
90 0 : if( FD_UNLIKELY( fd_seq_lt( ctx->tower_in_expect_seq, seq_must_complete ) ) ) {
91 : /* See fd_keyswitch.h, we need to flush any in-flight shreds from
92 : the leader pipeline before switching key. */
93 0 : FD_LOG_WARNING(( "Flushing in-flight unpublished votes from tower, must reach seq %lu, currently at %lu ...", seq_must_complete, ctx->tower_in_expect_seq ));
94 0 : return;
95 0 : }
96 :
97 : /* Halt net frags to avoid potential quic callback in after_frag */
98 0 : ctx->halt_net_frags = 1;
99 :
100 0 : fd_quic_set_identity_public_key( ctx->quic, ctx->keyswitch->bytes );
101 :
102 0 : memcpy( ctx->identity_key, ctx->keyswitch->bytes, 32UL );
103 0 : fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
104 0 : }
105 0 : }
106 :
107 : static void
108 0 : metrics_write( fd_txsend_tile_t * ctx ) {
109 0 : FD_MCNT_SET( TXSEND, PKT_RX_BYTES, ctx->quic->metrics.net_rx_byte_cnt );
110 0 : FD_MCNT_ENUM_COPY( TXSEND, FRAME_RX, ctx->quic->metrics.frame_rx_cnt );
111 0 : FD_MCNT_SET( TXSEND, PKT_RX, ctx->quic->metrics.net_rx_pkt_cnt );
112 0 : FD_MCNT_SET( TXSEND, STREAM_RX_BYTES, ctx->quic->metrics.stream_rx_byte_cnt );
113 0 : FD_MCNT_SET( TXSEND, STREAM_RX, ctx->quic->metrics.stream_rx_event_cnt );
114 :
115 0 : FD_MCNT_SET( TXSEND, PKT_TX, ctx->quic->metrics.net_tx_pkt_cnt );
116 0 : FD_MCNT_SET( TXSEND, PKT_TX_BYTES, ctx->quic->metrics.net_tx_byte_cnt );
117 0 : FD_MCNT_SET( TXSEND, PKT_TX_RETRY, ctx->quic->metrics.retry_tx_cnt );
118 0 : FD_MCNT_ENUM_COPY( TXSEND, ACK_TX, ctx->quic->metrics.ack_tx );
119 :
120 0 : FD_MGAUGE_ENUM_COPY( TXSEND, CONN_STATE, ctx->quic->metrics.conn_state_cnt );
121 0 : FD_MGAUGE_SET( TXSEND, CONN_IN_USE, ctx->quic->metrics.conn_alloc_cnt );
122 0 : FD_MCNT_SET( TXSEND, CONN_CREATED, ctx->quic->metrics.conn_created_cnt );
123 0 : FD_MCNT_SET( TXSEND, CONN_CLOSED, ctx->quic->metrics.conn_closed_cnt );
124 0 : FD_MCNT_SET( TXSEND, CONN_ABORTED, ctx->quic->metrics.conn_aborted_cnt );
125 0 : FD_MCNT_SET( TXSEND, CONN_TIMED_OUT, ctx->quic->metrics.conn_timeout_cnt );
126 0 : FD_MCNT_SET( TXSEND, CONN_RETRIED, ctx->quic->metrics.conn_retry_cnt );
127 0 : FD_MCNT_SET( TXSEND, CONN_ERROR_NO_SLOTS, ctx->quic->metrics.conn_err_no_slots_cnt );
128 0 : FD_MCNT_SET( TXSEND, CONN_ERROR_RETRY_FAILED, ctx->quic->metrics.conn_err_retry_fail_cnt );
129 :
130 0 : FD_MCNT_ENUM_COPY( TXSEND, PKT_CRYPTO_FAILED, ctx->quic->metrics.pkt_decrypt_fail_cnt );
131 0 : FD_MCNT_ENUM_COPY( TXSEND, PKT_NO_KEY, ctx->quic->metrics.pkt_no_key_cnt );
132 0 : FD_MCNT_ENUM_COPY( TXSEND, PKT_NO_CONN, ctx->quic->metrics.pkt_no_conn_cnt );
133 0 : FD_MCNT_SET( TXSEND, PKT_SRC_INVALID, ctx->quic->metrics.pkt_wrong_src_cnt );
134 0 : FD_MCNT_ENUM_COPY( TXSEND, FRAME_META_ACQUIRED, ctx->quic->metrics.frame_tx_alloc_cnt );
135 0 : FD_MCNT_SET( TXSEND, PKT_NET_HEADER_INVALID, ctx->quic->metrics.pkt_net_hdr_err_cnt );
136 0 : FD_MCNT_SET( TXSEND, PKT_HEADER_INVALID, ctx->quic->metrics.pkt_quic_hdr_err_cnt );
137 0 : FD_MCNT_SET( TXSEND, PKT_UNDERSIZE, ctx->quic->metrics.pkt_undersz_cnt );
138 0 : FD_MCNT_SET( TXSEND, PKT_OVERSIZE, ctx->quic->metrics.pkt_oversz_cnt );
139 0 : FD_MCNT_SET( TXSEND, PKT_RX_VERSION_NEGOTIATION, ctx->quic->metrics.pkt_verneg_cnt );
140 0 : FD_MCNT_ENUM_COPY( TXSEND, PKT_TX_RETRANSMITTED, ctx->quic->metrics.pkt_retransmissions_cnt );
141 :
142 0 : FD_MCNT_SET( TXSEND, HANDSHAKE_CREATED, ctx->quic->metrics.hs_created_cnt );
143 0 : FD_MCNT_SET( TXSEND, HANDSHAKE_ERROR_ALLOC_FAIL, ctx->quic->metrics.hs_err_alloc_fail_cnt );
144 0 : FD_MCNT_SET( TXSEND, HANDSHAKE_EVICTED, ctx->quic->metrics.hs_evicted_cnt );
145 :
146 0 : FD_MCNT_SET( TXSEND, FRAME_PARSE_FAILED, ctx->quic->metrics.frame_rx_err_cnt );
147 :
148 0 : FD_MHIST_COPY( TXSEND, SERVICE_DURATION_SECONDS, ctx->quic->metrics.service_duration );
149 0 : FD_MHIST_COPY( TXSEND, RX_DURATION_SECONDS, ctx->quic->metrics.receive_duration );
150 0 : }
151 :
152 : static void
153 : quic_tls_cv_sign( void * signer_ctx,
154 : uchar signature[ static 64 ],
155 0 : uchar const payload[ static 130 ] ) {
156 0 : fd_txsend_tile_t * ctx = signer_ctx;
157 :
158 0 : fd_keyguard_client_sign( ctx->keyguard_client, signature, payload, 130UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
159 0 : }
160 :
161 : static void
162 : send_to_net( fd_txsend_tile_t * ctx,
163 : fd_ip4_hdr_t const * ip4_hdr,
164 : fd_udp_hdr_t const * udp_hdr,
165 : uchar const * payload,
166 : ulong payload_sz,
167 0 : long now ) {
168 0 : uint const ip_dst = FD_LOAD( uint, ip4_hdr->daddr_c );
169 0 : ulong const ip_sz = FD_IP4_GET_LEN( *ip4_hdr );
170 :
171 0 : fd_txsend_out_t * net_out_link = ctx->net_out;
172 0 : uchar * packet_l2 = fd_chunk_to_laddr( net_out_link->mem, net_out_link->chunk );
173 0 : uchar * packet_l3 = packet_l2 + sizeof(fd_eth_hdr_t);
174 0 : uchar * packet_l4 = packet_l3 + ip_sz;
175 0 : uchar * packet_l5 = packet_l4 + sizeof(fd_udp_hdr_t);
176 :
177 0 : fd_memcpy( packet_l2, ctx->packet_hdr->eth, sizeof(fd_eth_hdr_t) );
178 0 : fd_memcpy( packet_l3, ip4_hdr, ip_sz );
179 0 : fd_memcpy( packet_l4, udp_hdr, sizeof(fd_udp_hdr_t) );
180 0 : fd_memcpy( packet_l5, payload, payload_sz );
181 :
182 0 : ulong sig = fd_disco_netmux_sig( ip_dst, 0U, ip_dst, DST_PROTO_OUTGOING, FD_NETMUX_SIG_MIN_HDR_SZ );
183 0 : ulong sz_l2 = sizeof(fd_eth_hdr_t) + ip_sz + sizeof(fd_udp_hdr_t) + payload_sz;
184 :
185 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( now );
186 0 : fd_stem_publish( ctx->stem, net_out_link->idx, sig, net_out_link->chunk, sz_l2, 0UL, 0, tspub );
187 0 : net_out_link->chunk = fd_dcache_compact_next( net_out_link->chunk, sz_l2, net_out_link->chunk0, net_out_link->wmark );
188 0 : }
189 :
190 : static int
191 : quic_tx_aio_send( void * _ctx,
192 : fd_aio_pkt_info_t const * batch,
193 : ulong batch_cnt,
194 : ulong * opt_batch_idx,
195 0 : int flush FD_PARAM_UNUSED ) {
196 0 : fd_txsend_tile_t * ctx = _ctx;
197 :
198 0 : long now = fd_log_wallclock();
199 :
200 0 : for( ulong i=0; i<batch_cnt; i++ ) {
201 0 : if( FD_UNLIKELY( batch[ i ].buf_sz<FD_NETMUX_SIG_MIN_HDR_SZ ) ) continue;
202 0 : uchar * buf = batch[ i ].buf;
203 0 : fd_ip4_hdr_t * ip4_hdr = fd_type_pun( buf );
204 0 : ulong const ip4_len = FD_IP4_GET_LEN( *ip4_hdr );
205 0 : fd_udp_hdr_t * udp_hdr = fd_type_pun( buf + ip4_len );
206 0 : uchar * payload = buf + ip4_len + sizeof(fd_udp_hdr_t);
207 0 : FD_TEST( batch[ i ].buf_sz >= ip4_len + sizeof(fd_udp_hdr_t) );
208 0 : ulong payload_sz = batch[ i ].buf_sz - ip4_len - sizeof(fd_udp_hdr_t);
209 0 : send_to_net( ctx, ip4_hdr, udp_hdr, payload, payload_sz, now );
210 0 : }
211 :
212 0 : if( FD_LIKELY( opt_batch_idx ) ) {
213 0 : *opt_batch_idx = batch_cnt;
214 0 : }
215 :
216 0 : return FD_AIO_SUCCESS;
217 0 : }
218 :
219 : /* quic_conn_deregister removes references to a quic_conn object from
220 : txsend state. */
221 :
222 : static void
223 : quic_conn_deregister( fd_txsend_tile_t * tile,
224 0 : fd_quic_conn_t * conn ) {
225 0 : for( ulong i=0UL; i<tile->conns_len; i++ ) {
226 0 : if( FD_LIKELY( tile->conns[ i ].conn!=conn ) ) continue;
227 0 : peer_entry_t * peer = peer_map_ele_query( tile->peer_map, &tile->conns[ i ].pubkey, NULL, tile->peers );
228 0 : if( FD_LIKELY( peer ) ) {
229 0 : for( ulong j=0UL; j<2UL; j++ ) {
230 0 : if( peer->quic_conns[ j ].quic_conn==conn ) {
231 0 : peer->quic_conns[ j ].quic_conn = NULL;
232 0 : }
233 0 : }
234 0 : }
235 0 : if( FD_UNLIKELY( i!=tile->conns_len-1UL ) ) tile->conns[ i ] = tile->conns[ tile->conns_len-1UL ];
236 0 : tile->conns_len--;
237 0 : return;
238 0 : }
239 0 : }
240 :
241 : /* quic_conn_final is invoked by fd_quic just before a conn object is
242 : deallocated. Here, we must remove all references to this conn. */
243 :
244 : static void
245 : quic_conn_final( fd_quic_conn_t * conn,
246 0 : void * ctx ) {
247 0 : fd_txsend_tile_t * tile = ctx;
248 0 : quic_conn_deregister( tile, conn );
249 0 : }
250 :
251 : /* quic_conn_close instructs fd_quic to deallocate the given conn and
252 : say goodbye (CONNECTION_CLOSE) to the peer. */
253 :
254 : static void
255 : quic_conn_close( fd_txsend_tile_t * tile,
256 : fd_quic_conn_t * conn,
257 0 : uint reason ) {
258 0 : if( FD_UNLIKELY( !conn ) ) return;
259 : /* Defer a conn close operation */
260 0 : fd_quic_conn_close( conn, reason );
261 0 : quic_conn_deregister( tile, conn );
262 : /* Send out a packet and invoke quic_conn_final */
263 0 : fd_quic_service( tile->quic, fd_log_wallclock() );
264 0 : }
265 :
266 : /* This QUIC servicing is very precarious. Recall a few facts,
267 :
268 : 1) QUIC needs to be serviced periodically to make progress
269 : 2) QUIC servicing may produce outgoing packets that need to be sent
270 : to the network
271 : 3) Elsewhere, the tile publishes frags to the verify tile to
272 : send our own votes into our leader pipeline
273 :
274 : You could service QUIC in before_credit, as the QUIC tile does, but
275 : this has a problem. If you publish frags in before_credit, you might
276 : overrun the downstream consumer. For net tile, this is OK because it
277 : expects that (as does verify). But the credit counting mechanism
278 : doesn't expect this behavior and will underflow. (That's also not
279 : ideal, in case some plugin wanted to listen reliably on quic->verify
280 : they could not, if it got underflowed). Here though, we want to
281 : avoid dropping outgoing votes to verify, since they might be needed
282 : for liveness of a small cluster.
283 :
284 : We thus take the trade of servicing QUIC in after_credit, which means
285 : it could theoretically get backpressured by verify, however this
286 : isn't realistic in practice, as verify polls round robin and there's
287 : only one vote per slot. */
288 :
289 : static inline void
290 : after_credit( fd_txsend_tile_t * ctx,
291 : fd_stem_context_t * stem,
292 : int * opt_poll_in,
293 0 : int * charge_busy ) {
294 0 : ctx->stem = stem;
295 :
296 0 : *charge_busy = fd_quic_service( ctx->quic, fd_log_wallclock() );
297 0 : *opt_poll_in = !*charge_busy; /* refetch credits to prevent above documented situation */
298 :
299 0 : if( FD_UNLIKELY( ctx->leader_schedules<2UL ) ) return;
300 0 : if( FD_UNLIKELY( ctx->voted_slot==ULONG_MAX ) ) return;
301 :
302 0 : fd_pubkey_t const * leaders[ 7UL ];
303 :
304 0 : for( ulong i=0UL; i<7UL; i++ ) {
305 : /* It's possible for leaders[i] to be NULL if target slot is two
306 : epochs ahead of the replay root. This is not possible on mainnet
307 : but can occur on local clusters during warmup epochs. */
308 0 : ulong target_slot = ctx->voted_slot+1UL + i*FD_EPOCH_SLOTS_PER_ROTATION;
309 0 : leaders[ i ] = fd_multi_epoch_leaders_get_leader_for_slot( ctx->mleaders, target_slot );
310 0 : }
311 :
312 : /* Disconnect any QUIC connection to a leader that does not have a
313 : rotation coming up in the next 7 slots. */
314 0 : ulong conn_cnt = ctx->conns_len;
315 0 : for( ulong i=0UL; i<conn_cnt; ) {
316 0 : int keep_conn = 0;
317 0 : for( ulong j=0UL; j<7UL; j++ ) {
318 0 : if( leaders[j] && fd_pubkey_eq( &ctx->conns[ i ].pubkey, leaders[ j ] ) ) {
319 0 : keep_conn = 1;
320 0 : break;
321 0 : }
322 0 : }
323 :
324 0 : if( FD_UNLIKELY( !keep_conn ) ) quic_conn_close( ctx, ctx->conns[ i ].conn, 0 );
325 0 : if( ctx->conns_len==conn_cnt ) i++;
326 0 : conn_cnt = ctx->conns_len;
327 0 : }
328 :
329 : /* Connect to any leader that does not have a connection yet. */
330 0 : for( ulong i=0UL; i<7UL; i++ ) {
331 0 : fd_pubkey_t const * leader = leaders[ i ];
332 0 : if( FD_UNLIKELY( !leader ) ) continue;
333 0 : peer_entry_t * peer = peer_map_ele_query( ctx->peer_map, leader, NULL, ctx->peers );
334 0 : if( FD_UNLIKELY( !peer ) ) continue; /* no contact info */
335 :
336 0 : for( ulong j=0UL; j<2UL; j++ ) {
337 0 : if( FD_UNLIKELY( ctx->conns_len==128UL ) ) break; /* connection limit reached */
338 0 : txsend_conn_t * conn = &peer->quic_conns[ j ];
339 0 : if( FD_LIKELY( conn->quic_conn ) ) continue; /* already connected */
340 0 : if( FD_UNLIKELY( !conn->quic_ip_addr || !conn->quic_port ) ) continue;
341 :
342 : /* Don't try to reconnect more than once every two seconds ...
343 : Basically Agave limits us to 8 connections per minute, so if we
344 : keep trying to reconnect rapidly it's much less effective than
345 : waiting a little bit to ensure we stay under the threshold.
346 :
347 : We should probably make this a bit more sophisticated, with a
348 : simple model that considers past connection attempts, and
349 : future leader slots (e.g. we might still want to burn an
350 : attempt if a leader slot is imminent, even if we recently tried
351 : to connect). For now the dumb logic seems to work well enough. */
352 0 : long now = fd_log_wallclock();
353 0 : if( FD_UNLIKELY( conn->quic_last_connected+2e9L>now ) ) continue;
354 :
355 0 : fd_quic_conn_t * quic_conn =
356 0 : fd_quic_connect( ctx->quic,
357 0 : conn->quic_ip_addr,
358 0 : conn->quic_port,
359 0 : ctx->src_ip_addr,
360 0 : ctx->src_port,
361 0 : now );
362 0 : if( FD_UNLIKELY( !quic_conn ) ) {
363 : /* Should never happen, but handle it gracefully */
364 0 : return;
365 0 : }
366 0 : ctx->conns[ ctx->conns_len ].conn = quic_conn;
367 0 : ctx->conns[ ctx->conns_len ].pubkey = *leader;
368 0 : conn->quic_conn = quic_conn;
369 0 : conn->quic_last_connected = now;
370 0 : ctx->conns_len++;
371 0 : }
372 0 : }
373 0 : }
374 :
375 : void
376 : send_vote_to_leader( fd_txsend_tile_t * ctx,
377 : fd_pubkey_t const * leader_pubkey,
378 : uchar const * vote_payload,
379 0 : ulong vote_payload_sz ) {
380 0 : peer_entry_t const * peer = peer_map_ele_query_const( ctx->peer_map, leader_pubkey, NULL, ctx->peers );
381 0 : if( FD_UNLIKELY( !peer ) ) return; /* no known contact info */
382 :
383 0 : for( ulong i=0UL; i<2UL; i++ ) {
384 0 : if( FD_UNLIKELY( !peer->udp_ip_addrs[ i ] | !peer->udp_ports[ i ] ) ) continue;
385 :
386 0 : fd_ip4_hdr_t * ip4_hdr = ctx->packet_hdr->ip4;
387 0 : fd_udp_hdr_t * udp_hdr = ctx->packet_hdr->udp;
388 :
389 0 : ip4_hdr->daddr = peer->udp_ip_addrs[ i ];
390 0 : ip4_hdr->net_tot_len = fd_ushort_bswap( (ushort)(vote_payload_sz+sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
391 0 : ip4_hdr->net_id = fd_ushort_bswap( ctx->net_id++ );
392 0 : ip4_hdr->check = 0;
393 0 : ip4_hdr->check = fd_ip4_hdr_check_fast( ip4_hdr );
394 :
395 0 : udp_hdr->net_dport = fd_ushort_bswap( peer->udp_ports[ i ] );
396 0 : udp_hdr->net_len = fd_ushort_bswap( (ushort)( vote_payload_sz+sizeof(fd_udp_hdr_t) ) );
397 0 : send_to_net( ctx, ip4_hdr, udp_hdr, vote_payload, vote_payload_sz, fd_log_wallclock() );
398 0 : }
399 :
400 0 : for( ulong i=0UL; i<2UL; i++ ) {
401 0 : fd_quic_conn_t * conn = peer->quic_conns[ i ].quic_conn;
402 0 : if( FD_UNLIKELY( !conn ) ) continue;
403 :
404 0 : fd_quic_stream_t * stream = fd_quic_conn_new_stream( conn );
405 0 : if( FD_UNLIKELY( !stream ) ) continue;
406 :
407 0 : fd_quic_stream_send( stream, vote_payload, vote_payload_sz, 1 );
408 0 : }
409 0 : }
410 :
411 : /* gossip -> txsend peer synchronization
412 :
413 : The gossip update stream can be replayed to perfectly replicate the
414 : ContactInfo table. The stream is laid out so there are no duplicate
415 : pubkeys.
416 :
417 : However, the txsend tile wants to retain pubkeys past deletion.
418 : Gossip evicts ContactInfos without updates quickly, but txsend should
419 : continue sending to staked leaders even if there is a temporary
420 : gossip outage.
421 :
422 : Therefore, the txsend tile only tombstones entries when the gossip
423 : stream instructs to remove them, instead of deleting. The tombstone
424 : handling is then resolved downstream during ContactInfo updates. */
425 :
426 : static inline void
427 : handle_contact_info_remove( fd_txsend_tile_t * ctx,
428 0 : fd_gossip_update_message_t const * msg ) {
429 0 : FD_TEST( msg->contact_info_remove->idx < FD_CONTACT_INFO_TABLE_SIZE );
430 0 : peer_entry_t * entry = &ctx->peers[ msg->contact_info_remove->idx ];
431 0 : entry->tombstoned = 1;
432 0 : }
433 :
434 : static void
435 : handle_contact_info_update( fd_txsend_tile_t * ctx,
436 0 : fd_gossip_update_message_t const * msg ) {
437 0 : FD_TEST( msg->contact_info->idx < FD_CONTACT_INFO_TABLE_SIZE );
438 :
439 : /* Key updated by the gossip event */
440 0 : fd_pubkey_t key = FD_LOAD( fd_pubkey_t, msg->origin );
441 :
442 : /* Storage entry updated by the gossip event */
443 0 : peer_entry_t * entry = &ctx->peers[ msg->contact_info->idx ];
444 :
445 : /* At this point, entry contains an arbitrary old tombstoned entry or
446 : a previous version of this key. */
447 :
448 0 : if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, &key ) ) ) {
449 : /* Overwriting an unrelated (tombstoned) entry, free it */
450 0 : quic_conn_close( ctx, entry->quic_conns[ 0 ].quic_conn, 0 );
451 0 : quic_conn_close( ctx, entry->quic_conns[ 1 ].quic_conn, 0 );
452 0 : peer_map_ele_remove( ctx->peer_map, &entry->pubkey, NULL, ctx->peers );
453 0 : memset( entry, 0, sizeof(peer_entry_t) );
454 0 : }
455 :
456 : /* At this point, entry contains a stale version of the same key or is
457 : empty. */
458 :
459 0 : peer_entry_t * stale = peer_map_ele_query( ctx->peer_map, &key, NULL, ctx->peers );
460 0 : if( FD_UNLIKELY( stale && stale!=entry ) ) {
461 : /* The key exists at another entry location, drop that and migrate
462 : it to this slot. */
463 0 : for( ulong i=0UL; i<2UL; i++ ) {
464 0 : entry->quic_conns [ i ] = stale->quic_conns [ i ];
465 0 : entry->udp_ip_addrs[ i ] = stale->udp_ip_addrs[ i ];
466 0 : entry->udp_ports [ i ] = stale->udp_ports [ i ];
467 0 : }
468 0 : peer_map_ele_remove( ctx->peer_map, &stale->pubkey, NULL, ctx->peers );
469 0 : memset( stale, 0, sizeof(peer_entry_t) );
470 0 : fd_memcpy( entry->pubkey.uc, msg->origin, 32UL );
471 0 : FD_TEST( peer_map_ele_insert( ctx->peer_map, entry, ctx->peers ) );
472 0 : } else if( !stale ) {
473 0 : fd_memcpy( entry->pubkey.uc, msg->origin, 32UL );
474 0 : FD_TEST( peer_map_ele_insert( ctx->peer_map, entry, ctx->peers ) );
475 0 : }
476 :
477 0 : entry->tombstoned = 0;
478 :
479 0 : static ulong const quic_socket_idx[ 2UL ] = {
480 0 : FD_GOSSIP_CONTACT_INFO_SOCKET_TPU_VOTE_QUIC,
481 0 : FD_GOSSIP_CONTACT_INFO_SOCKET_TPU_QUIC,
482 0 : };
483 :
484 0 : static ulong const udp_socket_idx[ 2UL ] = {
485 0 : FD_GOSSIP_CONTACT_INFO_SOCKET_TPU_VOTE,
486 0 : FD_GOSSIP_CONTACT_INFO_SOCKET_TPU,
487 0 : };
488 :
489 : /* At this point, entry is not in a map and *entry might still contain
490 : stale endpoint info (from a previous update). Only overwrite it if
491 : update actually contains endpoint info. */
492 :
493 0 : for( ulong i=0UL; i<2UL; i++ ) {
494 0 : if( FD_LIKELY( !msg->contact_info->value->sockets[ quic_socket_idx[ i ] ].is_ipv6 && msg->contact_info->value->sockets[ quic_socket_idx[ i ] ].ip4 ) ) {
495 0 : entry->quic_conns[ i ].quic_ip_addr = msg->contact_info->value->sockets[ quic_socket_idx[ i ] ].ip4;
496 0 : }
497 0 : ushort port = fd_ushort_bswap( msg->contact_info->value->sockets[ quic_socket_idx[ i ] ].port );
498 0 : if( FD_LIKELY( port ) ) {
499 0 : entry->quic_conns[ i ].quic_port = port;
500 0 : }
501 0 : }
502 :
503 0 : for( ulong i=0UL; i<2UL; i++ ) {
504 0 : if( FD_LIKELY( !msg->contact_info->value->sockets[ udp_socket_idx[ i ] ].is_ipv6 && msg->contact_info->value->sockets[ udp_socket_idx[ i ] ].ip4 ) ) {
505 0 : entry->udp_ip_addrs[ i ] = msg->contact_info->value->sockets[ udp_socket_idx[ i ] ].ip4;
506 0 : }
507 0 : if( FD_LIKELY( fd_ushort_bswap( msg->contact_info->value->sockets[ udp_socket_idx[ i ] ].port ) ) ) {
508 0 : entry->udp_ports [ i ] = fd_ushort_bswap( msg->contact_info->value->sockets[ udp_socket_idx[ i ] ].port );
509 0 : }
510 0 : }
511 0 : }
512 :
513 : static void
514 : report_signed_vote( uchar const * payload,
515 : fd_txn_t const * txn,
516 : uchar const * signatures,
517 0 : ulong vote_txn_sz ) {
518 0 : if( FD_LIKELY( !fd_event_tl ) ) return;
519 :
520 0 : if( FD_UNLIKELY( txn->instr_cnt!=1UL ) ) return;
521 0 : if( FD_UNLIKELY( txn->acct_addr_cnt<3UL || txn->acct_addr_cnt>4UL ) ) return;
522 0 : fd_txn_instr_t const * instr = &txn->instr[ 0 ];
523 0 : fd_acct_addr_t const * addrs = fd_txn_get_acct_addrs( txn, payload );
524 0 : if( FD_UNLIKELY( 0!=memcmp( addrs[ instr->program_id ].b, fd_solana_vote_program_id.uc, sizeof(fd_pubkey_t) ) ) ) return;
525 0 : if( FD_UNLIKELY( instr->acct_cnt!=2UL ) ) return;
526 0 : uchar const * instr_addrs = fd_txn_get_instr_accts( instr, payload );
527 0 : uchar const * instr_data = payload + instr->data_off;
528 0 : ulong instr_data_sz = instr->data_sz;
529 0 : if( FD_UNLIKELY( instr_data_sz<sizeof(uint) ) ) return;
530 0 : if( FD_UNLIKELY( FD_LOAD( uint, instr_data )!=FD_VOTE_IX_KIND_TOWER_SYNC ) ) return;
531 0 : instr_data += 4; instr_data_sz -= 4;
532 :
533 0 : fd_compact_tower_sync_serde_t sync[1];
534 0 : if( FD_UNLIKELY( 0!=fd_compact_tower_sync_de( sync, instr_data, instr_data_sz ) ) ) return;
535 0 : if( FD_UNLIKELY( sync->lockouts_cnt==0 || sync->lockouts_cnt>31 ) ) return;
536 :
537 0 : uchar const * rbh = fd_txn_get_recent_blockhash( txn, payload );
538 :
539 0 : fd_event_signed_vote_t ev = {0};
540 0 : FD_TEST( vote_txn_sz<=sizeof(ev.signed_txn) );
541 0 : fd_memcpy( ev.signed_txn, payload, vote_txn_sz );
542 0 : ev.signed_txn_len = vote_txn_sz;
543 0 : fd_memcpy( ev.vote_account, addrs[ instr_addrs[ 0 ] ].b, sizeof(fd_pubkey_t) );
544 0 : fd_memcpy( ev.vote_authority, addrs[ instr_addrs[ 1 ] ].b, sizeof(fd_pubkey_t) );
545 0 : fd_memcpy( ev.fee_payer, addrs[ 0 ].b, sizeof(fd_pubkey_t) );
546 0 : fd_memcpy( ev.signature, signatures, sizeof(fd_ed25519_sig_t) );
547 0 : fd_memcpy( ev.vote_bank_hash, sync->hash.uc, sizeof(fd_hash_t) );
548 0 : fd_memcpy( ev.vote_block_id, sync->block_id.uc, sizeof(fd_hash_t) );
549 0 : fd_memcpy( ev.txn_blockhash, rbh, sizeof(fd_hash_t) );
550 :
551 0 : ulong root_slot = fd_ulong_if( sync->root==ULONG_MAX, 0UL, sync->root );
552 0 : ulong slot = root_slot;
553 0 : for( ulong i=0UL; i<sync->lockouts_cnt; i++ ) {
554 0 : slot += sync->lockouts[ i ].offset;
555 0 : ev.tower[ i ].slot = slot;
556 0 : ev.tower[ i ].confirmation_count = (uchar)sync->lockouts[ i ].confirmation_count;
557 0 : }
558 0 : ev.tower_cnt = sync->lockouts_cnt;
559 0 : ev.vote_slot = slot; /* top of tower */
560 :
561 0 : fd_event_report_signed_vote( &ev );
562 0 : }
563 :
564 : static void
565 : handle_vote_msg( fd_txsend_tile_t * ctx,
566 : fd_stem_context_t * stem,
567 : fd_tower_slot_done_t const * slot_done,
568 0 : ulong tsorig_comp ) {
569 0 : if( FD_UNLIKELY( slot_done->vote_slot==ULONG_MAX ) ) return;
570 0 : if( FD_UNLIKELY( !slot_done->has_vote_txn ) ) return;
571 :
572 0 : ctx->voted_slot = slot_done->vote_slot;
573 :
574 0 : fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->txsend_out->mem, ctx->txsend_out->chunk );
575 0 : FD_TEST( slot_done->vote_txn_sz<=FD_TXN_MTU );
576 0 : txnm->payload_sz = (ushort)slot_done->vote_txn_sz;
577 0 : txnm->source_ipv4 = ctx->src_ip_addr;
578 0 : txnm->source_tpu = FD_TXN_M_TPU_SOURCE_TXSEND;
579 0 : txnm->block_engine.bundle_id = 0UL;
580 0 : fd_memcpy( fd_txn_m_payload( txnm ), slot_done->vote_txn, slot_done->vote_txn_sz );
581 :
582 0 : txnm->txn_t_sz = (ushort)fd_txn_parse( slot_done->vote_txn, slot_done->vote_txn_sz, fd_txn_m_txn_t( txnm ), NULL );
583 0 : FD_TEST( txnm->txn_t_sz );
584 :
585 0 : uchar * payload = fd_txn_m_payload( txnm );
586 0 : fd_txn_t const * txn = fd_txn_m_txn_t_const( txnm );
587 :
588 0 : uchar * signatures = payload + txn->signature_off;
589 0 : uchar const * message = payload + txn->message_off;
590 0 : ulong message_sz = slot_done->vote_txn_sz - txn->message_off;
591 0 : fd_keyguard_client_vote_txn_sign( ctx->keyguard_client, signatures, slot_done->authority_idx, message, message_sz );
592 :
593 0 : FD_BASE58_ENCODE_64_BYTES( signatures, vote_sig_b58 );
594 0 : FD_LOG_INFO(( "vote txn for slot %lu created: %s", slot_done->vote_slot, vote_sig_b58 ));
595 :
596 0 : report_signed_vote( payload, txn, signatures, slot_done->vote_txn_sz );
597 :
598 0 : for( ulong i=0UL; i<3UL; i++ ) {
599 0 : ulong target_slot = slot_done->vote_slot+1UL + i*FD_EPOCH_SLOTS_PER_ROTATION;
600 0 : fd_pubkey_t const * leader = fd_multi_epoch_leaders_get_leader_for_slot( ctx->mleaders, target_slot );
601 0 : if( FD_UNLIKELY( !leader ) ) {
602 0 : FD_LOG_WARNING(( "no leader found for slot %lu", target_slot ));
603 0 : continue;
604 0 : }
605 0 : send_vote_to_leader( ctx, leader, payload, slot_done->vote_txn_sz );
606 0 : }
607 :
608 0 : ulong msg_sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
609 0 : ulong tspub_comp = fd_frag_meta_ts_comp( fd_tickcount() );
610 0 : fd_stem_publish( stem, ctx->txsend_out->idx, 1UL, ctx->txsend_out->chunk, msg_sz, 0UL, tsorig_comp, tspub_comp );
611 0 : ctx->txsend_out->chunk = fd_dcache_compact_next( ctx->txsend_out->chunk, msg_sz, ctx->txsend_out->chunk0, ctx->txsend_out->wmark );
612 0 : }
613 :
614 :
615 : static inline int
616 : before_frag( fd_txsend_tile_t * ctx,
617 : ulong in_idx,
618 : ulong seq,
619 0 : ulong sig ) {
620 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_TOWER ) ) ctx->tower_in_expect_seq = seq+1UL;
621 0 : if( FD_UNLIKELY( ctx->halt_net_frags && ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) return -1;
622 :
623 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
624 0 : return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO && sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
625 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_TOWER ) ) {
626 0 : return sig!=FD_TOWER_SIG_SLOT_DONE;
627 0 : }
628 :
629 0 : return 0;
630 0 : }
631 :
632 : static void
633 : during_frag( fd_txsend_tile_t * ctx,
634 : ulong in_idx,
635 : ulong seq,
636 : ulong sig,
637 : ulong chunk,
638 : ulong sz,
639 0 : ulong ctl ) {
640 0 : (void)seq; (void)sig;
641 :
642 0 : ctx->chunk = chunk;
643 :
644 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EPOCH ) ) {
645 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
646 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu,%lu]", chunk, sz, ctx->in[in_idx].chunk0, ctx->in[in_idx].wmark, ctx->in[ in_idx ].mtu ));
647 :
648 0 : fd_epoch_info_msg_t const * msg = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
649 0 : FD_TEST( msg->staked_vote_cnt<=MAX_COMPRESSED_STAKE_WEIGHTS ); /* implicit sz verification since sz field on frag_meta too small */
650 0 : FD_TEST( msg->staked_id_cnt<=MAX_SHRED_DESTS );
651 0 : } else {
652 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>ctx->in[ in_idx ].mtu ) )
653 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu,%lu]", chunk, sz, ctx->in[in_idx].chunk0, ctx->in[in_idx].wmark, ctx->in[ in_idx ].mtu ));
654 0 : }
655 :
656 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
657 0 : void const * src = fd_net_rx_translate_frag( &ctx->net_in_bounds[ in_idx ], chunk, ctl, sz );
658 0 : fd_memcpy( ctx->quic_buf, src, sz );
659 0 : }
660 0 : }
661 :
662 : static void
663 : after_frag( fd_txsend_tile_t * ctx,
664 : ulong in_idx,
665 : ulong seq,
666 : ulong sig,
667 : ulong sz,
668 : ulong tsorig,
669 : ulong tspub,
670 0 : fd_stem_context_t * stem ) {
671 0 : (void)seq; (void)sig; (void)tspub;
672 :
673 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
674 0 : uchar * ip_packet = ctx->quic_buf+sizeof(fd_eth_hdr_t);
675 0 : ulong ip_packet_sz = sz-sizeof(fd_eth_hdr_t);
676 0 : fd_quic_process_packet( ctx->quic, ip_packet, ip_packet_sz, fd_log_wallclock() );
677 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
678 0 : if( FD_LIKELY( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ) ) handle_contact_info_update( ctx, fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, ctx->chunk ) );
679 0 : else handle_contact_info_remove( ctx, fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, ctx->chunk ) );
680 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_TOWER ) ) {
681 0 : handle_vote_msg( ctx, stem, fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, ctx->chunk ), tsorig );
682 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EPOCH ) ) {
683 0 : fd_multi_epoch_leaders_epoch_msg_init( ctx->mleaders, fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, ctx->chunk ) );
684 0 : fd_multi_epoch_leaders_stake_msg_fini( ctx->mleaders );
685 0 : ctx->leader_schedules++;
686 0 : } else {
687 0 : FD_LOG_ERR(( "unknown in_kind %d on link %lu", ctx->in_kind[ in_idx ], in_idx ));
688 0 : }
689 0 : }
690 :
691 : static void
692 : privileged_init( fd_topo_t const * topo,
693 0 : fd_topo_tile_t const * tile ) {
694 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
695 :
696 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
697 0 : fd_txsend_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txsend_tile_t), sizeof(fd_txsend_tile_t) );
698 :
699 0 : if( FD_UNLIKELY( !strcmp( tile->txsend.identity_key_path, "" ) ) )
700 0 : FD_LOG_ERR(( "identity_key_path not set" ));
701 :
702 0 : ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->txsend.identity_key_path, /* pubkey only: */ 1 ) );
703 :
704 0 : FD_TEST( fd_rng_secure( &ctx->seed, sizeof(ctx->seed) ) );
705 0 : }
706 :
707 : static inline fd_txsend_out_t
708 : out1( fd_topo_t const * topo,
709 : fd_topo_tile_t const * tile,
710 0 : char const * name ) {
711 0 : ulong idx = ULONG_MAX;
712 :
713 0 : for( ulong i=0UL; i<tile->out_cnt; i++ ) {
714 0 : fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ];
715 0 : if( !strcmp( link->name, name ) ) {
716 0 : if( FD_UNLIKELY( idx!=ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had multiple output links named %s but expected one", tile->name, tile->kind_id, name ));
717 0 : idx = i;
718 0 : }
719 0 : }
720 :
721 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had no output link named %s", tile->name, tile->kind_id, name ));
722 :
723 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
724 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
725 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, topo->links[ tile->out_link_id[ idx ] ].mtu );
726 :
727 0 : return (fd_txsend_out_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0 };
728 0 : }
729 :
730 : static void
731 : unprivileged_init( fd_topo_t const * topo,
732 0 : fd_topo_tile_t const * tile ) {
733 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
734 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
735 0 : fd_txsend_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txsend_tile_t), sizeof(fd_txsend_tile_t) );
736 0 : void * _quic = FD_SCRATCH_ALLOC_APPEND( l, fd_quic_align(), fd_quic_footprint( &quic_limits ) );
737 0 : void * _peer_map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( 2UL*FD_CONTACT_INFO_TABLE_SIZE ) );
738 :
739 0 : ctx->quic = fd_quic_join( fd_quic_new( _quic, &quic_limits ) );
740 0 : FD_TEST( ctx->quic );
741 :
742 0 : ctx->leader_schedules = 0UL;
743 :
744 0 : ctx->mleaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( ctx->mleaders_mem ) );
745 0 : FD_TEST( ctx->mleaders );
746 :
747 0 : ctx->peer_map = peer_map_join( peer_map_new( _peer_map, 2UL*FD_CONTACT_INFO_TABLE_SIZE, ctx->seed ) );
748 0 : FD_TEST( ctx->peer_map );
749 :
750 0 : fd_aio_t * quic_tx_aio = fd_aio_join( fd_aio_new( ctx->quic_tx_aio, ctx, quic_tx_aio_send ) );
751 0 : FD_TEST( quic_tx_aio );
752 0 : fd_quic_set_aio_net_tx( ctx->quic, quic_tx_aio );
753 :
754 0 : ctx->quic->config.role = FD_QUIC_ROLE_CLIENT;
755 0 : ctx->quic->config.idle_timeout = 30e9L;
756 0 : ctx->quic->config.ack_delay = 25e6L;
757 0 : ctx->quic->config.keep_alive = 1;
758 0 : ctx->quic->config.sign = quic_tls_cv_sign;
759 0 : ctx->quic->config.sign_ctx = ctx;
760 0 : fd_memcpy( ctx->quic->config.identity_public_key, ctx->identity_key, sizeof(ctx->identity_key) );
761 :
762 0 : ctx->quic->cb.conn_final = quic_conn_final;
763 0 : ctx->quic->cb.quic_ctx = ctx;
764 :
765 0 : FD_TEST( fd_quic_init( ctx->quic ));
766 :
767 0 : for( ulong i=0UL; i<FD_CONTACT_INFO_TABLE_SIZE; i++ ) {
768 0 : ctx->peers[ i ] = (peer_entry_t){0};
769 0 : }
770 :
771 0 : ctx->conns_len = 0UL;
772 0 : ctx->voted_slot = ULONG_MAX;
773 0 : ctx->net_id = 0;
774 :
775 0 : ctx->src_ip_addr = tile->txsend.ip_addr;
776 0 : ctx->src_port = tile->txsend.txsend_src_port;
777 0 : fd_ip4_udp_hdr_init( ctx->packet_hdr, FD_TXN_MTU, ctx->src_ip_addr, ctx->src_port );
778 :
779 0 : FD_TEST( tile->in_cnt<sizeof(ctx->in_kind)/sizeof(ctx->in_kind[ 0 ]) );
780 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
781 0 : fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
782 0 : fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
783 :
784 0 : ctx->in[ i ].mem = link_wksp->wksp;
785 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
786 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark ( ctx->in[ i ].mem, link->dcache, link->mtu );
787 0 : ctx->in[ i ].mtu = link->mtu;
788 :
789 0 : if( !strcmp( link->name, "net_txsend" ) ) {
790 0 : fd_net_rx_bounds_init( &ctx->net_in_bounds[ i ], link->dcache );
791 0 : ctx->in_kind[ i ] = IN_KIND_NET;
792 0 : } else if( !strcmp( link->name, "gossip_out" ) ) ctx->in_kind[ i ] = IN_KIND_GOSSIP;
793 0 : else if( !strcmp( link->name, "replay_epoch" ) ) ctx->in_kind[ i ] = IN_KIND_EPOCH;
794 0 : else if( !strcmp( link->name, "tower_out" ) ) ctx->in_kind[ i ] = IN_KIND_TOWER;
795 0 : else if( !strcmp( link->name, "sign_txsend" ) ) ctx->in_kind[ i ] = IN_KIND_SIGN;
796 0 : else FD_LOG_ERR(( "unexpected input link name %s", link->name ));
797 0 : }
798 :
799 0 : *ctx->txsend_out = out1( topo, tile, "txsend_out" );
800 0 : *ctx->net_out = out1( topo, tile, "txsend_net" );
801 :
802 0 : ulong sign_in_idx = fd_topo_find_tile_in_link ( topo, tile, "sign_txsend", tile->kind_id );
803 0 : ulong sign_out_idx = fd_topo_find_tile_out_link( topo, tile, "txsend_sign", tile->kind_id );
804 0 : FD_TEST( sign_in_idx!=ULONG_MAX );
805 0 : fd_topo_link_t const * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
806 0 : fd_topo_link_t const * sign_out = &topo->links[ tile->out_link_id[ sign_out_idx ] ];
807 0 : if( FD_UNLIKELY( !fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
808 0 : sign_out->mcache,
809 0 : sign_out->dcache,
810 0 : sign_in->mcache,
811 0 : sign_in->dcache,
812 0 : sign_out->mtu ) ) ) ) {
813 0 : FD_LOG_ERR(( "failed to construct keyguard" ));
814 0 : }
815 :
816 0 : ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->id_keyswitch_obj_id ) );
817 0 : FD_TEST( ctx->keyswitch );
818 :
819 0 : ctx->tower_in_expect_seq = 0UL;
820 0 : ctx->halt_net_frags = 0;
821 :
822 0 : fd_histf_join( fd_histf_new( ctx->quic->metrics.service_duration, FD_MHIST_SECONDS_MIN( TXSEND, SERVICE_DURATION_SECONDS ),
823 0 : FD_MHIST_SECONDS_MAX( TXSEND, SERVICE_DURATION_SECONDS ) ) );
824 0 : fd_histf_join( fd_histf_new( ctx->quic->metrics.receive_duration, FD_MHIST_SECONDS_MIN( TXSEND, RX_DURATION_SECONDS ),
825 0 : FD_MHIST_SECONDS_MAX( TXSEND, RX_DURATION_SECONDS ) ) );
826 :
827 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
828 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
829 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
830 :
831 0 : fd_sleep_until_replay_started( topo );
832 0 : }
833 :
834 : static ulong
835 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
836 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
837 : ulong out_cnt,
838 0 : struct sock_filter * out ) {
839 :
840 0 : populate_sock_filter_policy_fd_txsend_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
841 0 : return sock_filter_policy_fd_txsend_tile_instr_cnt;
842 0 : }
843 :
844 : static ulong
845 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
846 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
847 : ulong out_fds_cnt,
848 0 : int * out_fds ) {
849 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
850 :
851 0 : ulong out_cnt = 0;
852 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
853 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
854 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
855 0 : return out_cnt;
856 0 : }
857 :
/* Configuration for the stem template included below.  STEM_BURST and
   STEM_LAZY are tuning parameters consumed by the template; see
   fd_stem.c for their exact meaning. */

#define STEM_BURST 1UL
#define STEM_LAZY  (128L*3000L)

/* Type and alignment of the context object handed to every callback */

#define STEM_CALLBACK_CONTEXT_TYPE  fd_txsend_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_txsend_tile_t)

/* Bind this file's handlers to the template's callback hooks */

#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE       metrics_write
#define STEM_CALLBACK_AFTER_CREDIT        after_credit
#define STEM_CALLBACK_BEFORE_FRAG         before_frag
#define STEM_CALLBACK_DURING_FRAG         during_frag
#define STEM_CALLBACK_AFTER_FRAG          after_frag

/* Presumably instantiates stem_run (referenced by fd_tile_txsend) using
   the macros defined above -- confirm in fd_stem.c. */

#include "../../disco/stem/fd_stem.c"
872 :
873 : fd_topo_run_tile_t fd_tile_txsend = {
874 : .name = "txsend",
875 : .max_event_sz = sizeof(fd_event_signed_vote_t),
876 : .populate_allowed_seccomp = populate_allowed_seccomp,
877 : .populate_allowed_fds = populate_allowed_fds,
878 : .scratch_align = scratch_align,
879 : .scratch_footprint = scratch_footprint,
880 : .privileged_init = privileged_init,
881 : .unprivileged_init = unprivileged_init,
882 : .run = stem_run,
883 : };
|