Line data Source code
1 : #include "fd_send_tile.h"
2 :
3 : #include <errno.h>
4 : #include <sys/random.h>
5 :
6 : /* map leader pubkey to quic conn
7 : a peer entry can be in 3 states:
8 : - UNSTAKED: no element in map
9 : - NO_CONN: pubkey maps to null conn. staked, but no active conn
10 : - CONN: pubkey maps to conn. staked, connection initiated
11 :
12 : The state machine works as follows:
13 :
14 : receive stake msg including pubkey:
15 : - if UNSTAKED, create new entry in NO_CONN state
16 :
17 : receive contact info:
18 : - Update contact info. NO_CONN -> CONN. If CONN and contact info changed,
19 : restart the conn.
20 : - touch last_ci_ticks
21 :
22 : Conn closed:
23 : - reconnect, unless contact info stale
24 : */
/* CONTACT_INFO_STALE_TICKS is the age (in the same units as ctx->now,
   i.e. fd_tickcount ticks) past which a peer's contact info is treated
   as stale and a closed connection is not re-established.  It is an
   integer constant on purpose: a 60e9L spelling would be a long double
   literal and would turn the staleness comparison into floating point.
   NOTE(review): "~60 seconds" presumes roughly one tick per nanosecond
   - confirm against the tick rate of the target hosts. */
#define CONTACT_INFO_STALE_TICKS (60000000000L) /* ~60 seconds */
26 :
/* Instantiate fd_send_conn_map via the fd_map.c template: a map of
   fd_send_conn_entry_t keyed by the entry's pubkey field.
   - MAP_LG_SLOT_CNT 17: log2 of the slot count (presumably 2^17 slots
     - confirm against fd_map.c).
   - The all-zero pubkey is the null key; a key is invalid when it
     compares equal to it.
   - Keys are compared with a full memcmp, so equality is flagged as
     slow (MAP_KEY_EQUAL_IS_SLOW).
   - The hash is taken directly from word 3 of the pubkey's ui view. */
