Line data Source code
1 : #include "fd_txsend_tile.h"
2 : #include "../../disco/topo/fd_topo.h"
3 : #include "../../disco/keyguard/fd_keyload.h"
4 : #include "../../disco/fd_txn_m.h"
5 : #include "../../disco/keyguard/fd_keyguard.h"
6 : #include "../../discof/tower/fd_tower_tile.h"
7 : #include "generated/fd_txsend_tile_seccomp.h"
8 : #include "../../flamenco/gossip/fd_gossip_types.h"
9 :
10 : #include <sys/random.h>
11 :
/* IN_KIND_* are the values stored in the kind field of an input link
   descriptor; the frag callbacks branch on them. */

#define IN_KIND_SIGN   (0UL)
#define IN_KIND_GOSSIP (1UL)
#define IN_KIND_EPOCH  (2UL)
#define IN_KIND_TOWER  (3UL)
#define IN_KIND_NET    (4UL)
17 :
18 : /* map leader pubkey to contact/conn info
19 : A map entry is created only for staked peers. On receiving contact info, we update
20 : the map entry with the following 4 sockets from the contact info:
21 : - QUIC_VOTE
22 : - QUIC_TPU
23 : - UDP_VOTE
24 : - UDP_TPU
25 :
26 : For the UDP ports, we simply send to that sockaddr when the leader is selected.
27 : For QUIC ports, we establish a quic connection just in time for the leader slot.
28 : We allow that connection to time out if it's going to be dormant for a while.
29 : This reduces the bandwidth consumed and the amount of work the fd_quic needs to do.
30 : */
31 :
32 : #define MAP_NAME fd_txsend_conn_map
33 0 : #define MAP_T fd_txsend_conn_entry_t
34 0 : #define MAP_LG_SLOT_CNT 17
35 0 : #define MAP_KEY pubkey
36 0 : #define MAP_KEY_T fd_pubkey_t
37 0 : #define MAP_KEY_NULL (fd_pubkey_t){0}
38 0 : #define MAP_KEY_EQUAL(k0,k1) (!(memcmp((k0).key,(k1).key,sizeof(fd_pubkey_t))))
39 0 : #define MAP_KEY_INVAL(k) (MAP_KEY_EQUAL((k),MAP_KEY_NULL))
40 : #define MAP_KEY_EQUAL_IS_SLOW 1
41 0 : #define MAP_KEY_HASH(key) ((key).ui[3])
42 : #include "../../util/tmpl/fd_map.c"
43 :
44 : fd_quic_limits_t quic_limits = {
45 : .conn_cnt = MAX_STAKED_LEADERS,
46 : .handshake_cnt = MAX_STAKED_LEADERS,
47 : .conn_id_cnt = FD_QUIC_MIN_CONN_ID_CNT,
48 : .inflight_frame_cnt = 16UL * MAX_STAKED_LEADERS,
49 : .min_inflight_frame_cnt_conn = 4UL,
50 : .stream_id_cnt = 64UL,
51 : .tx_buf_sz = FD_TXN_MTU,
52 : .stream_pool_cnt = 1UL<<13
53 : };
54 :
55 : FD_FN_CONST static inline ulong
56 0 : scratch_align( void ) {
57 0 : return fd_ulong_max( fd_ulong_max( 128UL, fd_quic_align() ), fd_txsend_conn_map_align() );
58 0 : }
59 :
60 : FD_FN_PURE static inline ulong
61 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
62 0 : ulong l = FD_LAYOUT_INIT;
63 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_txsend_tile_ctx_t), sizeof(fd_txsend_tile_ctx_t) );
64 0 : l = FD_LAYOUT_APPEND( l, fd_quic_align(), fd_quic_footprint( &quic_limits ) );
65 0 : l = FD_LAYOUT_APPEND( l, fd_txsend_conn_map_align(), fd_txsend_conn_map_footprint() );
66 0 : return FD_LAYOUT_FINI( l, scratch_align() );
67 0 : }
68 :
69 : /* QUIC callbacks */
70 :
71 : static void
72 : quic_tls_cv_sign( void * signer_ctx,
73 : uchar signature[ static 64 ],
74 0 : uchar const payload[ static 130 ] ) {
75 0 : fd_txsend_tile_ctx_t * ctx = signer_ctx;
76 0 : long dt = -fd_clock_now( ctx->clock );
77 0 : fd_keyguard_client_sign( ctx->keyguard_client, signature, payload, 130UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
78 0 : dt += ( ctx->now = fd_clock_now( ctx->clock ) );
79 0 : fd_histf_sample( ctx->metrics.sign_duration, (ulong)dt );
80 0 : }
81 :
82 : /* quic_hs_complete is called when the QUIC handshake is complete
83 : It is currently used only for debug logging */
84 : static void
85 : quic_hs_complete( fd_quic_conn_t * conn,
86 0 : void * quic_ctx FD_PARAM_UNUSED ) {
87 0 : fd_txsend_tile_ctx_t * ctx = fd_type_pun( quic_ctx );
88 0 : fd_txsend_conn_entry_t * entry = fd_type_pun( fd_quic_conn_get_context( conn ) );
89 0 : if( FD_UNLIKELY( !entry ) ) return;
90 :
91 0 : for( ulong i=0; i<FD_TXSEND_PORT_QUIC_CNT; i++ ) {
92 0 : if( entry->conn[i] == conn ) { ctx->metrics.quic_hs_complete[i]++; break; }
93 0 : }
94 0 : FD_DEBUG(
95 0 : FD_BASE58_ENCODE_32_BYTES( entry->pubkey.key, pubkey_b58 );
96 0 : FD_LOG_DEBUG(( "QUIC handshake complete for leader %s", pubkey_b58 ));
97 0 : )
98 0 : }
99 :
100 : inline static int
101 0 : port_idx_is_quic( ulong port_idx ) {
102 0 : return (port_idx==FD_TXSEND_PORT_QUIC_VOTE_IDX) | (port_idx==FD_TXSEND_PORT_QUIC_TPU_IDX);
103 0 : }
104 :
105 : /* quic_conn_final is called when the QUIC connection dies. */
106 : static void
107 : quic_conn_final( fd_quic_conn_t * conn,
108 0 : void * quic_ctx ) {
109 0 : fd_txsend_conn_entry_t * entry = fd_type_pun( fd_quic_conn_get_context( conn ) );
110 0 : if( FD_UNLIKELY( !entry ) ) {
111 0 : FD_LOG_CRIT(( "Conn map entry not found in conn_final" ));
112 0 : }
113 :
114 0 : fd_txsend_tile_ctx_t * ctx = fd_type_pun( quic_ctx );
115 :
116 0 : for( ulong i=0UL; i<FD_TXSEND_PORT_QUIC_CNT; i++ ) {
117 0 : if( entry->conn[i] == conn ) {
118 0 : entry->conn[i] = NULL;
119 0 : FD_DEBUG(
120 0 : FD_BASE58_ENCODE_32_BYTES( entry->pubkey.key, pubkey_b58 );
121 0 : FD_LOG_DEBUG(("Quic final for conn: %p to peer " FD_IP4_ADDR_FMT ":%u in entry %p to pubkey %s",
122 0 : (void*)conn, FD_IP4_ADDR_FMT_ARGS(entry->ip4s[i]), entry->ports[i], (void*)entry,
123 0 : pubkey_b58));
124 0 : )
125 0 : ctx->metrics.quic_conn_final[i]++;
126 0 : if( i==FD_TXSEND_PORT_QUIC_VOTE_IDX ) entry->last_quic_vote_close = ctx->now;
127 0 : return;
128 0 : }
129 0 : }
130 :
131 0 : FD_BASE58_ENCODE_32_BYTES( entry->pubkey.key, pubkey_b58 );
132 0 : FD_LOG_CRIT(( "conn not found in entry for peer %s", pubkey_b58 ));
133 0 : }
134 :
135 : /* send_to_net sends a packet to the net tile.
136 : It takes pointers to the ip4 hdr, udp hdr, and the payload.
137 : Always uses the eth hdr from ctx->packet_hdr. */
138 : static void
139 : send_to_net( fd_txsend_tile_ctx_t * ctx,
140 : fd_ip4_hdr_t const * ip4_hdr,
141 : fd_udp_hdr_t const * udp_hdr,
142 : uchar const * payload,
143 0 : ulong payload_sz ) {
144 :
145 0 : uint const ip_dst = FD_LOAD( uint, ip4_hdr->daddr_c );
146 0 : ulong const ip_sz = FD_IP4_GET_LEN( *ip4_hdr );
147 :
148 0 : fd_txsend_link_out_t * net_out_link = ctx->net_out;
149 0 : uchar * packet_l2 = fd_chunk_to_laddr( net_out_link->mem, net_out_link->chunk );
150 0 : uchar * packet_l3 = packet_l2 + sizeof(fd_eth_hdr_t);
151 0 : uchar * packet_l4 = packet_l3 + ip_sz;
152 0 : uchar * packet_l5 = packet_l4 + sizeof(fd_udp_hdr_t);
153 :
154 0 : fd_memcpy( packet_l2, ctx->packet_hdr->eth, sizeof(fd_eth_hdr_t) );
155 0 : fd_memcpy( packet_l3, ip4_hdr, ip_sz );
156 0 : fd_memcpy( packet_l4, udp_hdr, sizeof(fd_udp_hdr_t) );
157 0 : fd_memcpy( packet_l5, payload, payload_sz );
158 :
159 0 : ulong sig = fd_disco_netmux_sig( ip_dst, 0U, ip_dst, DST_PROTO_OUTGOING, FD_NETMUX_SIG_MIN_HDR_SZ );
160 0 : ulong tspub = (ulong)ctx->now;
161 0 : ulong sz_l2 = sizeof(fd_eth_hdr_t) + ip_sz + sizeof(fd_udp_hdr_t) + payload_sz;
162 :
163 0 : fd_stem_publish( ctx->stem, net_out_link->idx, sig, net_out_link->chunk, sz_l2, 0UL, 0, tspub );
164 0 : net_out_link->chunk = fd_dcache_compact_next( net_out_link->chunk, sz_l2, net_out_link->chunk0, net_out_link->wmark );
165 0 : }
166 :
167 : static int
168 : quic_tx_aio_send( void * _ctx,
169 : fd_aio_pkt_info_t const * batch,
170 : ulong batch_cnt,
171 : ulong * opt_batch_idx,
172 0 : int flush FD_PARAM_UNUSED ) {
173 0 : fd_txsend_tile_ctx_t * ctx = _ctx;
174 :
175 0 : for( ulong i=0; i<batch_cnt; i++ ) {
176 0 : if( FD_UNLIKELY( batch[ i ].buf_sz<FD_NETMUX_SIG_MIN_HDR_SZ ) ) continue;
177 0 : uchar * buf = batch[ i ].buf;
178 0 : fd_ip4_hdr_t * ip4_hdr = fd_type_pun( buf );
179 0 : ulong const ip4_len = FD_IP4_GET_LEN( *ip4_hdr );
180 0 : fd_udp_hdr_t * udp_hdr = fd_type_pun( buf + ip4_len );
181 0 : uchar * payload = buf + ip4_len + sizeof(fd_udp_hdr_t);
182 0 : FD_TEST( batch[ i ].buf_sz >= ip4_len + sizeof(fd_udp_hdr_t) );
183 0 : ulong payload_sz = batch[ i ].buf_sz - ip4_len - sizeof(fd_udp_hdr_t);
184 0 : send_to_net( ctx, ip4_hdr, udp_hdr, payload, payload_sz );
185 0 : }
186 :
187 0 : if( FD_LIKELY( opt_batch_idx ) ) {
188 0 : *opt_batch_idx = batch_cnt;
189 0 : }
190 :
191 0 : return FD_AIO_SUCCESS;
192 0 : }
193 :
194 : static void
195 0 : during_housekeeping( fd_txsend_tile_ctx_t * ctx ) {
196 0 : if( FD_UNLIKELY( ctx->recal_next <= ctx->now ) ) {
197 0 : ctx->recal_next = fd_clock_default_recal( ctx->clock );
198 0 : }
199 0 : }
200 :
201 : /* quic_connect initiates quic connections for a given entry and port. It uses the
202 : contact info stored in entry, and points the conn and entry to each other.
203 : Returns a handle to the new connection, and NULL if creating it failed */
204 :
205 : static fd_quic_conn_t *
206 : quic_connect( fd_txsend_tile_ctx_t * ctx,
207 : fd_txsend_conn_entry_t * entry,
208 0 : ulong port_idx ) {
209 :
210 0 : ulong conn_idx = port_idx;
211 0 : uint dst_ip = entry->ip4s[port_idx];
212 0 : ushort dst_port = entry->ports[port_idx];
213 :
214 0 : FD_TEST( entry->conn[conn_idx] == NULL );
215 :
216 0 : fd_quic_conn_t * conn = fd_quic_connect( ctx->quic, dst_ip, dst_port, ctx->src_ip_addr, ctx->src_port, ctx->now );
217 0 : if( FD_UNLIKELY( !conn ) ) {
218 0 : FD_LOG_WARNING(( "Failed to create QUIC connection to " FD_IP4_ADDR_FMT ":%u", FD_IP4_ADDR_FMT_ARGS(dst_ip), dst_port ));
219 0 : return NULL;
220 0 : }
221 :
222 0 : FD_DEBUG(
223 0 : FD_BASE58_ENCODE_32_BYTES(entry->pubkey.key, pubkey_b58);
224 0 : FD_LOG_DEBUG(("Quic created conn: %p to peer " FD_IP4_ADDR_FMT ":%u in entry %p to pubkey %s",
225 0 : (void*)conn, FD_IP4_ADDR_FMT_ARGS(dst_ip), dst_port, (void*)entry,
226 0 : pubkey_b58));
227 0 : )
228 :
229 0 : entry->conn[conn_idx] = conn;
230 0 : fd_quic_conn_set_context( conn, entry );
231 :
232 0 : return conn;
233 0 : }
234 :
235 : /* ensure_conn_for_slot ensures a connection exists for the given pubkey if it's
236 : a leader for the target slot. Creates connection if needed. */
237 : static void
238 : ensure_conn_for_slot( fd_txsend_tile_ctx_t * ctx,
239 : ulong target_slot,
240 0 : long lifespan ) {
241 0 : fd_pubkey_t const * leader = fd_multi_epoch_leaders_get_leader_for_slot( ctx->mleaders, target_slot );
242 0 : if( FD_UNLIKELY( !leader ) ) {
243 : /* Track NoLeader outcome for both QUIC ports */
244 0 : for( ulong i=0UL; i<FD_TXSEND_PORT_QUIC_CNT; i++ ) {
245 0 : ctx->metrics.ensure_conn_result[i][FD_METRICS_ENUM_TXSEND_ENSURE_CONN_RESULT_V_NO_LEADER_IDX]++;
246 0 : }
247 0 : return;
248 0 : }
249 :
250 0 : fd_txsend_conn_entry_t * entry = fd_txsend_conn_map_query( ctx->conn_map, *leader, NULL );
251 0 : if( FD_UNLIKELY( !entry ) ) FD_LOG_CRIT(( "Tried ensuring conn for unstaked pubkey"));
252 :
253 0 : for( ulong i=0UL; i<FD_TXSEND_PORT_QUIC_CNT; i++ ) {
254 0 : if( FD_UNLIKELY( !entry->ip4s[i] | !entry->ports[i] ) ) {
255 0 : ctx->metrics.ensure_conn_result[i][FD_METRICS_ENUM_TXSEND_ENSURE_CONN_RESULT_V_NO_CI_IDX]++;
256 0 : continue;
257 0 : }
258 :
259 0 : if( !entry->conn[i] ) {
260 : /* Attempting to create new connection */
261 0 : if( (i==FD_TXSEND_PORT_QUIC_VOTE_IDX) & (ctx->now<entry->last_quic_vote_close+1e9L*FD_TXSEND_QUIC_VOTE_MIN_CONN_COOLDOWN_SECONDS) ) {
262 0 : FD_LOG_DEBUG(("Skipping QUIC connection to " FD_IP4_ADDR_FMT ":%u bc of cooldown", FD_IP4_ADDR_FMT_ARGS(entry->ip4s[i]), entry->ports[i] ));
263 0 : ctx->metrics.ensure_conn_result[i][FD_METRICS_ENUM_TXSEND_ENSURE_CONN_RESULT_V_COOLDOWN_IDX]++;
264 0 : continue;
265 0 : }
266 0 : fd_quic_conn_t * conn = quic_connect( ctx, entry, i );
267 0 : if( FD_UNLIKELY( !conn ) ) {
268 0 : ctx->metrics.ensure_conn_result[i][FD_METRICS_ENUM_TXSEND_ENSURE_CONN_RESULT_V_CONN_FAILED_IDX]++;
269 0 : continue;
270 0 : }
271 0 : ctx->metrics.ensure_conn_result[i][FD_METRICS_ENUM_TXSEND_ENSURE_CONN_RESULT_V_NEW_CONNECTION_IDX]++;
272 0 : fd_quic_service( ctx->quic, ctx->now );
273 0 : } else {
274 : /* Connection already exists */
275 0 : ctx->metrics.ensure_conn_result[i][FD_METRICS_ENUM_TXSEND_ENSURE_CONN_RESULT_V_CONNECTED_IDX]++;
276 0 : fd_quic_conn_let_die( entry->conn[i], lifespan, ctx->now );
277 0 : fd_quic_service( ctx->quic, ctx->now );
278 0 : }
279 0 : }
280 0 : }
281 :
282 : /* leader_send sends a payload to 'pubkey' in all possible ways. For quic targets,
283 : it relies on quic connections that are already established. */
284 : static void
285 : leader_send( fd_txsend_tile_ctx_t * ctx,
286 : fd_pubkey_t const * pubkey,
287 : uchar const * payload,
288 0 : ulong payload_sz ) {
289 :
290 0 : fd_txsend_conn_entry_t * entry = fd_txsend_conn_map_query( ctx->conn_map, *pubkey, NULL );
291 0 : if( FD_UNLIKELY( !entry ) ) {
292 0 : FD_LOG_CRIT(( "Tried looking up connection for an unstaked pubkey"));
293 0 : }
294 :
295 0 : for( ulong i=0UL; i<FD_TXSEND_PORT_CNT; i++ ) {
296 : /* skip unroutable */
297 0 : if( !entry->ip4s[i] | !entry->ports[i] ) {
298 0 : ctx->metrics.send_result_cnt[i][FD_METRICS_ENUM_TXN_SEND_RESULT_V_NO_CI_IDX]++;
299 0 : continue;
300 0 : }
301 :
302 0 : if( port_idx_is_quic( i ) ) {
303 0 : fd_quic_conn_t * conn = entry->conn[i];
304 0 : if( FD_UNLIKELY( !conn ) ) {
305 0 : FD_BASE58_ENCODE_32_BYTES( pubkey->key, pubkey_b58 );
306 0 : FD_LOG_DEBUG(("no conn for %s at " FD_IP4_ADDR_FMT ":%u", pubkey_b58, FD_IP4_ADDR_FMT_ARGS(entry->ip4s[i]), entry->ports[i] ));
307 0 : ctx->metrics.send_result_cnt[i][FD_METRICS_ENUM_TXN_SEND_RESULT_V_NO_CONN_IDX]++;
308 0 : continue;
309 0 : }
310 :
311 0 : fd_quic_stream_t * stream = fd_quic_conn_new_stream( conn );
312 0 : if( FD_UNLIKELY( !stream ) ) {
313 0 : FD_BASE58_ENCODE_32_BYTES( pubkey->key, pubkey_b58 );
314 0 : FD_LOG_DEBUG(("new_stream failed for %s at " FD_IP4_ADDR_FMT ":%u bc conn state was %u", pubkey_b58, FD_IP4_ADDR_FMT_ARGS(entry->ip4s[i]), entry->ports[i], conn->state ));
315 0 : ctx->metrics.send_result_cnt[i][FD_METRICS_ENUM_TXN_SEND_RESULT_V_NO_STREAM_IDX]++;
316 0 : continue;
317 0 : }
318 :
319 0 : ctx->metrics.send_result_cnt[i][FD_METRICS_ENUM_TXN_SEND_RESULT_V_SUCCESS_IDX]++;
320 :
321 0 : fd_quic_stream_send( stream, payload, payload_sz, 1 );
322 0 : fd_quic_service( ctx->quic, ctx->now ); /* trigger send ASAP */
323 0 : } else {
324 :
325 0 : fd_ip4_hdr_t * ip4_hdr = ctx->packet_hdr->ip4;
326 0 : fd_udp_hdr_t * udp_hdr = ctx->packet_hdr->udp;
327 :
328 0 : ctx->metrics.send_result_cnt[i][FD_METRICS_ENUM_TXN_SEND_RESULT_V_SUCCESS_IDX]++;
329 :
330 0 : ip4_hdr->daddr = entry->ip4s[i];
331 0 : ip4_hdr->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
332 0 : ip4_hdr->net_id = fd_ushort_bswap( ctx->net_id++ );
333 0 : ip4_hdr->check = 0;
334 0 : ip4_hdr->check = fd_ip4_hdr_check_fast( ip4_hdr );
335 :
336 0 : udp_hdr->net_dport = fd_ushort_bswap( entry->ports[i] ); /* to net order */
337 0 : udp_hdr->net_len = fd_ushort_bswap( (ushort)( payload_sz + sizeof(fd_udp_hdr_t) ) );
338 0 : send_to_net( ctx, ip4_hdr, udp_hdr, payload, payload_sz );
339 0 : }
340 0 : }
341 0 : }
342 :
343 : /* handle_contact_info_update handles a new contact. Validates contact info
344 : and starts/restarts a connection if necessary. */
345 : static inline void
346 : handle_contact_info_update( fd_txsend_tile_ctx_t * ctx,
347 0 : fd_gossip_update_message_t const * msg ) {
348 0 : static uint const socket_idx[FD_TXSEND_PORT_CNT] = {
349 0 : FD_CONTACT_INFO_SOCKET_TPU_VOTE_QUIC,
350 0 : FD_CONTACT_INFO_SOCKET_TPU_QUIC,
351 0 : FD_CONTACT_INFO_SOCKET_TPU_VOTE,
352 0 : FD_CONTACT_INFO_SOCKET_TPU
353 0 : };
354 :
355 0 : fd_txsend_conn_entry_t * entry = fd_txsend_conn_map_query( ctx->conn_map, *(fd_pubkey_t *)(msg->origin_pubkey), NULL );
356 0 : if( FD_UNLIKELY( !entry ) ) {
357 : /* Skip if UNSTAKED */
358 0 : ctx->metrics.unstaked_ci_rcvd++;
359 0 : return;
360 0 : }
361 :
362 0 : for( ulong i=0UL; i<FD_TXSEND_PORT_CNT; i++ ) {
363 :
364 0 : fd_ip4_port_t const * socket = &msg->contact_info.contact_info->sockets[ socket_idx[i] ];
365 0 : uint new_ip = socket->addr;
366 0 : ushort new_port = fd_ushort_bswap( socket->port ); /* convert port to host order */
367 0 : uint old_ip = entry->ip4s[i];
368 0 : ushort old_port = entry->ports[i];
369 :
370 0 : if( FD_UNLIKELY( !new_ip || !new_port ) ) {
371 0 : FD_BASE58_ENCODE_32_BYTES( msg->origin_pubkey, origin_pubkey_b58 );
372 0 : FD_LOG_DEBUG(( "Unroutable contact info for pubkey %s", origin_pubkey_b58 ));
373 0 : ctx->metrics.new_contact_info[i][FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_UNROUTABLE_IDX]++;
374 0 : continue;
375 0 : }
376 :
377 0 : int info_changed = (old_ip != new_ip) | (old_port != new_port);
378 0 : entry->ip4s [i] = new_ip;
379 0 : entry->ports[i] = new_port;
380 :
381 0 : ulong quic_conn_idx = i;
382 0 : if( port_idx_is_quic( i ) && info_changed && entry->conn[quic_conn_idx]!=NULL ) {
383 : /* Track connection finalization before closing */
384 0 : fd_quic_conn_close( entry->conn[i], 0 );
385 0 : }
386 :
387 : /* bc taking branches for just metrics would be sad */
388 : /* !info_changed -> NoChange, info_changed && old_port==0 -> Initialized, info_changed && old_port!=0 -> Changed */
389 0 : static ulong metric_idx_map[] = {
390 0 : FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_NO_CHANGE_IDX,
391 0 : FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_INITIALIZED_IDX,
392 0 : FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_CHANGED_IDX
393 0 : };
394 0 : ulong metric_idx = metric_idx_map[ !!info_changed<<(!!old_port) ];
395 0 : ctx->metrics.new_contact_info[i][metric_idx]++;
396 0 : }
397 0 : }
398 :
399 : static inline void
400 : handle_contact_info_removal( fd_txsend_tile_ctx_t * ctx FD_PARAM_UNUSED,
401 0 : fd_gossip_update_message_t const * msg FD_PARAM_UNUSED ) {
402 0 : fd_txsend_conn_entry_t * entry = fd_txsend_conn_map_query( ctx->conn_map, *(fd_pubkey_t *)(msg->origin_pubkey), NULL );
403 0 : if( FD_LIKELY( entry ) ) {
404 0 : for( ulong i=0UL; i<FD_TXSEND_PORT_QUIC_CNT; i++ ) {
405 0 : if( FD_UNLIKELY( entry->conn[i] ) ) fd_quic_conn_close( entry->conn[i], 0 );
406 0 : entry->ip4s[i] = 0;
407 0 : entry->ports[i] = 0;
408 0 : }
409 0 : ctx->metrics.ci_removed++;
410 0 : }
411 0 : }
412 :
413 : /* Called during after_frag for stake messages. */
414 : static void
415 0 : finalize_stake_msg( fd_txsend_tile_ctx_t * ctx ) {
416 :
417 0 : fd_multi_epoch_leaders_stake_msg_fini( ctx->mleaders );
418 :
419 : /* Get the current stake destinations */
420 0 : fd_vote_stake_weight_t const * stakes = fd_multi_epoch_leaders_get_stake_weights( ctx->mleaders );
421 0 : ulong stake_cnt = fd_multi_epoch_leaders_get_stake_cnt( ctx->mleaders );
422 0 : if( FD_UNLIKELY( !stakes ) ) {
423 0 : FD_LOG_WARNING(( "No stake destinations available for current slot" ));
424 0 : return;
425 0 : }
426 :
427 : /* populate staked validators in connection map */
428 0 : for( ulong i=0UL; i<stake_cnt; i++ ) {
429 0 : fd_vote_stake_weight_t const * stake_info = &stakes[i];
430 0 : fd_pubkey_t const pubkey = stake_info->id_key;
431 :
432 0 : fd_txsend_conn_entry_t * entry = fd_txsend_conn_map_query( ctx->conn_map, pubkey, NULL );
433 : /* UNSTAKED -> NO_CONN: create new entry in NO_CONN state */
434 0 : if( FD_UNLIKELY( !entry ) ) {
435 : /* insert and initialize entry */
436 0 : entry = fd_txsend_conn_map_insert( ctx->conn_map, pubkey );
437 0 : *entry = (fd_txsend_conn_entry_t){.pubkey = entry->pubkey, .hash = entry->hash };
438 0 : }
439 0 : }
440 0 : }
441 :
442 : static void
443 : handle_vote_msg( fd_txsend_tile_ctx_t * ctx,
444 : ulong vote_slot,
445 : uchar * signed_vote_txn,
446 : ulong vote_txn_sz,
447 0 : ulong authority_idx ) {
448 :
449 0 : uchar txn_mem[ FD_TXN_MAX_SZ ] __attribute__((aligned(alignof(fd_txn_t))));
450 0 : fd_txn_t * txn = (fd_txn_t *)txn_mem;
451 0 : FD_TEST( fd_txn_parse( signed_vote_txn, vote_txn_sz, txn_mem, NULL ) );
452 :
453 0 : uchar * signatures = signed_vote_txn + txn->signature_off;
454 0 : uchar const * message = signed_vote_txn + txn->message_off;
455 0 : ulong message_sz = vote_txn_sz - txn->message_off;
456 0 : fd_keyguard_client_vote_txn_sign( ctx->keyguard_client, signatures, authority_idx, message, message_sz );
457 :
458 0 : ulong poh_slot = vote_slot+1;
459 0 : FD_LOG_INFO(( "got vote for slot %lu", vote_slot ));
460 :
461 : /* send to leader for next few slots */
462 0 : for( ulong i=0UL; i<FD_TXSEND_TARGET_LEADER_CNT; i++ ) {
463 0 : ulong target_slot = poh_slot + i*FD_EPOCH_SLOTS_PER_ROTATION;
464 0 : fd_pubkey_t const * leader = fd_multi_epoch_leaders_get_leader_for_slot( ctx->mleaders, target_slot );
465 0 : FD_CRIT( !leader, "leader not found for slot %lu" );
466 0 : leader_send( ctx, leader, signed_vote_txn, vote_txn_sz );
467 0 : }
468 :
469 0 : for( ulong i=0; i<FD_TXSEND_CONNECT_AHEAD_LEADER_CNT; i++ ) {
470 0 : ulong connect_slot = poh_slot + i*FD_EPOCH_SLOTS_PER_ROTATION;
471 : /* keep alive for at least as long as needed */
472 0 : ensure_conn_for_slot( ctx, connect_slot, FD_TXSEND_QUIC_MIN_CONN_LIFETIME_SECONDS * (long)1e9 );
473 0 : }
474 :
475 : /* send to gossip and dedup */
476 0 : fd_txsend_link_out_t * gossip_verify_out = ctx->gossip_verify_out;
477 0 : uchar * msg_to_gossip = fd_chunk_to_laddr( gossip_verify_out->mem, gossip_verify_out->chunk );
478 0 : fd_txn_m_t * txnm = (fd_txn_m_t *)msg_to_gossip;
479 0 : *txnm = (fd_txn_m_t) { 0UL };
480 0 : txnm->payload_sz = (ushort)vote_txn_sz;
481 0 : txnm->source_ipv4 = ctx->src_ip_addr;
482 0 : txnm->source_tpu = FD_TXN_M_TPU_SOURCE_TXSEND;
483 0 : fd_memcpy( msg_to_gossip+sizeof(fd_txn_m_t), signed_vote_txn, vote_txn_sz );
484 0 : ulong msg_sz = fd_txn_m_realized_footprint( txnm, 0, 0 );
485 0 : fd_stem_publish( ctx->stem, gossip_verify_out->idx, 1UL, gossip_verify_out->chunk, msg_sz, 0UL, 0, 0 );
486 0 : gossip_verify_out->chunk = fd_dcache_compact_next(
487 0 : gossip_verify_out->chunk,
488 0 : msg_sz,
489 0 : gossip_verify_out->chunk0,
490 0 : gossip_verify_out->wmark );
491 0 : }
492 :
493 : /* Stem callbacks */
494 :
495 : static inline void
496 : before_credit( fd_txsend_tile_ctx_t * ctx,
497 : fd_stem_context_t * stem,
498 0 : int * charge_busy ) {
499 0 : ctx->stem = stem;
500 :
501 0 : ctx->now = fd_clock_now( ctx->clock );
502 : /* Publishes to mcache via callbacks */
503 0 : *charge_busy = fd_quic_service( ctx->quic, ctx->now );
504 0 : }
505 :
506 : static inline int
507 : before_frag( fd_txsend_tile_ctx_t * ctx,
508 : ulong in_idx,
509 : ulong seq FD_PARAM_UNUSED,
510 0 : ulong sig ) {
511 0 : if( FD_UNLIKELY( ctx->in_links[in_idx].kind==IN_KIND_GOSSIP ) ) {
512 0 : return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO &&
513 0 : sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
514 0 : }
515 0 : return 0;
516 0 : }
517 :
518 : static void
519 : during_frag( fd_txsend_tile_ctx_t * ctx,
520 : ulong in_idx,
521 : ulong seq FD_PARAM_UNUSED,
522 : ulong sig FD_PARAM_UNUSED,
523 : ulong chunk,
524 : ulong sz,
525 0 : ulong ctl ) {
526 :
527 0 : fd_txsend_link_in_t * in_link = &ctx->in_links[ in_idx ];
528 0 : if( FD_UNLIKELY( chunk<in_link->chunk0 || chunk>in_link->wmark ) ) {
529 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu] on link %lu", chunk, sz, in_link->chunk0, in_link->wmark, in_idx ));
530 0 : }
531 :
532 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( in_link->mem, chunk );
533 0 : ulong kind = in_link->kind;
534 :
535 0 : if( FD_UNLIKELY( kind==IN_KIND_NET ) ) {
536 0 : void const * src = fd_net_rx_translate_frag( &ctx->net_in_bounds[ in_idx ], chunk, ctl, sz );
537 0 : fd_memcpy( ctx->quic_buf, src, sz );
538 0 : }
539 :
540 0 : if( FD_UNLIKELY( kind==IN_KIND_EPOCH ) ) {
541 0 : if( sz>FD_EPOCH_INFO_MAX_MSG_SZ ) {
542 0 : FD_LOG_ERR(( "sz %lu >= max expected epoch update size %lu", sz, FD_EPOCH_INFO_MAX_MSG_SZ ));
543 0 : }
544 0 : fd_multi_epoch_leaders_epoch_msg_init( ctx->mleaders, fd_type_pun_const( dcache_entry ) );
545 0 : }
546 :
547 0 : if( FD_LIKELY( kind==IN_KIND_GOSSIP ) ) {
548 0 : fd_memcpy( ctx->contact_buf, dcache_entry, sz );
549 0 : }
550 :
551 0 : if( FD_UNLIKELY( kind==IN_KIND_TOWER ) ) {
552 0 : if( FD_LIKELY( sig==FD_TOWER_SIG_SLOT_DONE ) ) {
553 0 : FD_TEST( sz==sizeof(fd_tower_slot_done_t) );
554 :
555 0 : fd_tower_slot_done_t const * slot_done = fd_type_pun_const( dcache_entry );
556 :
557 0 : ulong const vote_slot = slot_done->vote_slot;
558 0 : ulong const vote_txn_sz = slot_done->vote_txn_sz;
559 :
560 0 : if( FD_UNLIKELY( vote_slot==ULONG_MAX ) ) return; /* no new vote to send */
561 0 : if( FD_UNLIKELY( !slot_done->has_vote_txn ) ) return; /* invalid vote */
562 :
563 0 : uchar vote_txn[ FD_TPU_MTU ];
564 0 : fd_memcpy( vote_txn, slot_done->vote_txn, vote_txn_sz );
565 :
566 0 : handle_vote_msg( ctx, vote_slot, vote_txn, vote_txn_sz, slot_done->authority_idx );
567 0 : }
568 0 : }
569 0 : }
570 :
571 : static void
572 : after_frag( fd_txsend_tile_ctx_t * ctx,
573 : ulong in_idx,
574 : ulong seq FD_PARAM_UNUSED,
575 : ulong sig FD_PARAM_UNUSED,
576 : ulong sz,
577 : ulong tsorig FD_PARAM_UNUSED,
578 : ulong tspub FD_PARAM_UNUSED,
579 0 : fd_stem_context_t * stem ) {
580 :
581 0 : ctx->stem = stem;
582 :
583 0 : fd_txsend_link_in_t * in_link = &ctx->in_links[ in_idx ];
584 0 : ulong kind = in_link->kind;
585 :
586 0 : if( FD_UNLIKELY( kind==IN_KIND_NET ) ) {
587 0 : uchar * ip_pkt = ctx->quic_buf + sizeof(fd_eth_hdr_t);
588 0 : ulong ip_sz = sz - sizeof(fd_eth_hdr_t);
589 0 : fd_quic_t * quic = ctx->quic;
590 :
591 0 : fd_quic_process_packet( quic, ip_pkt, ip_sz, ctx->now );
592 0 : }
593 :
594 0 : if( FD_UNLIKELY( kind==IN_KIND_GOSSIP ) ) {
595 0 : fd_gossip_update_message_t const * msg = fd_type_pun_const( ctx->contact_buf );
596 0 : if( FD_LIKELY( msg->tag==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ) ) {
597 0 : handle_contact_info_update( ctx, msg );
598 0 : } else if ( FD_UNLIKELY( msg->tag==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ) ) {
599 0 : handle_contact_info_removal( ctx, msg );
600 0 : }
601 0 : }
602 :
603 0 : if( FD_UNLIKELY( kind==IN_KIND_EPOCH ) ) {
604 0 : finalize_stake_msg( ctx );
605 0 : }
606 0 : }
607 :
608 : static void
609 : privileged_init( fd_topo_t * topo,
610 0 : fd_topo_tile_t * tile ) {
611 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
612 :
613 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
614 0 : fd_txsend_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txsend_tile_ctx_t), sizeof(fd_txsend_tile_ctx_t) );
615 0 : fd_memset( ctx, 0, sizeof(fd_txsend_tile_ctx_t) );
616 :
617 0 : if( FD_UNLIKELY( !strcmp( tile->txsend.identity_key_path, "" ) ) )
618 0 : FD_LOG_ERR(( "identity_key_path not set" ));
619 :
620 0 : ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)(fd_keyload_load( tile->txsend.identity_key_path, /* pubkey only: */ 1 ) );
621 0 : }
622 :
623 : static fd_txsend_link_in_t *
624 : setup_input_link( fd_txsend_tile_ctx_t * ctx,
625 : fd_topo_t * topo,
626 : fd_topo_tile_t * tile,
627 : ulong kind,
628 0 : ulong in_idx) {
629 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ in_idx ] ];
630 0 : fd_txsend_link_in_t * in_link_desc = &ctx->in_links[ in_idx ];
631 :
632 0 : in_link_desc->mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
633 0 : in_link_desc->chunk0 = fd_dcache_compact_chunk0( in_link_desc->mem, in_link->dcache );
634 0 : in_link_desc->wmark = fd_dcache_compact_wmark( in_link_desc->mem, in_link->dcache, in_link->mtu );
635 0 : in_link_desc->dcache = in_link->dcache;
636 0 : in_link_desc->kind = kind;
637 :
638 0 : return in_link_desc;
639 0 : }
640 :
641 : static void
642 : setup_output_link( fd_txsend_link_out_t * desc,
643 : fd_topo_t * topo,
644 : fd_topo_tile_t * tile,
645 0 : const char * name ) {
646 0 : ulong out_idx = fd_topo_find_tile_out_link( topo, tile, name, 0 );
647 0 : FD_TEST( out_idx!=ULONG_MAX );
648 0 : fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ out_idx ] ];
649 :
650 0 : desc->idx = out_idx;
651 0 : desc->mcache = out_link->mcache;
652 0 : desc->sync = fd_mcache_seq_laddr( desc->mcache );
653 0 : desc->depth = fd_mcache_depth( desc->mcache );
654 0 : desc->mem = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
655 0 : desc->chunk0 = fd_dcache_compact_chunk0( desc->mem, out_link->dcache );
656 0 : desc->wmark = fd_dcache_compact_wmark( desc->mem, out_link->dcache, out_link->mtu );
657 0 : desc->chunk = desc->chunk0;
658 0 : }
659 :
660 : static void
661 : unprivileged_init( fd_topo_t * topo,
662 0 : fd_topo_tile_t * tile ) {
663 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
664 :
665 0 : if( FD_UNLIKELY( !tile->out_cnt ) ) FD_LOG_ERR(( "txsend has no output link" ));
666 :
667 : /* Scratch mem setup */
668 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
669 0 : fd_txsend_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txsend_tile_ctx_t), sizeof(fd_txsend_tile_ctx_t) );
670 :
671 0 : ctx->mleaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( ctx->mleaders_mem ) );
672 0 : FD_TEST( ctx->mleaders );
673 :
674 0 : fd_aio_t * quic_tx_aio = fd_aio_join( fd_aio_new( ctx->quic_tx_aio, ctx, quic_tx_aio_send ) );
675 0 : if( FD_UNLIKELY( !quic_tx_aio ) ) FD_LOG_ERR(( "fd_aio_join failed" ));
676 :
677 0 : fd_quic_t * quic = fd_quic_join( fd_quic_new( FD_SCRATCH_ALLOC_APPEND( l, fd_quic_align(), fd_quic_footprint( &quic_limits ) ), &quic_limits ) );
678 0 : if( FD_UNLIKELY( !quic ) ) FD_LOG_ERR(( "fd_quic_new failed" ));
679 :
680 0 : quic->config.role = FD_QUIC_ROLE_CLIENT;
681 0 : quic->config.idle_timeout = FD_TXSEND_QUIC_IDLE_TIMEOUT_NS;
682 0 : quic->config.ack_delay = FD_TXSEND_QUIC_ACK_DELAY_NS;
683 0 : quic->config.keep_alive = 1;
684 0 : quic->config.sign = quic_tls_cv_sign;
685 0 : quic->config.sign_ctx = ctx;
686 0 : fd_memcpy( quic->config.identity_public_key, ctx->identity_key, sizeof(ctx->identity_key) );
687 :
688 0 : quic->cb.conn_hs_complete = quic_hs_complete;
689 0 : quic->cb.conn_final = quic_conn_final;
690 0 : quic->cb.quic_ctx = ctx;
691 :
692 0 : fd_quic_set_aio_net_tx( quic, quic_tx_aio );
693 0 : FD_TEST_CUSTOM( fd_quic_init( quic ), "fd_quic_init failed" );
694 :
695 0 : ctx->quic = quic;
696 :
697 : /* Initialize connection map */
698 0 : void * conn_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_txsend_conn_map_align(), fd_txsend_conn_map_footprint() );
699 0 : ctx->conn_map = fd_txsend_conn_map_join( fd_txsend_conn_map_new( conn_map_mem ) );
700 0 : if( FD_UNLIKELY( !ctx->conn_map ) ) FD_LOG_ERR(( "fd_txsend_conn_map_join failed" ));
701 :
702 0 : ctx->src_ip_addr = tile->txsend.ip_addr;
703 0 : ctx->src_port = tile->txsend.txsend_src_port;
704 0 : fd_ip4_udp_hdr_init( ctx->packet_hdr, FD_TXN_MTU, ctx->src_ip_addr, ctx->src_port );
705 :
706 : /* Initialize input links */
707 0 : for( ulong i=0; i<tile->in_cnt; i++ ) {
708 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
709 0 : if( 0==strcmp( link->name, "net_txsend" ) ) {
710 0 : setup_input_link( ctx, topo, tile, IN_KIND_NET, i );
711 0 : fd_net_rx_bounds_init( &ctx->net_in_bounds[ i ], link->dcache );
712 0 : } else if( 0==strcmp( link->name, "gossip_out" ) ) {
713 0 : setup_input_link( ctx, topo, tile, IN_KIND_GOSSIP, i );
714 0 : } else if( 0==strcmp( link->name, "replay_epoch" ) ) {
715 0 : setup_input_link( ctx, topo, tile, IN_KIND_EPOCH, i );
716 0 : } else if( 0==strcmp( link->name, "tower_out" ) ) {
717 0 : setup_input_link( ctx, topo, tile, IN_KIND_TOWER, i );
718 0 : }
719 0 : }
720 :
721 0 : setup_output_link( ctx->gossip_verify_out, topo, tile, "txsend_out" );
722 0 : setup_output_link( ctx->net_out, topo, tile, "txsend_net" );
723 :
724 : /* Set up keyguard(s) */
725 0 : ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_txsend", 0 );
726 0 : ulong sign_out_idx = fd_topo_find_tile_out_link( topo, tile, "txsend_sign", 0 );
727 0 : fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
728 0 : fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ sign_out_idx ] ];
729 :
730 0 : if ( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
731 0 : sign_out->mcache,
732 0 : sign_out->dcache,
733 0 : sign_in->mcache,
734 0 : sign_in->dcache,
735 0 : sign_out->mtu ) )==NULL ) {
736 0 : FD_LOG_ERR(( "Keyguard join failed" ));
737 0 : }
738 :
739 : /* init metrics */
740 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
741 0 : fd_histf_join( fd_histf_new( ctx->metrics.sign_duration,
742 0 : FD_MHIST_MIN( TXSEND, SIGN_DURATION_NANOS ),
743 0 : FD_MHIST_MAX( TXSEND, SIGN_DURATION_NANOS ) ) );
744 :
745 : /* Call new/join here rather than in fd_quic so min/max can differ across uses */
746 0 : fd_histf_join( fd_histf_new( quic->metrics.service_duration,
747 0 : FD_MHIST_SECONDS_MIN( TXSEND, SERVICE_DURATION_SECONDS ),
748 0 : FD_MHIST_SECONDS_MAX( TXSEND, SERVICE_DURATION_SECONDS ) ) );
749 0 : fd_histf_join( fd_histf_new( quic->metrics.receive_duration,
750 0 : FD_MHIST_SECONDS_MIN( TXSEND, RECEIVE_DURATION_SECONDS ),
751 0 : FD_MHIST_SECONDS_MAX( TXSEND, RECEIVE_DURATION_SECONDS ) ) );
752 :
753 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
754 0 : if( FD_UNLIKELY( scratch_top != (ulong)scratch + scratch_footprint( tile ) ) ) {
755 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
756 0 : }
757 :
758 0 : fd_clock_t * clock = ctx->clock;
759 0 : fd_clock_default_init( clock, ctx->clock_mem );
760 0 : ctx->recal_next = fd_clock_recal_next( clock );
761 0 : ctx->now = fd_clock_now( clock );
762 0 : }
763 : /* populate_allowed_seccomp hands the caller's filter buffer to the txsend seccomp policy populator and returns that policy's instruction count. */
764 : static ulong
765 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
766 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
767 : ulong out_cnt,
768 0 : struct sock_filter * out ) {
769 : /* topo and tile are unused.  The log file descriptor (cast to uint) is the only runtime value passed to the policy; the populator is presumably declared in generated/fd_txsend_tile_seccomp.h -- confirm. */
770 0 : populate_sock_filter_policy_fd_txsend_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
771 0 : return sock_filter_policy_fd_txsend_tile_instr_cnt;
772 0 : }
773 : /* populate_allowed_fds writes the file descriptors this tile reports as allowed into out_fds and returns how many were written (1 or 2).  topo and tile are unused. */
774 : static ulong
775 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
776 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
777 : ulong out_fds_cnt,
778 0 : int * out_fds ) {
779 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
780 : /* At most two entries are written below, hence the capacity check above: stderr always, the log file only when fd_log_private_logfile_fd() is not -1. */
781 0 : ulong out_cnt = 0;
782 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
783 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
784 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
785 0 : return out_cnt;
786 0 : }
787 : /* metrics_write publishes the tile-local counters/histograms in ctx->metrics and the QUIC counters in ctx->quic->metrics through the FD_MCNT / FD_MGAUGE / FD_MHIST macros under the TXSEND group.  Registered below as STEM_CALLBACK_METRICS_WRITE. */
788 : static void
789 0 : metrics_write( fd_txsend_tile_ctx_t * ctx ) {
790 : /* The *_SET macros are given a single value; the *_ENUM_COPY macros are given an array member (presumably one element per enum variant -- confirm against the metrics macros). */
791 : /* Basic counters */
792 0 : FD_MCNT_SET( TXSEND, UNSTAKED_CI, ctx->metrics.unstaked_ci_rcvd );
793 0 : FD_MCNT_SET( TXSEND, CI_REMOVED, ctx->metrics.ci_removed );
794 :
795 : /* Port-separated contact info metrics */
796 0 : FD_MCNT_ENUM_COPY( TXSEND, NEW_CONTACT_INFO_QUIC_VOTE, ctx->metrics.new_contact_info[FD_TXSEND_PORT_QUIC_VOTE_IDX] );
797 0 : FD_MCNT_ENUM_COPY( TXSEND, NEW_CONTACT_INFO_QUIC_TPU, ctx->metrics.new_contact_info[FD_TXSEND_PORT_QUIC_TPU_IDX] );
798 0 : FD_MCNT_ENUM_COPY( TXSEND, NEW_CONTACT_INFO_UDP_VOTE, ctx->metrics.new_contact_info[FD_TXSEND_PORT_UDP_VOTE_IDX] );
799 0 : FD_MCNT_ENUM_COPY( TXSEND, NEW_CONTACT_INFO_UDP_TPU, ctx->metrics.new_contact_info[FD_TXSEND_PORT_UDP_TPU_IDX] );
800 :
801 : /* Port-separated send result metrics */
802 0 : FD_MCNT_ENUM_COPY( TXSEND, SEND_RESULT_QUIC_VOTE, ctx->metrics.send_result_cnt[FD_TXSEND_PORT_QUIC_VOTE_IDX] );
803 0 : FD_MCNT_ENUM_COPY( TXSEND, SEND_RESULT_QUIC_TPU, ctx->metrics.send_result_cnt[FD_TXSEND_PORT_QUIC_TPU_IDX] );
804 0 : FD_MCNT_ENUM_COPY( TXSEND, SEND_RESULT_UDP_VOTE, ctx->metrics.send_result_cnt[FD_TXSEND_PORT_UDP_VOTE_IDX] );
805 0 : FD_MCNT_ENUM_COPY( TXSEND, SEND_RESULT_UDP_TPU, ctx->metrics.send_result_cnt[FD_TXSEND_PORT_UDP_TPU_IDX] );
806 :
807 : /* Port-separated QUIC metrics (ensure_conn_result is indexed by the FD_METRICS_ENUM_TXSEND_QUIC_PORTS_* indices, not the FD_TXSEND_PORT_* ones used above) */
808 0 : FD_MCNT_ENUM_COPY( TXSEND, HANDSHAKE_COMPLETE, ctx->metrics.quic_hs_complete );
809 0 : FD_MCNT_ENUM_COPY( TXSEND, QUIC_CONN_FINAL, ctx->metrics.quic_conn_final );
810 0 : FD_MCNT_ENUM_COPY( TXSEND, ENSURE_CONN_RESULT_QUIC_VOTE, ctx->metrics.ensure_conn_result[FD_METRICS_ENUM_TXSEND_QUIC_PORTS_V_QUIC_VOTE_IDX] );
811 0 : FD_MCNT_ENUM_COPY( TXSEND, ENSURE_CONN_RESULT_QUIC_TPU, ctx->metrics.ensure_conn_result[FD_METRICS_ENUM_TXSEND_QUIC_PORTS_V_QUIC_TPU_IDX] );
812 : /* Sign duration histogram kept in the tile context */
813 0 : FD_MHIST_COPY( TXSEND, SIGN_DURATION_NANOS, ctx->metrics.sign_duration );
814 :
815 : /* General QUIC metrics: everything below is read from ctx->quic->metrics.  Receive-side counters first. */
816 0 : FD_MCNT_SET( TXSEND, RECEIVED_BYTES, ctx->quic->metrics.net_rx_byte_cnt );
817 0 : FD_MCNT_ENUM_COPY( TXSEND, RECEIVED_FRAMES, ctx->quic->metrics.frame_rx_cnt );
818 0 : FD_MCNT_SET( TXSEND, RECEIVED_PACKETS, ctx->quic->metrics.net_rx_pkt_cnt );
819 0 : FD_MCNT_SET( TXSEND, STREAM_RECEIVED_BYTES, ctx->quic->metrics.stream_rx_byte_cnt );
820 0 : FD_MCNT_SET( TXSEND, STREAM_RECEIVED_EVENTS, ctx->quic->metrics.stream_rx_event_cnt );
821 : /* Transmit-side counters */
822 0 : FD_MCNT_SET( TXSEND, SENT_PACKETS, ctx->quic->metrics.net_tx_pkt_cnt );
823 0 : FD_MCNT_SET( TXSEND, SENT_BYTES, ctx->quic->metrics.net_tx_byte_cnt );
824 0 : FD_MCNT_SET( TXSEND, RETRY_SENT, ctx->quic->metrics.retry_tx_cnt );
825 0 : FD_MCNT_ENUM_COPY( TXSEND, ACK_TX, ctx->quic->metrics.ack_tx );
826 : /* Connection gauges (state, alloc) and connection lifecycle/error counters */
827 0 : FD_MGAUGE_ENUM_COPY( TXSEND, CONNECTIONS_STATE, ctx->quic->metrics.conn_state_cnt );
828 0 : FD_MGAUGE_SET( TXSEND, CONNECTIONS_ALLOC, ctx->quic->metrics.conn_alloc_cnt );
829 0 : FD_MCNT_SET( TXSEND, CONNECTIONS_CREATED, ctx->quic->metrics.conn_created_cnt );
830 0 : FD_MCNT_SET( TXSEND, CONNECTIONS_CLOSED, ctx->quic->metrics.conn_closed_cnt );
831 0 : FD_MCNT_SET( TXSEND, CONNECTIONS_ABORTED, ctx->quic->metrics.conn_aborted_cnt );
832 0 : FD_MCNT_SET( TXSEND, CONNECTIONS_TIMED_OUT, ctx->quic->metrics.conn_timeout_cnt );
833 0 : FD_MCNT_SET( TXSEND, CONNECTIONS_RETRIED, ctx->quic->metrics.conn_retry_cnt );
834 0 : FD_MCNT_SET( TXSEND, CONNECTION_ERROR_NO_SLOTS, ctx->quic->metrics.conn_err_no_slots_cnt );
835 0 : FD_MCNT_SET( TXSEND, CONNECTION_ERROR_RETRY_FAIL, ctx->quic->metrics.conn_err_retry_fail_cnt );
836 : /* Packet and frame counters */
837 0 : FD_MCNT_ENUM_COPY( TXSEND, PKT_CRYPTO_FAILED, ctx->quic->metrics.pkt_decrypt_fail_cnt );
838 0 : FD_MCNT_ENUM_COPY( TXSEND, PKT_NO_KEY, ctx->quic->metrics.pkt_no_key_cnt );
839 0 : FD_MCNT_ENUM_COPY( TXSEND, PKT_NO_CONN, ctx->quic->metrics.pkt_no_conn_cnt );
840 0 : FD_MCNT_ENUM_COPY( TXSEND, FRAME_TX_ALLOC, ctx->quic->metrics.frame_tx_alloc_cnt );
841 0 : FD_MCNT_SET( TXSEND, PKT_NET_HEADER_INVALID, ctx->quic->metrics.pkt_net_hdr_err_cnt );
842 0 : FD_MCNT_SET( TXSEND, PKT_QUIC_HEADER_INVALID, ctx->quic->metrics.pkt_quic_hdr_err_cnt );
843 0 : FD_MCNT_SET( TXSEND, PKT_UNDERSZ, ctx->quic->metrics.pkt_undersz_cnt );
844 0 : FD_MCNT_SET( TXSEND, PKT_OVERSZ, ctx->quic->metrics.pkt_oversz_cnt );
845 0 : FD_MCNT_SET( TXSEND, PKT_VERNEG, ctx->quic->metrics.pkt_verneg_cnt );
846 0 : FD_MCNT_ENUM_COPY( TXSEND, PKT_RETRANSMISSIONS, ctx->quic->metrics.pkt_retransmissions_cnt );
847 : /* Handshake counters */
848 0 : FD_MCNT_SET( TXSEND, HANDSHAKES_CREATED, ctx->quic->metrics.hs_created_cnt );
849 0 : FD_MCNT_SET( TXSEND, HANDSHAKE_ERROR_ALLOC_FAIL, ctx->quic->metrics.hs_err_alloc_fail_cnt );
850 0 : FD_MCNT_SET( TXSEND, HANDSHAKE_EVICTED, ctx->quic->metrics.hs_evicted_cnt );
851 : /* Frame receive errors, published as FRAME_FAIL_PARSE */
852 0 : FD_MCNT_SET( TXSEND, FRAME_FAIL_PARSE, ctx->quic->metrics.frame_rx_err_cnt );
853 : /* QUIC service/receive duration histograms */
854 0 : FD_MHIST_COPY( TXSEND, SERVICE_DURATION_SECONDS, ctx->quic->metrics.service_duration );
855 0 : FD_MHIST_COPY( TXSEND, RECEIVE_DURATION_SECONDS, ctx->quic->metrics.receive_duration );
856 0 : }
857 : /* Stem configuration.  These macros must be defined before fd_stem.c is included below; that include presumably instantiates stem_run (used in fd_tile_txsend) from them -- confirm in fd_stem.c. */
858 : /* STEM_BURST and STEM_LAZY are tuning parameters consumed by the stem template; their exact semantics are defined there. */
859 0 : #define STEM_BURST 1UL
860 0 : #define STEM_LAZY (10e3L) /* 10us */
861 : /* Context type (and its alignment) that the stem passes to every callback. */
862 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_txsend_tile_ctx_t
863 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_txsend_tile_ctx_t)
864 : /* Bind the stem callback hooks to this file's handler functions. */
865 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
866 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
867 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
868 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
869 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
870 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
871 : #include "../../disco/stem/fd_stem.c"
872 : /* fd_tile_txsend is the run descriptor for the "txsend" tile: it names the tile and points at this file's sandbox, scratch-sizing, init and run entry points. */
873 : fd_topo_run_tile_t fd_tile_txsend = {
874 : .name = "txsend",
875 : .populate_allowed_seccomp = populate_allowed_seccomp,
876 : .populate_allowed_fds = populate_allowed_fds,
877 : .scratch_align = scratch_align,
878 : .scratch_footprint = scratch_footprint,
879 : .privileged_init = privileged_init,
880 : .unprivileged_init = unprivileged_init,
881 : .run = stem_run, /* not defined in the visible part of this file; presumably provided by the fd_stem.c include above */
882 : };
|