Line data Source code
1 : #ifndef HEADER_fd_src_disco_bundle_fd_bundle_tile_private_h 2 : #define HEADER_fd_src_disco_bundle_fd_bundle_tile_private_h 3 : 4 : #include "fd_bundle_auth.h" 5 : #include "fd_keepalive.h" 6 : #include "../stem/fd_stem.h" 7 : #include "../keyguard/fd_keyswitch.h" 8 : #include "../keyguard/fd_keyguard_client.h" 9 : #include "../../waltz/grpc/fd_grpc_client.h" 10 : #include "../../waltz/resolv/fd_netdb.h" 11 : #include "../../waltz/fd_rtt_est.h" 12 : #include "../../util/alloc/fd_alloc.h" 13 : #include "../../util/hist/fd_histf.h" 14 : 15 : #define FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE (5UL) 16 : 17 : /* Pending transaction buffer. gRPC callbacks push decoded transactions 18 : here. after_credit drains one bundle per call by writing to dcache 19 : and calling fd_stem_publish. 20 : 21 : Sized to match the bundle_verif output link depth. */ 22 : 23 : struct fd_bundle_pending_txn { 24 : uchar payload[ FD_TXN_MTU ]; 25 : ushort payload_sz; 26 : uint source_ipv4; 27 : ulong sig; 28 : ulong bundle_seq; 29 : ulong bundle_txn_cnt; 30 : uchar commission; 31 : uchar commission_pubkey[ 32 ]; 32 : }; 33 : 34 : typedef struct fd_bundle_pending_txn fd_bundle_pending_txn_t; 35 : 36 : #define DEQUE_NAME pending_txn 37 180 : #define DEQUE_T fd_bundle_pending_txn_t 38 : #include "../../util/tmpl/fd_deque_dynamic.c" 39 : 40 : /* Returns true if the drain loop should continue after popping an 41 : entry. Bundles drain atomically (all txns with matching bundle_seq). 42 : Packets drain up to burst consecutive entries. */ 43 : 44 : static inline int 45 : fd_bundle_drain_continue( fd_bundle_pending_txn_t * txns, 46 : ulong drain_sig, 47 : ulong drain_seq, 48 : ulong drain_cnt, 49 132 : ulong burst ) { 50 132 : if( pending_txn_empty( txns ) ) return 0; 51 108 : if( drain_sig==1UL ) return pending_txn_peek_head( txns )->bundle_seq==drain_seq; 52 54 : return drain_cnt<burst && pending_txn_peek_head( txns )->sig==0UL; 53 108 : } 54 : 55 : #if FD_HAS_OPENSSL 56 : #include <openssl/ssl.h> /* SSL_CTX */ 57 : #endif 58 : 59 : struct fd_bundle_out_ctx { 60 : ulong idx; 61 : fd_wksp_t * mem; 62 : ulong chunk0; 63 : ulong wmark; 64 : ulong chunk; 65 : }; 66 : 67 : typedef struct fd_bundle_out_ctx fd_bundle_out_ctx_t; 68 : 69 : /* fd_bundle_metrics_t contains private metric counters. These get 70 : published to fd_metrics periodically. */ 71 : 72 : struct fd_bundle_metrics { 73 : ulong txn_received_cnt; 74 : ulong bundle_received_cnt; 75 : ulong packet_received_cnt; 76 : ulong proto_received_bytes; 77 : ulong shredstream_heartbeat_cnt; 78 : ulong ping_ack_cnt; 79 : 80 : ulong decode_fail_cnt; 81 : ulong transport_fail_cnt; 82 : ulong missing_builder_info_fail_cnt; 83 : ulong backpressure_drop_cnt; 84 : 85 : fd_histf_t msg_rx_delay[1]; 86 : }; 87 : 88 : typedef struct fd_bundle_metrics fd_bundle_metrics_t; 89 : 90 : /* fd_bundle_tile_t is the context object provided to callbacks from 91 : stem, and contains all state needed to progress the tile. */ 92 : 93 : struct fd_bundle_tile { 94 : /* Key switch */ 95 : fd_keyswitch_t * keyswitch; 96 : 97 : /* Key guard */ 98 : fd_keyguard_client_t keyguard_client[1]; 99 : 100 : uint is_ssl : 1; 101 : int keylog_fd; 102 : # if FD_HAS_OPENSSL 103 : /* OpenSSL */ 104 : SSL_CTX * ssl_ctx; 105 : SSL * ssl; 106 : fd_alloc_t * ssl_alloc; 107 : # endif /* FD_HAS_OPENSSL */ 108 : 109 : /* Config */ 110 : char server_fqdn[ 256 ]; /* cstr */ 111 : ulong server_fqdn_len; 112 : char server_sni[ 256 ]; /* cstr */ 113 : ulong server_sni_len; 114 : ushort server_tcp_port; 115 : 116 : /* Resolver */ 117 : fd_netdb_fds_t netdb_fds[1]; 118 : uint server_ip4_addr; /* last DNS lookup result */ 119 : 120 : /* TCP socket */ 121 : int tcp_sock; 122 : int so_rcvbuf; 123 : uint tcp_sock_connected : 1; 124 : uint defer_reset : 1; 125 : long cached_ts; 126 : 127 : /* Keepalive via HTTP/2 PINGs (randomized) */ 128 : long keepalive_interval; 129 : fd_keepalive_t keepalive[1]; 130 : fd_rtt_estimate_t rtt[1]; 131 : 132 : /* gRPC client */ 133 : void * grpc_client_mem; 134 : ulong grpc_buf_max; 135 : fd_grpc_client_t * grpc_client; 136 : fd_grpc_client_metrics_t grpc_metrics[1]; 137 : ulong map_seed; 138 : 139 : /* Bundle authenticator */ 140 : fd_bundle_auther_t auther; 141 : 142 : /* Bundle block builder info */ 143 : uchar builder_pubkey[ 32 ]; 144 : uchar builder_commission; /* in [0,100] (percent) */ 145 : uchar builder_info_avail : 1; /* Block builder info available? (potentially stale) */ 146 : uchar builder_info_wait : 1; /* Request already in-flight? */ 147 : long builder_info_valid_until; 148 : 149 : /* Bundle subscriptions */ 150 : uchar packet_subscription_live : 1; /* Want to subscribe to a stream? */ 151 : uchar packet_subscription_wait : 1; /* Request already in-flight? */ 152 : uchar bundle_subscription_live : 1; 153 : uchar bundle_subscription_wait : 1; 154 : 155 : /* Bundle state */ 156 : ulong bundle_seq; 157 : ulong bundle_txn_cnt; 158 : 159 : /* Error backoff */ 160 : fd_rng_t rng[1]; 161 : uint backoff_iter; 162 : long backoff_until; 163 : long backoff_reset; 164 : 165 : /* Stem publish */ 166 : fd_stem_context_t * stem; 167 : fd_bundle_out_ctx_t verify_out; 168 : fd_bundle_out_ctx_t plugin_out; 169 : fd_bundle_pending_txn_t * pending_txns; 170 : 171 : /* App metrics */ 172 : fd_bundle_metrics_t metrics; 173 : 174 : /* Check engine light */ 175 : uchar bundle_status_recent; /* most recently observed 'check engine light' */ 176 : uchar bundle_status_plugin; /* last 'plugin' update written */ 177 : uchar bundle_status_logged; 178 : long last_bundle_status_log_nanos; 179 : 180 : ulong next_leader_slot; /* from replay_out reset messages, or ULONG_MAX */ 181 : ulong reset_slot; /* from replay_out reset messages, or ULONG_MAX */ 182 : int sleep_mode; /* 1 means sleeping, 0 means connecting/connected */ 183 : long sleep_check_ns; /* next wallclock time to re-evaluate sleeping */ 184 : 185 : /* Staged values from during_frag, committed in after_frag */ 186 : ulong next_leader_slot_staged; 187 : ulong reset_slot_staged; 188 : 189 : int in_kind[ 64 ]; 190 : struct { 191 : fd_wksp_t * mem; 192 : ulong chunk0; 193 : ulong wmark; 194 : } replay_in; 195 : }; 196 : 197 : typedef struct fd_bundle_tile fd_bundle_tile_t; 198 : 199 : /* Define 'request_ctx' IDs to identify different types of gRPC calls */ 200 : 201 102 : #define FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets 4 202 138 : #define FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles 5 203 36 : #define FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo 6 204 : 205 : FD_PROTOTYPES_BEGIN 206 : 207 : /* fd_bundle_now is an externally linked function wrapping 208 : fd_log_wallclock. This is backed by a weak symbol, allowing tests to 209 : override the clock source. */ 210 : 211 : long 212 : fd_bundle_now( void ); 213 : 214 : /* fd_bundle_client_grpc_callbacks provides callbacks for grpc_client. */ 215 : 216 : extern fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks; 217 : 218 : /* fd_bundle_client_step is an all-in-one routine to drive client logic. 219 : As long as the tile calls this periodically, the client will 220 : reconnect to the bundle server, authenticate, and subscribe to 221 : packets and bundles. */ 222 : 223 : void 224 : fd_bundle_client_step( fd_bundle_tile_t * bundle, 225 : int * charge_busy ); 226 : 227 : /* fd_bundle_client_step_reconnect drives the 'reconnect' state machine. 228 : Once the HTTP/2 conn is established (SETTINGS exchanged), this 229 : function drives the auth logic, requests block builder info, sets up 230 : packet and bundle subscriptions, and PINGs. */ 231 : 232 : int 233 : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx, 234 : long now ); 235 : 236 : /* fd_bundle_tile_backoff is called whenever an error occurs. Stalls 237 : forward progress for a randomized amount of time to prevent error 238 : floods. */ 239 : 240 : void 241 : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx, 242 : long now ); 243 : 244 : /* fd_bundle_tile_should_stall returns 1 if forward progress should be 245 : temporarily prevented due to an error. */ 246 : 247 : FD_FN_PURE static inline int 248 : fd_bundle_tile_should_stall( fd_bundle_tile_t const * ctx, 249 60 : long now ) { 250 60 : return now < ctx->backoff_until; 251 60 : } 252 : 253 : /* fd_bundle_tile_housekeeping runs periodically at a low frequency. */ 254 : 255 : void 256 : fd_bundle_tile_housekeeping( fd_bundle_tile_t * ctx ); 257 : 258 : /* fd_bundle_client_grpc_rx_start is the first RX callback of a stream. */ 259 : 260 : void 261 : fd_bundle_client_grpc_rx_start( 262 : void * app_ctx, 263 : ulong request_ctx 264 : ) ; 265 : 266 : /* fd_bundle_client_grpc_rx_msg is called by grpc_client when a gRPC 267 : message arrives (unary or server-streaming response). */ 268 : 269 : void 270 : fd_bundle_client_grpc_rx_msg( 271 : void * app_ctx, /* (fd_bundle_tile_t *) */ 272 : void const * protobuf, 273 : ulong protobuf_sz, 274 : ulong request_ctx /* FD_BUNDLE_CLIENT_REQ_{...} */ 275 : ); 276 : 277 : /* fd_bundle_client_grpc_rx_end is called by grpc_client when a gRPC 278 : server-streaming response finishes. */ 279 : 280 : void 281 : fd_bundle_client_grpc_rx_end( 282 : void * app_ctx, 283 : ulong request_ctx, 284 : fd_grpc_resp_hdrs_t * resp 285 : ); 286 : 287 : /* fd_bundle_client_grpc_rx_timeout is called by grpc_client when a 288 : gRPC request deadline gets exceeded. */ 289 : 290 : void 291 : fd_bundle_client_grpc_rx_timeout( 292 : void * app_ctx, 293 : ulong request_ctx, /* FD_BUNDLE_CLIENT_REQ_{...} */ 294 : int deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */ 295 : ); 296 : 297 : /* fd_bundle_client_status provides a "check engine light". 298 : 299 : Returns 0 if the client has recently failed and is currently backing 300 : off from a reconnect attempt. 301 : 302 : Returns 1 if the client is currently reconnecting. 303 : 304 : Returns 2 if all of the following conditions are met: 305 : - TCP socket is alive 306 : - SSL session is not in an error state 307 : - HTTP/2 connection is established (SETTINGS exchange done) 308 : - gRPC bundle and packet subscriptions are live 309 : - HTTP/2 PING exchange was done recently 310 : 311 : Return codes are compatible with FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_{...}. */ 312 : 313 : int 314 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ); 315 : 316 : /* fd_bundle_request_ctx_cstr returns the gRPC method name for a 317 : FD_BUNDLE_CLIENT_REQ_* ID. Returns "unknown" the ID is not 318 : recognized. */ 319 : 320 : FD_FN_CONST char const * 321 : fd_bundle_request_ctx_cstr( ulong request_ctx ); 322 : 323 : /* fd_bundle_client_reset frees all connection-related resources. */ 324 : 325 : void 326 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ); 327 : 328 : /* fd_bundle_client_ping_tx enqueues a PING frame for sending. Returns 329 : 1 on success and 0 on failure (occurs when frame_tx buf is full). */ 330 : 331 : void 332 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ); 333 : 334 : FD_PROTOTYPES_END 335 : 336 : #endif /* HEADER_fd_src_disco_bundle_fd_bundle_tile_private_h */