#define MAP_NAME fd_send_conn_map
#define MAP_T fd_send_conn_entry_t
#define MAP_LG_SLOT_CNT 17
#define MAP_KEY pubkey
#define MAP_KEY_T fd_pubkey_t
#define MAP_KEY_NULL (fd_pubkey_t){0}
#define MAP_KEY_EQUAL(k0,k1) (!(memcmp((k0).key,(k1).key,sizeof(fd_pubkey_t))))
#define MAP_KEY_INVAL(k) (MAP_KEY_EQUAL((k),MAP_KEY_NULL))
#define MAP_KEY_EQUAL_IS_SLOW 1
#define MAP_KEY_HASH(key) ((key).ui[3])
#include "../../util/tmpl/fd_map.c"
38 :
39 : fd_quic_limits_t quic_limits = {
40 : .conn_cnt = MAX_STAKED_LEADERS,
41 : .handshake_cnt = MAX_STAKED_LEADERS,
42 : .conn_id_cnt = FD_QUIC_MIN_CONN_ID_CNT,
43 : .inflight_frame_cnt = 16UL * MAX_STAKED_LEADERS,
44 : .min_inflight_frame_cnt_conn = 4UL,
45 : .stream_id_cnt = 16UL,
46 : .tx_buf_sz = FD_TXN_MTU,
47 : .stream_pool_cnt = 2048UL
48 : };
49 :
50 : FD_FN_CONST static inline ulong
51 0 : scratch_align( void ) {
52 0 : return fd_ulong_max( fd_ulong_max( 128UL, fd_quic_align() ), fd_send_conn_map_align() );
53 0 : }
54 :
55 : FD_FN_PURE static inline ulong
56 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
57 0 : ulong l = FD_LAYOUT_INIT;
58 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_send_tile_ctx_t), sizeof(fd_send_tile_ctx_t) );
59 0 : l = FD_LAYOUT_APPEND( l, fd_quic_align(), fd_quic_footprint( &quic_limits ) );
60 0 : l = FD_LAYOUT_APPEND( l, fd_send_conn_map_align(), fd_send_conn_map_footprint() );
61 0 : return FD_LAYOUT_FINI( l, scratch_align() );
62 0 : }
63 :
64 : /* QUIC callbacks */
65 :
66 : static void
67 : quic_tls_cv_sign( void * signer_ctx,
68 : uchar signature[ static 64 ],
69 0 : uchar const payload[ static 130 ] ) {
70 0 : fd_send_tile_ctx_t * ctx = signer_ctx;
71 0 : fd_sha512_t * sha512 = fd_sha512_join( ctx->sha512 );
72 0 : fd_ed25519_sign( signature, payload, 130UL, ctx->tls_pub_key, ctx->tls_priv_key, sha512 );
73 0 : fd_sha512_leave( sha512 );
74 0 : }
75 :
76 : /* quic_now is used by quic to get current time */
77 : static ulong
78 0 : quic_now( void * _ctx ) {
79 0 : fd_send_tile_ctx_t * ctx = (fd_send_tile_ctx_t *)_ctx;
80 0 : return (ulong)ctx->now;
81 0 : }
82 :
83 : /* quic_hs_complete is called when the QUIC handshake is complete
84 : It is currently used only for debug logging */
85 : static void
86 : quic_hs_complete( fd_quic_conn_t * conn,
87 0 : void * quic_ctx FD_PARAM_UNUSED ) {
88 0 : fd_send_conn_entry_t * entry = fd_type_pun( fd_quic_conn_get_context( conn ) );
89 0 : if( FD_UNLIKELY( !entry ) ) return;
90 0 : FD_LOG_DEBUG(("send_tile: QUIC handshake complete for leader %s", FD_BASE58_ENC_32_ALLOCA( entry->pubkey.key )));
91 0 : }
92 :
93 : /* quic_conn_final is called when the QUIC connection dies.
94 : Reconnects if contact info is recent enough. */
95 : static void
96 : quic_conn_final( fd_quic_conn_t * conn,
97 0 : void * quic_ctx ) {
98 0 : fd_send_tile_ctx_t * ctx = quic_ctx;
99 :
100 0 : fd_send_conn_entry_t * entry = fd_type_pun( fd_quic_conn_get_context( conn ) );
101 0 : if( FD_UNLIKELY( !entry ) ) {
102 0 : FD_LOG_ERR(( "send_tile: Conn map entry not found in conn_final" ));
103 0 : }
104 :
105 0 : if( ctx->now - entry->last_ci_ticks > CONTACT_INFO_STALE_TICKS ) {
106 : /* stale contact info, don't reconnect */
107 0 : entry->conn = NULL;
108 0 : ctx->metrics.contact_stale++;
109 0 : return;
110 0 : }
111 :
112 0 : uint ip4_addr = entry->ip4_addr;
113 0 : FD_LOG_DEBUG(("send_tile: Quic conn final: %p to peer %u.%u.%u.%u:%u", (void*)conn, ip4_addr&0xFF, (ip4_addr>>8)&0xFF, (ip4_addr>>16)&0xFF, (ip4_addr>>24)&0xFF, entry->udp_port));
114 0 : entry->conn = NULL;
115 0 : quic_connect( ctx, entry );
116 0 : }
117 :
118 : static int
119 : quic_tx_aio_send( void * _ctx,
120 : fd_aio_pkt_info_t const * batch,
121 : ulong batch_cnt,
122 : ulong * opt_batch_idx,
123 0 : int flush ) {
124 0 : (void)flush;
125 :
126 0 : fd_send_tile_ctx_t * ctx = _ctx;
127 :
128 0 : for( ulong i=0; i<batch_cnt; i++ ) {
129 0 : if( FD_UNLIKELY( batch[ i ].buf_sz<FD_NETMUX_SIG_MIN_HDR_SZ ) ) continue;
130 :
131 0 : uint const ip_dst = FD_LOAD( uint, batch[ i ].buf+offsetof( fd_ip4_hdr_t, daddr_c ) );
132 :
133 0 : fd_send_link_out_t * net_out_link = ctx->net_out;
134 0 : uchar * packet_l2 = fd_chunk_to_laddr( net_out_link->mem, net_out_link->chunk );
135 0 : uchar * packet_l3 = packet_l2 + sizeof(fd_eth_hdr_t);
136 0 : memset( packet_l2, 0, 12 );
137 0 : FD_STORE( ushort, packet_l2+offsetof( fd_eth_hdr_t, net_type ), fd_ushort_bswap( FD_ETH_HDR_TYPE_IP ) );
138 0 : fd_memcpy( packet_l3, batch[ i ].buf, batch[ i ].buf_sz );
139 0 : ulong sz_l2 = sizeof(fd_eth_hdr_t) + batch[ i ].buf_sz;
140 :
141 : /* send packets are just round-robined by sequence number, so for now
142 : just indicate where they came from so they don't bounce back */
143 0 : ulong sig = fd_disco_netmux_sig( ip_dst, 0U, ip_dst, DST_PROTO_OUTGOING, FD_NETMUX_SIG_MIN_HDR_SZ );
144 :
145 0 : ulong tspub = (ulong)ctx->now;
146 :
147 0 : fd_stem_publish( ctx->stem, net_out_link->idx, sig, net_out_link->chunk, sz_l2, 0UL, 0, tspub );
148 0 : net_out_link->chunk = fd_dcache_compact_next( net_out_link->chunk, sz_l2, net_out_link->chunk0, net_out_link->wmark );
149 0 : }
150 :
151 0 : if( FD_LIKELY( opt_batch_idx ) ) {
152 0 : *opt_batch_idx = batch_cnt;
153 0 : }
154 :
155 0 : return FD_AIO_SUCCESS;
156 0 : }
157 :
158 : /* QUIC conn management and wrappers */
159 :
160 : fd_quic_conn_t *
161 : quic_connect( fd_send_tile_ctx_t * ctx,
162 0 : fd_send_conn_entry_t * entry ) {
163 0 : uint dst_ip = entry->ip4_addr;
164 0 : ushort dst_port = entry->udp_port;
165 :
166 0 : FD_TEST( entry->conn == NULL );
167 :
168 : // if( FD_UNLIKELY( entry->conn ) ) {
169 : // /* close existing conn */
170 : // fd_quic_conn_close( entry->conn, 0 );
171 : // }
172 :
173 0 : fd_quic_conn_t * conn = fd_quic_connect( ctx->quic, dst_ip, dst_port, ctx->src_ip_addr, ctx->src_port );
174 0 : if( FD_UNLIKELY( !conn ) ) {
175 0 : ctx->metrics.quic_conn_create_failed++;
176 0 : FD_LOG_WARNING(( "send_tile: Failed to create QUIC connection to %u.%u.%u.%u:%u", dst_ip&0xFF, (dst_ip>>8)&0xFF, (dst_ip>>16)&0xFF, (dst_ip>>24)&0xFF, dst_port ));
177 0 : return NULL;
178 0 : }
179 :
180 0 : FD_LOG_DEBUG(("send_tile: Quic conn created: %p to peer %u.%u.%u.%u:%u", (void*)conn, dst_ip&0xFF, (dst_ip>>8)&0xFF, (dst_ip>>16)&0xFF, (dst_ip>>24)&0xFF, dst_port));
181 :
182 0 : entry->conn = conn;
183 0 : fd_quic_conn_set_context( conn, entry );
184 :
185 0 : return conn;
186 0 : }
187 :
188 : /* get_quic_conn looks up a QUIC connection for a given pubkey. */
189 : static fd_quic_conn_t *
190 : get_quic_conn( fd_send_tile_ctx_t * ctx,
191 0 : fd_pubkey_t const * pubkey ) {
192 0 : fd_send_conn_entry_t * entry = fd_send_conn_map_query( ctx->conn_map, *pubkey, NULL );
193 0 : if( FD_LIKELY( entry ) ) {
194 0 : return entry->conn;
195 0 : }
196 0 : return NULL;
197 0 : }
198 :
199 : void
200 : quic_send( fd_send_tile_ctx_t * ctx,
201 : fd_pubkey_t const * pubkey,
202 : uchar const * payload,
203 0 : ulong payload_sz ) {
204 :
205 0 : fd_quic_conn_t * conn = get_quic_conn( ctx, pubkey );
206 0 : if( FD_UNLIKELY( !conn ) ) {
207 0 : ctx->metrics.quic_send_result_cnt[FD_METRICS_ENUM_TXN_QUIC_SEND_RESULT_V_NO_CONN_IDX]++;
208 0 : FD_LOG_DEBUG(("send_tile: Quic conn not found for leader %s", FD_BASE58_ENC_32_ALLOCA( pubkey->key )));
209 0 : return;
210 0 : }
211 :
212 0 : fd_quic_stream_t * stream = fd_quic_conn_new_stream( conn );
213 0 : if( FD_UNLIKELY( !stream ) ) {
214 0 : ctx->metrics.quic_send_result_cnt[FD_METRICS_ENUM_TXN_QUIC_SEND_RESULT_V_NO_STREAM_IDX]++;
215 0 : FD_LOG_DEBUG(("send_tile: Quic stream unavailable for leader %s", FD_BASE58_ENC_32_ALLOCA( pubkey->key )));
216 0 : return;
217 0 : }
218 :
219 0 : FD_LOG_DEBUG(("send_tile: Sending txn to leader %s", FD_BASE58_ENC_32_ALLOCA( pubkey->key )));
220 0 : ctx->metrics.quic_send_result_cnt[FD_METRICS_ENUM_TXN_QUIC_SEND_RESULT_V_SUCCESS_IDX]++;
221 :
222 0 : fd_quic_stream_send( stream, payload, payload_sz, 1 );
223 0 : }
224 :
225 :
226 : /* handle_new_contact_info handles a new contact. Validates contact info
227 : and starts/restarts a connection if necessary. */
228 : static inline void
229 : handle_new_contact_info( fd_send_tile_ctx_t * ctx,
230 0 : fd_shred_dest_wire_t * contact ) {
231 0 : uint new_ip = contact->ip4_addr;
232 0 : ushort new_port = contact->udp_port;
233 0 : if( FD_UNLIKELY( new_ip==0 || new_port==0 ) ) {
234 0 : ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_UNROUTABLE_IDX]++;
235 0 : return;
236 0 : }
237 :
238 0 : fd_send_conn_entry_t * entry = fd_send_conn_map_query( ctx->conn_map, *contact->pubkey, NULL );
239 0 : if( FD_UNLIKELY( !entry ) ) {
240 : /* Skip if UNSTAKED */
241 0 : FD_LOG_DEBUG(("send_tile: Skipping unstaked pubkey %s at %u.%u.%u.%u:%u", FD_BASE58_ENC_32_ALLOCA( contact->pubkey->key ), new_ip&0xFF, (new_ip>>8)&0xFF, (new_ip>>16)&0xFF, (new_ip>>24)&0xFF, new_port));
242 0 : ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_UNSTAKED_IDX]++;
243 0 : return;
244 0 : }
245 :
246 0 : int info_changed = (entry->ip4_addr != new_ip) | (entry->udp_port != new_port);
247 0 : entry->ip4_addr = new_ip;
248 0 : entry->udp_port = new_port;
249 :
250 0 : if( entry->conn && !info_changed ) {
251 0 : ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_NO_CHANGE_IDX]++;
252 0 : return;
253 0 : }
254 :
255 0 : if( FD_UNLIKELY( entry->conn && info_changed ) ) {
256 : /* close conn, will restart with new contact info */
257 0 : fd_quic_conn_close( entry->conn, 0 );
258 0 : entry->conn = NULL;
259 0 : ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_CHANGED_IDX]++;
260 0 : }
261 :
262 0 : entry->last_ci_ticks = ctx->now;
263 0 : quic_connect( ctx, entry );
264 0 : ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_CONNECT_IDX]++;
265 0 : return;
266 0 : }
267 :
268 : static inline void
269 0 : finalize_new_cluster_contact_info( fd_send_tile_ctx_t * ctx ) {
270 0 : for( ulong i=0UL; i<ctx->contact_cnt; i++ ) {
271 0 : handle_new_contact_info( ctx, &ctx->contact_buf[i] );
272 0 : }
273 0 : }
274 :
275 : /* Called during after_frag for stake messages. */
276 : static void
277 0 : finalize_stake_msg( fd_send_tile_ctx_t * ctx ) {
278 :
279 0 : fd_multi_epoch_leaders_stake_msg_fini( ctx->mleaders );
280 :
281 : /* Get the current stake destinations */
282 0 : fd_stake_weight_t const * stakes = fd_multi_epoch_leaders_get_stake_weights( ctx->mleaders );
283 0 : ulong stake_cnt = fd_multi_epoch_leaders_get_stake_cnt( ctx->mleaders );
284 0 : if( FD_UNLIKELY( !stakes ) ) {
285 0 : FD_LOG_WARNING(( "No stake destinations available for current slot" ));
286 0 : return;
287 0 : }
288 :
289 : /* populate staked validators in connection map */
290 0 : for( ulong i=0UL; i<stake_cnt; i++ ) {
291 0 : fd_stake_weight_t const * stake_info = &stakes[i];
292 0 : fd_pubkey_t const pubkey = stake_info->key;
293 :
294 0 : fd_send_conn_entry_t * entry = fd_send_conn_map_query( ctx->conn_map, pubkey, NULL );
295 : /* UNSTAKED -> NO_CONN: create new entry in NO_CONN state */
296 0 : if( FD_UNLIKELY( !entry ) ) {
297 0 : FD_LOG_DEBUG(("send_tile: creating new entry for pubkey %s", FD_BASE58_ENC_32_ALLOCA( pubkey.key )));
298 0 : entry = fd_send_conn_map_insert( ctx->conn_map, pubkey );
299 0 : entry->conn = NULL;
300 0 : }
301 0 : }
302 0 : }
303 :
304 : /* Stem callbacks */
305 :
306 : static inline void
307 : before_credit( fd_send_tile_ctx_t * ctx,
308 : fd_stem_context_t * stem,
309 0 : int * charge_busy ) {
310 0 : ctx->stem = stem;
311 :
312 0 : ctx->now = fd_tickcount();
313 :
314 : /* Publishes to mcache via callbacks */
315 0 : *charge_busy = fd_quic_service( ctx->quic );
316 0 : }
317 :
318 : static void
319 : during_frag( fd_send_tile_ctx_t * ctx,
320 : ulong in_idx,
321 : ulong seq FD_PARAM_UNUSED,
322 : ulong sig FD_PARAM_UNUSED,
323 : ulong chunk,
324 : ulong sz,
325 0 : ulong ctl ) {
326 :
327 0 : fd_send_link_in_t * in_link = &ctx->in_links[ in_idx ];
328 0 : if( FD_UNLIKELY( chunk<in_link->chunk0 || chunk>in_link->wmark ) ) {
329 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu] on link %lu", chunk, sz, in_link->chunk0, in_link->wmark, in_idx ));
330 0 : }
331 :
332 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( in_link->mem, chunk );
333 0 : ulong kind = in_link->kind;
334 :
335 0 : if( FD_UNLIKELY( kind==IN_KIND_NET ) ) {
336 0 : void const * src = fd_net_rx_translate_frag( &ctx->net_in_bounds, chunk, ctl, sz );
337 0 : fd_memcpy( ctx->quic_buf, src, sz );
338 0 : }
339 :
340 0 : if( FD_UNLIKELY( kind==IN_KIND_STAKE ) ) {
341 0 : if( sz>sizeof(fd_stake_weight_t)*(MAX_STAKED_LEADERS+1UL) ) {
342 0 : FD_LOG_ERR(( "sz %lu >= max expected stake update size %lu", sz, sizeof(fd_stake_weight_t) * (MAX_STAKED_LEADERS+1UL) ));
343 0 : }
344 0 : fd_multi_epoch_leaders_stake_msg_init( ctx->mleaders, fd_type_pun_const( dcache_entry ) );
345 0 : }
346 :
347 0 : if( FD_UNLIKELY( kind==IN_KIND_GOSSIP ) ) {
348 0 : if( sz>sizeof(fd_shred_dest_wire_t)*MAX_STAKED_LEADERS ) {
349 0 : FD_LOG_ERR(( "sz %lu >= max expected gossip update size %lu", sz, sizeof(fd_shred_dest_wire_t) * MAX_STAKED_LEADERS ));
350 0 : }
351 0 : ctx->contact_cnt = sz / sizeof(fd_shred_dest_wire_t);
352 0 : fd_memcpy( ctx->contact_buf, dcache_entry, sz );
353 0 : }
354 :
355 0 : if( FD_UNLIKELY( kind==IN_KIND_TOWER ) ) {
356 0 : if( sz!=sizeof(fd_txn_p_t) ) {
357 0 : FD_LOG_ERR(( "sz %lu != expected txn size %lu", sz, sizeof(fd_txn_p_t) ));
358 0 : }
359 0 : fd_memcpy( ctx->txn_buf, dcache_entry, sz );
360 0 : }
361 0 : }
362 :
363 : static void
364 : after_frag( fd_send_tile_ctx_t * ctx,
365 : ulong in_idx,
366 : ulong seq FD_PARAM_UNUSED,
367 : ulong sig,
368 : ulong sz,
369 : ulong tsorig FD_PARAM_UNUSED,
370 : ulong tspub FD_PARAM_UNUSED,
371 0 : fd_stem_context_t * stem ) {
372 :
373 0 : ctx->stem = stem;
374 :
375 0 : fd_send_link_in_t * in_link = &ctx->in_links[ in_idx ];
376 0 : ulong kind = in_link->kind;
377 :
378 0 : if( FD_UNLIKELY( kind==IN_KIND_NET ) ) {
379 0 : uchar * ip_pkt = ctx->quic_buf + sizeof(fd_eth_hdr_t);
380 0 : ulong ip_sz = sz - sizeof(fd_eth_hdr_t);
381 0 : fd_quic_t * quic = ctx->quic;
382 :
383 0 : long dt = -fd_tickcount();
384 0 : fd_quic_process_packet( quic, ip_pkt, ip_sz );
385 0 : dt += fd_tickcount();
386 0 : fd_histf_sample( quic->metrics.receive_duration, (ulong)dt );
387 0 : }
388 :
389 0 : if( FD_UNLIKELY( kind==IN_KIND_TOWER ) ) {
390 :
391 0 : fd_txn_p_t * txn = (fd_txn_p_t *)fd_type_pun(ctx->txn_buf);
392 :
393 : /* sign the txn */
394 0 : uchar * signature = txn->payload + TXN(txn)->signature_off;
395 0 : uchar * message = txn->payload + TXN(txn)->message_off;
396 0 : ulong message_sz = txn->payload_sz - TXN(txn)->message_off;
397 0 : fd_keyguard_client_sign( ctx->keyguard_client, signature, message, message_sz, FD_KEYGUARD_SIGN_TYPE_ED25519 );
398 :
399 0 : ulong poh_slot = sig;
400 :
401 : /* send to leader for next few slots */
402 0 : for( ulong i=0UL; i<SEND_TO_LEADER_CNT; i++ ) {
403 0 : fd_pubkey_t const * leader = fd_multi_epoch_leaders_get_leader_for_slot( ctx->mleaders, poh_slot );
404 0 : if( FD_LIKELY( leader ) ) {
405 0 : quic_send( ctx, leader, txn->payload, txn->payload_sz );
406 0 : } else {
407 0 : ctx->metrics.leader_not_found++;
408 0 : FD_LOG_DEBUG(("send_tile: Failed to get leader contact"));
409 0 : }
410 0 : }
411 :
412 : /* send to gossip and dedup */
413 0 : fd_send_link_out_t * gossip_verify_out = ctx->gossip_verify_out;
414 0 : uchar * msg_to_gossip = fd_chunk_to_laddr( gossip_verify_out->mem, gossip_verify_out->chunk );
415 0 : fd_memcpy( msg_to_gossip, txn->payload, txn->payload_sz );
416 0 : fd_stem_publish( stem, gossip_verify_out->idx, 1UL, gossip_verify_out->chunk, txn->payload_sz, 0UL, 0, 0 );
417 0 : gossip_verify_out->chunk = fd_dcache_compact_next( gossip_verify_out->chunk, txn->payload_sz, gossip_verify_out->chunk0,
418 0 : gossip_verify_out->wmark );
419 0 : }
420 :
421 0 : if( FD_UNLIKELY( kind==IN_KIND_GOSSIP ) ) {
422 0 : finalize_new_cluster_contact_info( ctx );
423 0 : return;
424 0 : }
425 :
426 0 : if( FD_UNLIKELY( kind==IN_KIND_STAKE ) ) {
427 0 : finalize_stake_msg( ctx );
428 0 : return;
429 0 : }
430 0 : }
431 :
432 : static void
433 : privileged_init( fd_topo_t * topo,
434 0 : fd_topo_tile_t * tile ) {
435 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
436 :
437 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
438 0 : fd_send_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_send_tile_ctx_t), sizeof(fd_send_tile_ctx_t) );
439 :
440 : /* identity not used yet, but will be soon to sign quic txns */
441 0 : if( FD_UNLIKELY( !strcmp( tile->send.identity_key_path, "" ) ) )
442 0 : FD_LOG_ERR(( "identity_key_path not set" ));
443 :
444 0 : ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->send.identity_key_path, /* pubkey only: */ 1 ) );
445 0 : }
446 :
447 : static fd_send_link_in_t *
448 : setup_input_link( fd_send_tile_ctx_t * ctx,
449 : fd_topo_t * topo,
450 : fd_topo_tile_t * tile,
451 : ulong kind,
452 0 : const char * name ) {
453 0 : ulong in_idx = fd_topo_find_tile_in_link( topo, tile, name, 0 );
454 0 : FD_TEST( in_idx!=ULONG_MAX );
455 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ in_idx ] ];
456 0 : fd_send_link_in_t * in_link_desc = &ctx->in_links[ in_idx ];
457 0 : in_link_desc->mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
458 0 : in_link_desc->chunk0 = fd_dcache_compact_chunk0( in_link_desc->mem, in_link->dcache );
459 0 : in_link_desc->wmark = fd_dcache_compact_wmark( in_link_desc->mem, in_link->dcache, in_link->mtu );
460 0 : in_link_desc->dcache = in_link->dcache;
461 0 : in_link_desc->kind = kind;
462 0 : return in_link_desc;
463 0 : }
464 :
465 : static void
466 : setup_output_link( fd_send_link_out_t * desc,
467 : fd_topo_t * topo,
468 : fd_topo_tile_t * tile,
469 0 : const char * name ) {
470 0 : ulong out_idx = fd_topo_find_tile_out_link( topo, tile, name, 0 );
471 0 : FD_TEST( out_idx!=ULONG_MAX );
472 0 : fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ out_idx ] ];
473 0 : desc->idx = out_idx;
474 0 : desc->mcache = out_link->mcache;
475 0 : desc->sync = fd_mcache_seq_laddr( desc->mcache );
476 0 : desc->depth = fd_mcache_depth( desc->mcache );
477 0 : desc->mem = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
478 0 : desc->chunk0 = fd_dcache_compact_chunk0( desc->mem, out_link->dcache );
479 0 : desc->wmark = fd_dcache_compact_wmark( desc->mem, out_link->dcache, out_link->mtu );
480 0 : desc->chunk = desc->chunk0;
481 0 : }
482 :
483 : static void
484 : unprivileged_init( fd_topo_t * topo,
485 0 : fd_topo_tile_t * tile ) {
486 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
487 :
488 0 : if( FD_UNLIKELY( !tile->out_cnt ) ) FD_LOG_ERR(( "send has no primary output link" ));
489 :
490 : /* Scratch mem setup */
491 :
492 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
493 0 : fd_send_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_send_tile_ctx_t), sizeof(fd_send_tile_ctx_t) );
494 0 : fd_memset( ctx, 0, sizeof(fd_send_tile_ctx_t) );
495 :
496 0 : ctx->mleaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( ctx->mleaders_mem ) );
497 0 : FD_TEST( ctx->mleaders );
498 :
499 0 : if( FD_UNLIKELY( getrandom( ctx->tls_priv_key, ED25519_PRIV_KEY_SZ, 0 )!=ED25519_PRIV_KEY_SZ ) ) {
500 0 : FD_LOG_ERR(( "getrandom failed (%i-%s)", errno, fd_io_strerror( errno ) ));
501 0 : }
502 0 : fd_sha512_t * sha512 = fd_sha512_join( fd_sha512_new( ctx->sha512 ) );
503 0 : fd_ed25519_public_from_private( ctx->tls_pub_key, ctx->tls_priv_key, sha512 );
504 0 : fd_sha512_leave( sha512 );
505 :
506 0 : fd_aio_t * quic_tx_aio = fd_aio_join( fd_aio_new( ctx->quic_tx_aio, ctx, quic_tx_aio_send ) );
507 0 : if( FD_UNLIKELY( !quic_tx_aio ) ) FD_LOG_ERR(( "fd_aio_join failed" ));
508 :
509 0 : fd_quic_t * quic = fd_quic_join( fd_quic_new( FD_SCRATCH_ALLOC_APPEND( l, fd_quic_align(), fd_quic_footprint( &quic_limits ) ), &quic_limits ) );
510 0 : if( FD_UNLIKELY( !quic ) ) FD_LOG_ERR(( "fd_quic_new failed" ));
511 :
512 0 : quic->config.role = FD_QUIC_ROLE_CLIENT;
513 0 : quic->config.idle_timeout = QUIC_IDLE_TIMEOUT_NS;
514 0 : quic->config.ack_delay = QUIC_ACK_DELAY_NS;
515 0 : quic->config.keep_alive = 1;
516 0 : quic->config.sign = quic_tls_cv_sign;
517 0 : quic->config.sign_ctx = ctx;
518 0 : fd_memcpy( quic->config.identity_public_key, ctx->tls_pub_key, ED25519_PUB_KEY_SZ );
519 :
520 0 : quic->cb.conn_hs_complete = quic_hs_complete;
521 0 : quic->cb.conn_final = quic_conn_final;
522 0 : quic->cb.now = quic_now;
523 0 : quic->cb.now_ctx = ctx;
524 0 : quic->cb.quic_ctx = ctx;
525 :
526 0 : fd_quic_set_aio_net_tx( quic, quic_tx_aio );
527 0 : fd_quic_set_clock_tickcount( quic );
528 0 : FD_TEST_CUSTOM( fd_quic_init( quic ), "fd_quic_init failed" );
529 :
530 0 : fd_histf_join( fd_histf_new( quic->metrics.service_duration, FD_MHIST_SECONDS_MIN( SEND, SERVICE_DURATION_SECONDS ),
531 0 : FD_MHIST_SECONDS_MAX( SEND, SERVICE_DURATION_SECONDS ) ) );
532 0 : fd_histf_join( fd_histf_new( quic->metrics.receive_duration, FD_MHIST_SECONDS_MIN( SEND, RECEIVE_DURATION_SECONDS ),
533 0 : FD_MHIST_SECONDS_MAX( SEND, RECEIVE_DURATION_SECONDS ) ) );
534 :
535 0 : ctx->quic = quic;
536 :
537 : /* Initialize connection map */
538 0 : void * conn_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_send_conn_map_align(), fd_send_conn_map_footprint() );
539 0 : ctx->conn_map = fd_send_conn_map_join( fd_send_conn_map_new( conn_map_mem ) );
540 0 : if( FD_UNLIKELY( !ctx->conn_map ) ) FD_LOG_ERR(( "fd_send_conn_map_join failed" ));
541 :
542 0 : ctx->src_ip_addr = tile->send.ip_addr;
543 0 : ctx->src_port = tile->send.send_src_port;
544 0 : fd_ip4_udp_hdr_init( ctx->packet_hdr, FD_TXN_MTU, ctx->src_ip_addr, ctx->src_port );
545 :
546 0 : setup_input_link( ctx, topo, tile, IN_KIND_GOSSIP, "gossip_send" );
547 0 : setup_input_link( ctx, topo, tile, IN_KIND_STAKE, "stake_out" );
548 0 : setup_input_link( ctx, topo, tile, IN_KIND_TOWER, "tower_send" );
549 :
550 0 : fd_send_link_in_t * net_in = setup_input_link( ctx, topo, tile, IN_KIND_NET, "net_send" );
551 0 : fd_net_rx_bounds_init( &ctx->net_in_bounds, net_in->dcache );
552 :
553 0 : setup_output_link( ctx->gossip_verify_out, topo, tile, "send_txns" );
554 0 : setup_output_link( ctx->net_out, topo, tile, "send_net" );
555 :
556 : /* Set up keyguard(s) */
557 0 : ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_send", 0 );
558 0 : ulong sign_out_idx = fd_topo_find_tile_out_link( topo, tile, "send_sign", 0 );
559 0 : fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
560 0 : fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ sign_out_idx ] ];
561 :
562 0 : if ( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
563 0 : sign_out->mcache,
564 0 : sign_out->dcache,
565 0 : sign_in->mcache,
566 0 : sign_in->dcache ) )==NULL ) {
567 0 : FD_LOG_ERR(( "Keyguard join failed" ));
568 0 : }
569 :
570 : /* init metrics */
571 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
572 :
573 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
574 0 : if( FD_UNLIKELY( scratch_top != (ulong)scratch + scratch_footprint( tile ) ) ) {
575 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
576 0 : }
577 0 : }
578 :
579 : static ulong
580 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
581 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
582 : ulong out_cnt,
583 0 : struct sock_filter * out ) {
584 :
585 0 : populate_sock_filter_policy_fd_send_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
586 0 : return sock_filter_policy_fd_send_tile_instr_cnt;
587 0 : }
588 :
589 : static ulong
590 : populate_allowed_fds( fd_topo_t const * topo,
591 : fd_topo_tile_t const * tile,
592 : ulong out_fds_cnt,
593 0 : int * out_fds ) {
594 0 : (void)topo;
595 0 : (void)tile;
596 :
597 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
598 :
599 0 : ulong out_cnt = 0;
600 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
601 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
602 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
603 0 : return out_cnt;
604 0 : }
605 :
606 : static void
607 0 : metrics_write( fd_send_tile_ctx_t * ctx ) {
608 :
609 0 : FD_MCNT_SET( SEND, LEADER_NOT_FOUND, ctx->metrics.leader_not_found );
610 0 : FD_MCNT_SET( SEND, CONTACT_STALE, ctx->metrics.contact_stale );
611 0 : FD_MCNT_SET( SEND, QUIC_CONN_CREATE_FAILED, ctx->metrics.quic_conn_create_failed );
612 :
613 0 : FD_MCNT_ENUM_COPY( SEND, NEW_CONTACT_INFO, ctx->metrics.new_contact_info );
614 0 : FD_MCNT_ENUM_COPY( SEND, QUIC_SEND_RESULT, ctx->metrics.quic_send_result_cnt );
615 :
616 : /* QUIC metrics */
617 0 : FD_MCNT_SET( SEND, RECEIVED_BYTES, ctx->quic->metrics.net_rx_byte_cnt );
618 0 : FD_MCNT_ENUM_COPY( SEND, RECEIVED_FRAMES, ctx->quic->metrics.frame_rx_cnt );
619 0 : FD_MCNT_SET( SEND, RECEIVED_PACKETS, ctx->quic->metrics.net_rx_pkt_cnt );
620 0 : FD_MCNT_SET( SEND, STREAM_RECEIVED_BYTES, ctx->quic->metrics.stream_rx_byte_cnt );
621 0 : FD_MCNT_SET( SEND, STREAM_RECEIVED_EVENTS, ctx->quic->metrics.stream_rx_event_cnt );
622 :
623 0 : FD_MCNT_SET( SEND, SENT_PACKETS, ctx->quic->metrics.net_tx_pkt_cnt );
624 0 : FD_MCNT_SET( SEND, SENT_BYTES, ctx->quic->metrics.net_tx_byte_cnt );
625 0 : FD_MCNT_SET( SEND, RETRY_SENT, ctx->quic->metrics.retry_tx_cnt );
626 0 : FD_MCNT_ENUM_COPY( SEND, ACK_TX, ctx->quic->metrics.ack_tx );
627 :
628 0 : FD_MGAUGE_SET( SEND, CONNECTIONS_ACTIVE, ctx->quic->metrics.conn_active_cnt );
629 0 : FD_MCNT_SET( SEND, CONNECTIONS_CREATED, ctx->quic->metrics.conn_created_cnt );
630 0 : FD_MCNT_SET( SEND, CONNECTIONS_CLOSED, ctx->quic->metrics.conn_closed_cnt );
631 0 : FD_MCNT_SET( SEND, CONNECTIONS_ABORTED, ctx->quic->metrics.conn_aborted_cnt );
632 0 : FD_MCNT_SET( SEND, CONNECTIONS_TIMED_OUT, ctx->quic->metrics.conn_timeout_cnt );
633 0 : FD_MCNT_SET( SEND, CONNECTIONS_RETRIED, ctx->quic->metrics.conn_retry_cnt );
634 0 : FD_MCNT_SET( SEND, CONNECTION_ERROR_NO_SLOTS, ctx->quic->metrics.conn_err_no_slots_cnt );
635 0 : FD_MCNT_SET( SEND, CONNECTION_ERROR_RETRY_FAIL, ctx->quic->metrics.conn_err_retry_fail_cnt );
636 :
637 0 : FD_MCNT_ENUM_COPY( SEND, PKT_CRYPTO_FAILED, ctx->quic->metrics.pkt_decrypt_fail_cnt );
638 0 : FD_MCNT_ENUM_COPY( SEND, PKT_NO_KEY, ctx->quic->metrics.pkt_no_key_cnt );
639 0 : FD_MCNT_SET( SEND, PKT_NO_CONN, ctx->quic->metrics.pkt_no_conn_cnt );
640 0 : FD_MCNT_ENUM_COPY( SEND, FRAME_TX_ALLOC, ctx->quic->metrics.frame_tx_alloc_cnt );
641 0 : FD_MCNT_SET( SEND, PKT_NET_HEADER_INVALID, ctx->quic->metrics.pkt_net_hdr_err_cnt );
642 0 : FD_MCNT_SET( SEND, PKT_QUIC_HEADER_INVALID, ctx->quic->metrics.pkt_quic_hdr_err_cnt );
643 0 : FD_MCNT_SET( SEND, PKT_UNDERSZ, ctx->quic->metrics.pkt_undersz_cnt );
644 0 : FD_MCNT_SET( SEND, PKT_OVERSZ, ctx->quic->metrics.pkt_oversz_cnt );
645 0 : FD_MCNT_SET( SEND, PKT_VERNEG, ctx->quic->metrics.pkt_verneg_cnt );
646 0 : FD_MCNT_SET( SEND, PKT_RETRANSMISSIONS, ctx->quic->metrics.pkt_retransmissions_cnt );
647 :
648 0 : FD_MCNT_SET( SEND, HANDSHAKES_CREATED, ctx->quic->metrics.hs_created_cnt );
649 0 : FD_MCNT_SET( SEND, HANDSHAKE_ERROR_ALLOC_FAIL, ctx->quic->metrics.hs_err_alloc_fail_cnt );
650 0 : FD_MCNT_SET( SEND, HANDSHAKE_EVICTED, ctx->quic->metrics.hs_evicted_cnt );
651 :
652 0 : FD_MCNT_SET( SEND, FRAME_FAIL_PARSE, ctx->quic->metrics.frame_rx_err_cnt );
653 :
654 0 : FD_MHIST_COPY( SEND, SERVICE_DURATION_SECONDS, ctx->quic->metrics.service_duration );
655 0 : FD_MHIST_COPY( SEND, RECEIVE_DURATION_SECONDS, ctx->quic->metrics.receive_duration );
656 0 : }
657 :
658 :
/* Stem run loop configuration.  These macros are consumed by the
   fd_stem.c template included below, which provides stem_run: they
   name the callback context type and its alignment and wire in this
   tile's during_frag / after_frag / metrics_write / before_credit
   callbacks.  They must be defined before the include. */
#define STEM_BURST 1UL /* send_txns */

#define STEM_CALLBACK_CONTEXT_TYPE fd_send_tile_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_send_tile_ctx_t)

#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_BEFORE_CREDIT before_credit
#include "../../disco/stem/fd_stem.c"
669 :
670 : fd_topo_run_tile_t fd_tile_send = {
671 : .name = "send",
672 : .populate_allowed_seccomp = populate_allowed_seccomp,
673 : .populate_allowed_fds = populate_allowed_fds,
674 : .scratch_align = scratch_align,
675 : .scratch_footprint = scratch_footprint,
676 : .privileged_init = privileged_init,
677 : .unprivileged_init = unprivileged_init,
678 : .run = stem_run,
679 : };
|