Line data Source code
1 : #ifndef HEADER_fd_src_waltz_grpc_fd_grpc_client_private_h 2 : #define HEADER_fd_src_waltz_grpc_fd_grpc_client_private_h 3 : 4 : #include "fd_grpc_client.h" 5 : #include "../grpc/fd_grpc_codec.h" 6 : #include "../h2/fd_h2.h" 7 : 8 : /* fd_grpc_h2_stream_t holds the state of a gRPC request. */ 9 : 10 : struct fd_grpc_h2_stream { 11 : fd_h2_stream_t s; 12 : 13 : ulong request_ctx; 14 : uint next; 15 : 16 : /* Buffer response headers */ 17 : fd_grpc_resp_hdrs_t hdrs; 18 : 19 : /* Buffer an incoming gRPC message */ 20 : uchar * msg_buf; 21 : ulong msg_buf_max; 22 : uint hdrs_received : 1; 23 : ulong msg_buf_used; /* including header */ 24 : ulong msg_sz; /* size of next message */ 25 : 26 : long header_deadline_nanos; /* deadline to first resp header bit */ 27 : long rx_end_deadline_nanos; /* deadline to end of stream signal */ 28 : uint has_header_deadline : 1; 29 : uint has_rx_end_deadline : 1; 30 : }; 31 : 32 : /* Declare a pool of stream objects. 33 : 34 : While only one stream is used to write requests out to the wire at a 35 : time, a gRPC client might be waiting for multiple responses. */ 36 : 37 : #define POOL_NAME fd_grpc_h2_stream_pool 38 6 : #define POOL_T fd_grpc_h2_stream_t 39 : #define POOL_IDX_T uint 40 : #include "../../util/tmpl/fd_pool.c" 41 : 42 : static inline fd_grpc_h2_stream_t * 43 18 : fd_grpc_h2_stream_upcast( fd_h2_stream_t * stream ) { 44 18 : return (fd_grpc_h2_stream_t *)( (ulong)stream - offsetof(fd_grpc_h2_stream_t, s) ); 45 18 : } 46 : 47 : /* I/O paths 48 : 49 : RX path 50 : - fd_grpc_client_rxtx 51 : - calls fd_h2_rbuf_ssl_read 52 : - calls SSL_read_ex 53 : - calls recv(2) 54 : 55 : TX path 56 : - fd_grpc_client_rxtx 57 : - calls fd_h2_rbuf_ssl_write 58 : - calls SSL_write_ex 59 : - calls send(2) */ 60 : 61 : #include "../../waltz/h2/fd_h2_rbuf_ossl.h" 62 : 63 : /* gRPC client internal state. Quick overview: 64 : 65 : - The client maintains exactly one gRPC connection. 66 : - This conn includes a TCP socket, SSL handle, and a fd_h2 conn 67 : (and accompanying buffers). 68 : - The client object dies when the connection dies. 69 : 70 : - The client manages a small pool of stream objects. 71 : - Each stream has one of 3 states: 72 : - IDLE: marked free in stream_pool 73 : - OPEN: sending request data. marked used in stream_pool, present 74 : in stream_ids/streams, referred to by request_stream, has 75 : associated tx_op object. 76 : - CLOSE_TX: request sent, waiting for response. marked used in 77 : stream_pool, present in stream_ids/streams. 78 : - Only 1 stream can be in OPEN state. 79 : 80 : Regular state transitions: 81 : 82 : - IDLE->OPEN: Client acquires a stream object and starts a tx_op 83 : See fd_grpc_client_request_start 84 : - OPEN->CLOSE_TX: tx_op finished writing request data and is now 85 : waiting for the response. tx_op object finalized. 86 : See fd_grpc_client_request_continue1 87 : - CLOSE_TX->IDLE: All response data arrived. Stream object 88 : deallocated. 89 : 90 : Irregular state transitions: 91 : 92 : - CLOSE_TX->IDLE: Server aborts stream before request is fully sent. 93 : - OPEN->IDLE: Server aborts stream before response is received. */ 94 : 95 : struct fd_grpc_client_private { 96 : fd_grpc_client_callbacks_t const * callbacks; 97 : void * ctx; 98 : 99 : fd_h2_hdr_matcher_t matcher[1]; 100 : 101 : /* HTTP/2 connection */ 102 : fd_h2_conn_t conn[1]; 103 : fd_h2_rbuf_t frame_rx[1]; /* unencrypted HTTP/2 RX frame buffer */ 104 : fd_h2_rbuf_t frame_tx[1]; /* unencrypted HTTP/2 TX frame buffer */ 105 : 106 : /* HTTP/2 authority */ 107 : char host[ 256 ]; 108 : ushort port; /* <=65535 */ 109 : uchar host_len; /* <=255 */ 110 : 111 : /* TLS connection */ 112 : uint ssl_hs_done : 1; 113 : uint h2_hs_done : 1; 114 : 115 : /* Inflight request 116 : Non-NULL until a gRPC request is fully written out. */ 117 : fd_grpc_h2_stream_t * request_stream; 118 : fd_h2_tx_op_t request_tx_op[1]; 119 : 120 : /* Stream pool */ 121 : fd_grpc_h2_stream_t * stream_pool; 122 : uchar * stream_bufs; 123 : 124 : /* Stream map */ 125 : /* FIXME pull this into a fd_map_tiny.c? */ 126 : uint stream_ids[ FD_GRPC_CLIENT_MAX_STREAMS ]; 127 : fd_grpc_h2_stream_t * streams [ FD_GRPC_CLIENT_MAX_STREAMS ]; 128 : ulong stream_cnt; 129 : 130 : /* Buffers */ 131 : uchar * nanopb_tx; 132 : ulong nanopb_tx_max; 133 : uchar * frame_scratch; 134 : ulong frame_scratch_max; 135 : 136 : /* Frame buffers */ 137 : uchar * frame_rx_buf; 138 : ulong frame_rx_buf_max; 139 : uchar * frame_tx_buf; 140 : ulong frame_tx_buf_max; 141 : 142 : /* Version string */ 143 : uchar version_len; 144 : char version[ FD_GRPC_CLIENT_VERSION_LEN_MAX ]; 145 : 146 : fd_grpc_client_metrics_t * metrics; 147 : }; 148 : 149 : FD_PROTOTYPES_BEGIN 150 : 151 : /* fd_grpc_client_stream_acquire grabs a new stream ID and a stream 152 : object. */ 153 : 154 : int 155 : fd_grpc_client_stream_acquire_is_safe( fd_grpc_client_t * client ); 156 : 157 : 158 : fd_grpc_h2_stream_t * 159 : fd_grpc_client_stream_acquire( fd_grpc_client_t * client, 160 : ulong request_ctx ); 161 : 162 : void 163 : fd_grpc_client_stream_release( fd_grpc_client_t * client, 164 : fd_grpc_h2_stream_t * stream ); 165 : 166 : /* fd_grpc_client_service_streams checks all streams for timeouts and 167 : optionally generates receive window updates. 168 : FIXME poor algorithmic inefficiency (O(n)). Consider using a service 169 : queue/heap */ 170 : 171 : void 172 : fd_grpc_client_service_streams( fd_grpc_client_t * client, 173 : long ts_nanos ); 174 : 175 : void 176 : fd_grpc_h2_cb_headers( 177 : fd_h2_conn_t * conn, 178 : fd_h2_stream_t * h2_stream, 179 : void const * data, 180 : ulong data_sz, 181 : ulong flags 182 : ); 183 : 184 : FD_PROTOTYPES_END 185 : 186 : #endif /* HEADER_fd_src_waltz_grpc_fd_grpc_client_private_h */