Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_http_server_private.h"
3 :
4 : #include "picohttpparser.h"
5 : #include "../../ballet/sha1/fd_sha1.h"
6 : #include "../../ballet/base64/fd_base64.h"
7 : #include "../../util/net/fd_ip4.h"
8 : #include "fd_http.h"
9 :
10 : #include <stdarg.h>
11 : #include <stdio.h>
12 : #include <errno.h>
13 : #include <unistd.h>
14 : #include <poll.h>
15 : #include <stdlib.h>
16 : #include <strings.h>
17 : #include <sys/socket.h>
18 : #include <netinet/in.h>
19 :
20 30 : #define FD_HTTP_SERVER_POLL_CHUNK_SZ 128UL
21 :
22 : #if FD_HAS_ZSTD
23 42 : #define FD_HTTP_ZSTD_COMPRESSION_LEVEL 3
24 : #define ZSTD_STATIC_LINKING_ONLY
25 : #include <zstd.h>
26 : #endif
27 :
/* Instantiate the fd_pool template as ws_conn_pool.  Elements are
   struct fd_http_server_ws_connection, indices are ushort, and the
   element member `parent` is handed to the template as POOL_NEXT.
   fd_http_server_new is careful to preserve that member when it
   re-initializes each element. */
28 : #define POOL_NAME ws_conn_pool
29 42 : #define POOL_T struct fd_http_server_ws_connection
30 : #define POOL_IDX_T ushort
31 12 : #define POOL_NEXT parent
32 : #include "../../util/tmpl/fd_pool.c"
33 :
/* Instantiate the fd_pool template as conn_pool.  Elements are
   struct fd_http_server_connection, indices are ushort, and the element
   member `parent` is handed to the template as POOL_NEXT.
   fd_http_server_new is careful to preserve that member when it
   re-initializes each element. */
34 : #define POOL_NAME conn_pool
35 42 : #define POOL_T struct fd_http_server_connection
36 : #define POOL_IDX_T ushort
37 63 : #define POOL_NEXT parent
38 : #include "../../util/tmpl/fd_pool.c"
39 :
/* Instantiate the fd_treap template as ws_conn_treap over websocket
   connections (ushort indices).  TREAP_LT orders two connections by the
   `off` field of the send frame each one currently points at with
   send_frame_idx.  TREAP_CMP is a stub that always returns -1 because
   the query API is never used by this file. */
40 : #define TREAP_NAME ws_conn_treap
41 : #define TREAP_T struct fd_http_server_ws_connection
42 : #define TREAP_QUERY_T void * /* We don't use query ... */
43 : #define TREAP_CMP(q,e) (__extension__({ (void)(q); (void)(e); -1; })) /* which means we don't need to give a real
44 : implementation to cmp either */
45 0 : #define TREAP_IDX_T ushort
46 : #define TREAP_OPTIMIZE_ITERATION 1
47 0 : #define TREAP_LT(e0,e1) ((e0)->send_frames[ (e0)->send_frame_idx ].off<(e1)->send_frames[ (e1)->send_frame_idx ].off)
48 :
49 : #include "../../util/tmpl/fd_treap.c"
50 :
/* Instantiate the fd_treap template as conn_treap over HTTP connections
   (ushort indices).  TREAP_LT orders two connections by
   response._body_off.  accept_conns evicts the first element in forward
   iteration order when the connection pool is exhausted.  TREAP_CMP is
   a stub that always returns -1 because the query API is never used by
   this file. */
51 : #define TREAP_NAME conn_treap
52 : #define TREAP_T struct fd_http_server_connection
53 : #define TREAP_QUERY_T void * /* We don't use query ... */
54 : #define TREAP_CMP(q,e) (__extension__({ (void)(q); (void)(e); -1; })) /* which means we don't need to give a real
55 : implementation to cmp either */
56 0 : #define TREAP_IDX_T ushort
57 : #define TREAP_OPTIMIZE_ITERATION 1
58 0 : #define TREAP_LT(e0,e1) ((e0)->response._body_off<(e1)->response._body_off)
59 :
60 : #include "../../util/tmpl/fd_treap.c"
61 :
62 : #define FD_HTTP_SERVER_DEBUG 0
63 :
64 : FD_FN_CONST char const *
65 0 : fd_http_server_connection_close_reason_str( int reason ) {
66 0 : switch( reason ) {
67 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_OK: return "OK-Connection was closed normally";
68 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_EVICTED: return "EVICTED-Connection was evicted to make room for a new one";
69 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_TOO_SLOW: return "TOO_SLOW-Client was too slow and did not read the response in time";
70 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_EXPECTED_EOF: return "EXPECTED_EOF-Client continued to send data when we expected no more";
71 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET: return "PEER_RESET-Connection was reset by peer";
72 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_LARGE_REQUEST: return "LARGE_REQUEST-Request body was too large";
73 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_BAD_REQUEST: return "BAD_REQUEST-Request was malformed";
74 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_MISSING_CONTENT_LENGTH_HEADER: return "MISSING_CONTENT_LENGTH_HEADER-Missing Content-Length header field";
75 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_UNKNOWN_METHOD: return "UNKNOWN_METHOD-Request method was not recognized";
76 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_PATH_TOO_LONG: return "PATH_TOO_LONG-Request path was too long";
77 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_BAD_KEY: return "WS_BAD_KEY-Malformed Sec-WebSocket-Key header field";
78 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_UNEXPECTED_VERSION: return "WS_UNEXPECTED_VERSION-Unexpected Sec-Websocket-Version field";
79 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_MISSING_KEY_HEADER: return "WS_MISSING_KEY_HEADER-Missing Sec-WebSocket-Key header field";
80 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_MISSING_VERSION_HEADER: return "WS_MISSING_VERSION_HEADER-Missing Sec-WebSocket-Version header field";
81 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_BAD_MASK: return "WS_BAD_MASK-Got frame from client without mask flag set";
82 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_UNKNOWN_OPCODE: return "WS_UNKNOWN_OPCODE-Unknown opcode in websocket frame";
83 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_OVERSIZE_FRAME: return "WS_OVERSIZE_FRAME-Websocket frame was too large";
84 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CLIENT_TOO_SLOW: return "WS_CLIENT_TOO_SLOW-Client was too slow to keep up with sender";
85 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_MISSING_UPGRADE: return "WS_MISSING_UPGRADE-Missing Upgrade header field";
86 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_EXPECTED_CONT_OPCODE: return "WS_EXPECTED_CONT_OPCODE-Expected continuation opcode in websocket frame";
87 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_EXPECTED_TEXT_OPCODE: return "WS_EXPECTED_TEXT_OPCODE-Expected text opcode in websocket frame";
88 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CONTROL_FRAME_TOO_LARGE: return "WS_CONTROL_FRAME_TOO_LARGE-Websocket control frame was too large";
89 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CHANGED_OPCODE: return "FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CHANGED_OPCODE-Websocket frame type changed unexpectedly";
90 0 : case FD_HTTP_SERVER_CONNECTION_CLOSE_UNSUPPORTED_TRANSFER_ENCODING: return "UNSUPPORTED_TRANSFER_ENCODING-Transfer-Encoding is not supported";
91 0 : default: break;
92 0 : }
93 :
94 0 : return "unknown";
95 0 : }
96 :
97 : FD_FN_CONST char const *
98 0 : fd_http_server_method_str( uchar method ) {
99 0 : switch( method ) {
100 0 : case FD_HTTP_SERVER_METHOD_GET: return "GET";
101 0 : case FD_HTTP_SERVER_METHOD_POST: return "POST";
102 0 : case FD_HTTP_SERVER_METHOD_PUT: return "PUT";
103 0 : default: break;
104 0 : }
105 :
106 0 : return "unknown";
107 0 : }
108 :
109 : FD_FN_CONST ulong
110 111 : fd_http_server_align( void ) {
111 111 : return FD_HTTP_SERVER_ALIGN;
112 111 : }
113 :
114 : FD_FN_CONST ulong
115 24 : fd_http_server_footprint( fd_http_server_params_t params ) {
116 24 : ulong l = FD_LAYOUT_INIT;
117 24 : l = FD_LAYOUT_APPEND( l, FD_HTTP_SERVER_ALIGN, sizeof( fd_http_server_t ) );
118 24 : l = FD_LAYOUT_APPEND( l, conn_pool_align(), conn_pool_footprint( params.max_connection_cnt ) );
119 24 : l = FD_LAYOUT_APPEND( l, ws_conn_pool_align(), ws_conn_pool_footprint( params.max_ws_connection_cnt ) );
120 24 : l = FD_LAYOUT_APPEND( l, conn_treap_align(), conn_treap_footprint( params.max_connection_cnt ) );
121 24 : l = FD_LAYOUT_APPEND( l, ws_conn_treap_align(), ws_conn_treap_footprint( params.max_ws_connection_cnt ) );
122 24 : l = FD_LAYOUT_APPEND( l, alignof( struct pollfd ), (params.max_connection_cnt+params.max_ws_connection_cnt+1UL)*sizeof( struct pollfd ) );
123 24 : l = FD_LAYOUT_APPEND( l, 1UL, params.max_request_len*params.max_connection_cnt );
124 24 : l = FD_LAYOUT_APPEND( l, 1UL, params.max_ws_recv_frame_len*params.max_ws_connection_cnt );
125 24 : l = FD_LAYOUT_APPEND( l, alignof( struct fd_http_server_ws_frame ), params.max_ws_send_frame_cnt*params.max_ws_connection_cnt*sizeof( struct fd_http_server_ws_frame ) );
126 24 : l = FD_LAYOUT_APPEND( l, 1UL, params.outgoing_buffer_sz );
127 24 : #if FD_HAS_ZSTD
128 24 : l = FD_LAYOUT_APPEND( l, 16UL, ZSTD_estimateCCtxSize( FD_HTTP_ZSTD_COMPRESSION_LEVEL ) );
129 24 : #endif
130 24 : return FD_LAYOUT_FINI( l, fd_http_server_align() );
131 24 : }
132 :
133 : void *
134 : fd_http_server_new( void * shmem,
135 : fd_http_server_params_t params,
136 : fd_http_server_callbacks_t callbacks,
137 21 : void * callback_ctx ) {
138 21 : if( FD_UNLIKELY( !shmem ) ) {
139 0 : FD_LOG_WARNING(( "NULL shmem" ));
140 0 : return NULL;
141 0 : }
142 :
143 21 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_http_server_align() ) ) ) {
144 0 : FD_LOG_WARNING(( "misaligned shmem" ));
145 0 : return NULL;
146 0 : }
147 :
148 21 : if( FD_UNLIKELY( params.max_ws_connection_cnt && params.max_ws_recv_frame_len<params.max_request_len ) ) {
149 0 : FD_LOG_WARNING(( "max_ws_recv_frame_len<max_request_len" ));
150 0 : return NULL;
151 0 : }
152 :
153 21 : FD_SCRATCH_ALLOC_INIT( l, shmem );
154 21 : fd_http_server_t * http = FD_SCRATCH_ALLOC_APPEND( l, FD_HTTP_SERVER_ALIGN, sizeof(fd_http_server_t) );
155 21 : void * conn_pool = FD_SCRATCH_ALLOC_APPEND( l, conn_pool_align(), conn_pool_footprint( params.max_connection_cnt ) );
156 21 : void * ws_conn_pool = FD_SCRATCH_ALLOC_APPEND( l, ws_conn_pool_align(), ws_conn_pool_footprint( params.max_ws_connection_cnt ) );
157 21 : http->conn_treap = FD_SCRATCH_ALLOC_APPEND( l, conn_treap_align(), conn_treap_footprint( params.max_connection_cnt ) );
158 21 : http->ws_conn_treap = FD_SCRATCH_ALLOC_APPEND( l, ws_conn_treap_align(), ws_conn_treap_footprint( params.max_ws_connection_cnt ) );
159 21 : http->pollfds = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct pollfd), (params.max_connection_cnt+params.max_ws_connection_cnt+1UL)*sizeof( struct pollfd ) );
160 21 : char * _request_bytes = FD_SCRATCH_ALLOC_APPEND( l, 1UL, params.max_request_len*params.max_connection_cnt );
161 21 : uchar * _ws_recv_bytes = FD_SCRATCH_ALLOC_APPEND( l, 1UL, params.max_ws_recv_frame_len*params.max_ws_connection_cnt );
162 21 : struct fd_http_server_ws_frame * _ws_send_frames = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct fd_http_server_ws_frame), params.max_ws_send_frame_cnt*params.max_ws_connection_cnt*sizeof(struct fd_http_server_ws_frame) );
163 21 : http->oring = FD_SCRATCH_ALLOC_APPEND( l, 1UL, params.outgoing_buffer_sz );
164 21 : #if FD_HAS_ZSTD
165 21 : uchar * _zstd_ctx = FD_SCRATCH_ALLOC_APPEND( l, 16UL, ZSTD_estimateCCtxSize( FD_HTTP_ZSTD_COMPRESSION_LEVEL ) );
166 21 : #endif
167 21 : http->oring_sz = params.outgoing_buffer_sz;
168 21 : http->stage_err = 0;
169 21 : http->stage_off = 0UL;
170 21 : http->stage_len = 0UL;
171 21 : http->stage_comp_len = 0UL;
172 :
173 21 : http->callbacks = callbacks;
174 21 : http->callback_ctx = callback_ctx;
175 21 : http->evict_conn_id = 0UL;
176 21 : http->evict_ws_conn_id = 0UL;
177 21 : http->poll_conn_idx = 0UL;
178 21 : http->max_conns = params.max_connection_cnt;
179 21 : http->max_ws_conns = params.max_ws_connection_cnt;
180 21 : http->max_request_len = params.max_request_len;
181 21 : http->max_ws_recv_frame_len = params.max_ws_recv_frame_len;
182 21 : http->max_ws_send_frame_cnt = params.max_ws_send_frame_cnt;
183 21 : http->compress_websocket = params.compress_websocket;
184 :
185 21 : #if FD_HAS_ZSTD
186 21 : http->zstd_ctx = ZSTD_initStaticCCtx( _zstd_ctx, ZSTD_estimateCCtxSize( FD_HTTP_ZSTD_COMPRESSION_LEVEL ) );
187 21 : FD_TEST( http->zstd_ctx );
188 21 : ulong err = ZSTD_CCtx_setParameter( http->zstd_ctx, 100, FD_HTTP_ZSTD_COMPRESSION_LEVEL );
189 21 : if( FD_UNLIKELY( ZSTD_isError( err ) ) )
190 0 : FD_LOG_ERR(( "ZSTD_CCtx_setParameter failed (%s)", ZSTD_getErrorName( err ) ) );
191 21 : #endif
192 :
193 21 : http->conns = conn_pool_join( conn_pool_new( conn_pool, params.max_connection_cnt ) );
194 21 : conn_treap_join( conn_treap_new( http->conn_treap, params.max_connection_cnt ) );
195 21 : conn_treap_seed( http->conns, params.max_connection_cnt, 42UL );
196 :
197 21 : http->ws_conns = ws_conn_pool_join( ws_conn_pool_new( ws_conn_pool, params.max_ws_connection_cnt ) );
198 21 : ws_conn_treap_join( ws_conn_treap_new( http->ws_conn_treap, params.max_ws_connection_cnt ) );
199 21 : ws_conn_treap_seed( http->ws_conns, params.max_ws_connection_cnt, 42UL );
200 :
201 54 : for( ulong i=0UL; i<params.max_connection_cnt; i++ ) {
202 33 : http->pollfds[ i ].fd = -1;
203 33 : http->pollfds[ i ].events = POLLIN | POLLOUT;
204 33 : http->conns[ i ] = (struct fd_http_server_connection){
205 33 : .request_bytes = _request_bytes+i*params.max_request_len,
206 33 : .parent = http->conns[ i ].parent,
207 33 : };
208 33 : }
209 :
210 33 : for( ulong i=0UL; i<params.max_ws_connection_cnt; i++ ) {
211 12 : http->pollfds[ params.max_connection_cnt+i ].fd = -1;
212 12 : http->pollfds[ params.max_connection_cnt+i ].events = POLLIN | POLLOUT;
213 12 : http->ws_conns[ i ] = (struct fd_http_server_ws_connection){
214 12 : .recv_bytes = _ws_recv_bytes+i*params.max_ws_recv_frame_len,
215 12 : .send_frames = _ws_send_frames+i*params.max_ws_send_frame_cnt,
216 12 : .parent = http->ws_conns[ i ].parent,
217 12 : };
218 12 : }
219 :
220 21 : http->pollfds[ params.max_connection_cnt+params.max_ws_connection_cnt ].fd = -1;
221 21 : http->pollfds[ params.max_connection_cnt+params.max_ws_connection_cnt ].events = POLLIN | POLLOUT;
222 :
223 21 : memset( &http->metrics, 0, sizeof( http->metrics ) );
224 :
225 21 : FD_COMPILER_MFENCE();
226 21 : FD_VOLATILE( http->magic ) = FD_HTTP_SERVER_MAGIC;
227 21 : FD_COMPILER_MFENCE();
228 :
229 21 : return (void *)http;
230 21 : }
231 :
232 : fd_http_server_t *
233 21 : fd_http_server_join( void * shhttp ) {
234 :
235 21 : if( FD_UNLIKELY( !shhttp ) ) {
236 0 : FD_LOG_WARNING(( "NULL shhttp" ));
237 0 : return NULL;
238 0 : }
239 :
240 21 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shhttp, fd_http_server_align() ) ) ) {
241 0 : FD_LOG_WARNING(( "misaligned shhttp" ));
242 0 : return NULL;
243 0 : }
244 :
245 21 : fd_http_server_t * http = (fd_http_server_t *)shhttp;
246 :
247 21 : if( FD_UNLIKELY( http->magic!=FD_HTTP_SERVER_MAGIC ) ) {
248 0 : FD_LOG_WARNING(( "bad magic" ));
249 0 : return NULL;
250 0 : }
251 :
252 21 : return http;
253 21 : }
254 :
255 : void *
256 18 : fd_http_server_leave( fd_http_server_t * http ) {
257 :
258 18 : if( FD_UNLIKELY( !http ) ) {
259 0 : FD_LOG_WARNING(( "NULL http" ));
260 0 : return NULL;
261 0 : }
262 :
263 18 : return (void *)http;
264 18 : }
265 :
266 : void *
267 18 : fd_http_server_delete( void * shhttp ) {
268 :
269 18 : if( FD_UNLIKELY( !shhttp ) ) {
270 0 : FD_LOG_WARNING(( "NULL shhttp" ));
271 0 : return NULL;
272 0 : }
273 :
274 18 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shhttp, fd_http_server_align() ) ) ) {
275 0 : FD_LOG_WARNING(( "misaligned shhttp" ));
276 0 : return NULL;
277 0 : }
278 :
279 18 : fd_http_server_t * http = (fd_http_server_t *)shhttp;
280 :
281 18 : if( FD_UNLIKELY( http->magic!=FD_HTTP_SERVER_MAGIC ) ) {
282 0 : FD_LOG_WARNING(( "bad magic" ));
283 0 : return NULL;
284 0 : }
285 :
286 18 : FD_COMPILER_MFENCE();
287 18 : FD_VOLATILE( http->magic ) = 0UL;
288 18 : FD_COMPILER_MFENCE();
289 :
290 18 : return (void *)http;
291 18 : }
292 :
293 : int
294 30 : fd_http_server_fd( fd_http_server_t * http ) {
295 30 : return http->socket_fd;
296 30 : }
297 :
298 : fd_http_server_t *
299 : fd_http_server_listen( fd_http_server_t * http,
300 : uint address,
301 15 : ushort port ) {
302 15 : int sockfd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
303 15 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket failed (%i-%s)", errno, strerror( errno ) ));
304 :
305 15 : int optval = 1;
306 15 : if( FD_UNLIKELY( -1==setsockopt( sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof( optval ) ) ) )
307 0 : FD_LOG_ERR(( "setsockopt failed (%i-%s)", errno, strerror( errno ) ));
308 :
309 15 : struct sockaddr_in addr = {
310 15 : .sin_family = AF_INET,
311 15 : .sin_port = fd_ushort_bswap( port ),
312 15 : .sin_addr.s_addr = address,
313 15 : };
314 :
315 15 : if( FD_UNLIKELY( -1==bind( sockfd, fd_type_pun( &addr ), sizeof( addr ) ) ) ) {
316 0 : FD_LOG_ERR(( "bind(%i,AF_INET," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
317 0 : sockfd, FD_IP4_ADDR_FMT_ARGS( address ), port,
318 0 : errno, fd_io_strerror( errno ) ));
319 0 : }
320 15 : if( FD_UNLIKELY( -1==listen( sockfd, (int)http->max_conns ) ) ) FD_LOG_ERR(( "listen failed (%i-%s)", errno, fd_io_strerror( errno ) ));
321 :
322 15 : http->socket_fd = sockfd;
323 15 : http->pollfds[ http->max_conns+http->max_ws_conns ].fd = http->socket_fd;
324 :
325 15 : return http;
326 15 : }
327 :
328 : static void
329 : close_conn( fd_http_server_t * http,
330 : ulong conn_idx,
331 15 : int reason ) {
332 15 : FD_TEST( http->pollfds[ conn_idx ].fd!=-1 );
333 : #if FD_HTTP_SERVER_DEBUG
334 : FD_LOG_NOTICE(( "Closing connection %lu (fd=%d) (%d-%s)", conn_idx, http->pollfds[ conn_idx ].fd, reason, fd_http_server_connection_close_reason_str( reason ) ));
335 : #endif
336 :
337 15 : if( FD_UNLIKELY( -1==close( http->pollfds[ conn_idx ].fd ) ) ) FD_LOG_ERR(( "close failed (%i-%s)", errno, strerror( errno ) ));
338 :
339 15 : http->pollfds[ conn_idx ].fd = -1;
340 15 : if( FD_LIKELY( conn_idx<http->max_conns ) ) {
341 15 : if( FD_LIKELY( http->callbacks.close ) ) http->callbacks.close( conn_idx, reason, http->callback_ctx );
342 15 : } else {
343 0 : if( FD_LIKELY( http->callbacks.ws_close ) ) http->callbacks.ws_close( conn_idx-http->max_conns, reason, http->callback_ctx );
344 0 : }
345 :
346 15 : if( FD_UNLIKELY( conn_idx<http->max_conns ) ) {
347 15 : struct fd_http_server_connection * conn = &http->conns[ conn_idx ];
348 15 : if( FD_LIKELY( (conn->state==FD_HTTP_SERVER_CONNECTION_STATE_WRITING_HEADER || conn->state==FD_HTTP_SERVER_CONNECTION_STATE_WRITING_BODY)
349 15 : && !conn->response.static_body ) ) {
350 0 : conn_treap_ele_remove( http->conn_treap, conn, http->conns );
351 0 : }
352 15 : conn_pool_ele_release( http->conns, conn );
353 15 : } else {
354 0 : struct fd_http_server_ws_connection * ws_conn = &http->ws_conns[ conn_idx-http->max_conns ];
355 0 : if( FD_LIKELY( ws_conn->send_frame_cnt ) ) ws_conn_treap_ele_remove( http->ws_conn_treap, ws_conn, http->ws_conns );
356 0 : ws_conn_pool_ele_release( http->ws_conns, ws_conn );
357 0 : }
358 :
359 15 : if( FD_LIKELY( conn_idx<http->max_conns ) ) http->metrics.connection_cnt--;
360 0 : else http->metrics.ws_connection_cnt--;
361 15 : }
362 :
363 : void
364 : fd_http_server_close( fd_http_server_t * http,
365 : ulong conn_id,
366 0 : int reason ) {
367 0 : close_conn( http, conn_id, reason );
368 0 : }
369 :
370 : void
371 : fd_http_server_ws_close( fd_http_server_t * http,
372 : ulong ws_conn_id,
373 0 : int reason ) {
374 0 : close_conn( http, http->max_conns+ws_conn_id, reason );
375 0 : }
376 :
/* is_expected_network_error returns 1 if err is one of the errno values
   that merely indicates the connection should be closed, and 0
   otherwise.  Any error from accept(2), read(2), or send(2) that is not
   listed here is considered fatal by the callers and terminates the
   server. */

static inline int
is_expected_network_error( int err ) {
  switch( err ) {
    case ENETDOWN:
    case EPROTO:
    case ENOPROTOOPT:
    case EHOSTDOWN:
    case ENONET:
    case EHOSTUNREACH:
    case EOPNOTSUPP:
    case ENETUNREACH:
    case ETIMEDOUT:
    case ENETRESET:
    case ECONNABORTED:
    case ECONNRESET:
    case EPIPE:
    case EPERM:   /* iptables */
    case ENOMEM:  /* net stack OOM */
      return 1;
    default:
      break;
  }
  return 0;
}
401 :
402 : static void
403 15 : accept_conns( fd_http_server_t * http ) {
404 30 : for(;;) {
405 30 : int fd = accept4( http->socket_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC );
406 :
407 30 : if( FD_UNLIKELY( -1==fd ) ) {
408 15 : if( FD_LIKELY( EAGAIN==errno ) ) break;
409 0 : else if( FD_LIKELY( is_expected_network_error( errno ) ) ) continue;
410 0 : else FD_LOG_ERR(( "accept failed (%i-%s)", errno, strerror( errno ) ));
411 15 : }
412 :
413 15 : if( FD_UNLIKELY( !conn_pool_free( http->conns ) ) ) {
414 0 : conn_treap_fwd_iter_t it = conn_treap_fwd_iter_init( http->conn_treap, http->conns );
415 0 : if( FD_LIKELY( !conn_treap_fwd_iter_done( it ) ) ) {
416 0 : ulong conn_id = conn_treap_fwd_iter_idx( it );
417 0 : close_conn( http, conn_id, FD_HTTP_SERVER_CONNECTION_CLOSE_EVICTED );
418 0 : } else {
419 : /* If nobody is slow to read, just evict round robin */
420 0 : close_conn( http, http->evict_conn_id, FD_HTTP_SERVER_CONNECTION_CLOSE_EVICTED );
421 0 : http->evict_conn_id = (http->evict_conn_id+1UL) % http->max_conns;
422 0 : }
423 0 : }
424 :
425 15 : ulong conn_id = conn_pool_idx_acquire( http->conns );
426 :
427 15 : http->pollfds[ conn_id ].fd = fd;
428 15 : http->conns[ conn_id ].state = FD_HTTP_SERVER_CONNECTION_STATE_READING;
429 15 : http->conns[ conn_id ].request_bytes_read = 0UL;
430 15 : http->conns[ conn_id ].response_bytes_written = 0UL;
431 :
432 15 : if( FD_UNLIKELY( http->callbacks.open ) ) {
433 0 : http->callbacks.open( conn_id, fd, http->callback_ctx );
434 0 : }
435 :
436 15 : http->metrics.connection_cnt++;
437 : #if FD_HTTP_SERVER_DEBUG
438 : FD_LOG_NOTICE(( "Accepted connection %lu (fd=%d)", conn_id, fd ));
439 : #endif
440 15 : }
441 15 : }
442 :
443 : static void
444 : read_conn_http( fd_http_server_t * http,
445 15 : ulong conn_idx ) {
446 15 : struct fd_http_server_connection * conn = &http->conns[ conn_idx ];
447 :
448 15 : if( FD_UNLIKELY( conn->state!=FD_HTTP_SERVER_CONNECTION_STATE_READING ) ) {
449 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_EXPECTED_EOF );
450 0 : return;
451 0 : }
452 :
453 15 : long sz = read( http->pollfds[ conn_idx ].fd, conn->request_bytes+conn->request_bytes_read, http->max_request_len-conn->request_bytes_read );
454 15 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data to read, continue. */
455 15 : else if( FD_UNLIKELY( !sz || (-1==sz && is_expected_network_error( errno ) ) ) ) {
456 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
457 0 : return;
458 0 : }
459 15 : else if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "read failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
460 :
461 : /* New data was read... process it */
462 15 : http->metrics.bytes_read += (ulong)sz;
463 15 : conn->request_bytes_read += (ulong)sz;
464 15 : if( FD_UNLIKELY( conn->request_bytes_read==http->max_request_len ) ) {
465 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_LARGE_REQUEST );
466 0 : return;
467 0 : }
468 :
469 15 : char const * method;
470 15 : ulong method_len;
471 15 : char const * path;
472 15 : ulong path_len;
473 15 : int minor_version;
474 15 : struct phr_header headers[ 32 ];
475 15 : ulong num_headers = 32UL;
476 15 : int result = phr_parse_request( conn->request_bytes,
477 15 : conn->request_bytes_read,
478 15 : &method, &method_len,
479 15 : &path, &path_len,
480 15 : &minor_version,
481 15 : headers, &num_headers,
482 15 : conn->request_bytes_read - (ulong)sz );
483 15 : if( FD_UNLIKELY( -2==result ) ) return; /* Request still partial, wait for more data */
484 15 : else if( FD_UNLIKELY( -1==result ) ) {
485 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_BAD_REQUEST );
486 0 : return;
487 0 : }
488 :
489 15 : FD_TEST( result>0 && (ulong)result<=conn->request_bytes_read );
490 :
491 15 : uchar method_enum = UCHAR_MAX;
492 15 : if( FD_LIKELY( method_len==3UL && !strncmp( method, "GET", method_len ) ) ) method_enum = FD_HTTP_SERVER_METHOD_GET;
493 9 : else if( FD_LIKELY( method_len==4UL && !strncmp( method, "POST", method_len ) ) ) method_enum = FD_HTTP_SERVER_METHOD_POST;
494 0 : else if( FD_LIKELY( method_len==7UL && !strncmp( method, "OPTIONS", method_len ) ) ) method_enum = FD_HTTP_SERVER_METHOD_OPTIONS;
495 0 : else if( FD_LIKELY( method_len==3UL && !strncmp( method, "PUT", method_len ) ) ) method_enum = FD_HTTP_SERVER_METHOD_PUT;
496 :
497 15 : if( FD_UNLIKELY( method_enum==UCHAR_MAX ) ) {
498 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_UNKNOWN_METHOD );
499 0 : return;
500 0 : }
501 :
502 : /* RFC 7230 s3.3.3 */
503 66 : for( ulong i=0UL; i<num_headers; i++ ) {
504 54 : if( FD_UNLIKELY( headers[ i ].name_len==17UL && !strncasecmp( headers[ i ].name, "Transfer-Encoding", 17UL ) ) ) {
505 3 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_UNSUPPORTED_TRANSFER_ENCODING );
506 3 : return;
507 3 : }
508 54 : }
509 :
510 12 : ulong content_len = 0UL;
511 12 : if( FD_UNLIKELY( method_enum==FD_HTTP_SERVER_METHOD_POST || method_enum==FD_HTTP_SERVER_METHOD_PUT ) ) {
512 6 : int found = 0;
513 18 : for( ulong i=0UL; i<num_headers; i++ ) {
514 18 : if( FD_LIKELY( headers[ i ].name_len==14UL && !strncasecmp( headers[ i ].name, "Content-Length", 14UL ) ) ) {
515 9 : ulong this_content_len = 0UL;
516 9 : int parse_err = fd_http_parse_content_len( headers[ i ].value, (ulong)headers[ i ].value_len, &this_content_len );
517 9 : if( FD_UNLIKELY( parse_err ) ) {
518 3 : close_conn( http, conn_idx, fd_int_if( parse_err==FD_HTTP_PARSE_CONTENT_LEN_OVERFLOW, FD_HTTP_SERVER_CONNECTION_CLOSE_LARGE_REQUEST, FD_HTTP_SERVER_CONNECTION_CLOSE_BAD_REQUEST ) );
519 3 : return;
520 3 : }
521 6 : if( FD_UNLIKELY( found && this_content_len!=content_len ) ) {
522 : /* RFC 7230 s3.3.3 rule 4: reject if duplicate Content-Length
523 : values differ */
524 3 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_BAD_REQUEST );
525 3 : return;
526 3 : }
527 3 : content_len = this_content_len;
528 3 : found = 1;
529 3 : }
530 18 : }
531 :
532 0 : if( FD_UNLIKELY( !found ) ) {
533 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_MISSING_CONTENT_LENGTH_HEADER );
534 0 : return;
535 0 : }
536 :
537 0 : ulong total_len = (ulong)result+content_len;
538 :
539 0 : if( FD_UNLIKELY( total_len<content_len || total_len>http->max_request_len ) ) { /* Overflow */
540 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_LARGE_REQUEST );
541 0 : return;
542 0 : }
543 :
544 :
545 0 : if( FD_UNLIKELY( conn->request_bytes_read<(ulong)result+content_len ) ) {
546 0 : return; /* Request still partial, wait for more data */
547 0 : }
548 0 : }
549 :
550 6 : char content_type_nul_terminated[ 128 ] = {0};
551 6 : char accept_encoding_nul_terminated[ 128 ] = {0};
552 36 : for( ulong i=0UL; i<num_headers; i++ ) {
553 30 : if( FD_LIKELY( headers[ i ].name_len==12UL && !strncasecmp( headers[ i ].name, "Content-Type", 12UL ) ) ) {
554 0 : if( FD_UNLIKELY( headers[ i ].value_len>(sizeof(content_type_nul_terminated)-1UL) ) ) {
555 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_BAD_REQUEST );
556 0 : return;
557 0 : }
558 0 : memcpy( content_type_nul_terminated, headers[ i ].value, headers[ i ].value_len );
559 0 : break;
560 0 : }
561 :
562 30 : if( FD_LIKELY( headers[ i ].name_len==15UL && !strncasecmp( headers[ i ].name, "Accept-Encoding", 15UL ) ) ) {
563 0 : if( FD_UNLIKELY( headers[ i ].value_len>(sizeof(accept_encoding_nul_terminated)-1UL) ) ) {
564 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_BAD_REQUEST );
565 0 : return;
566 0 : }
567 0 : memcpy( accept_encoding_nul_terminated, headers[ i ].value, headers[ i ].value_len );
568 0 : }
569 30 : }
570 :
571 6 : char path_nul_terminated[ 128 ] = {0};
572 6 : if( FD_UNLIKELY( path_len>(sizeof( path_nul_terminated )-1UL) ) ) {
573 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PATH_TOO_LONG );
574 0 : return;
575 0 : }
576 6 : memcpy( path_nul_terminated, path, path_len );
577 :
578 6 : char const * upgrade_key = NULL;
579 12 : for( ulong i=0UL; i<num_headers; i++ ) {
580 12 : if( FD_LIKELY( headers[ i ].name_len==7UL && !strncasecmp( headers[ i ].name, "Upgrade", 7UL ) && headers[ i ].value_len==9UL ) ) {
581 6 : upgrade_key = headers[ i ].value;
582 6 : break;
583 6 : }
584 12 : }
585 :
586 6 : conn->upgrade_websocket = 0;
587 6 : int compress_websocket = 0;
588 6 : if( FD_UNLIKELY( upgrade_key && !strncasecmp( upgrade_key, "websocket", 9UL ) ) ) {
589 6 : conn->request_bytes_len = (ulong)result;
590 6 : conn->upgrade_websocket = 1;
591 :
592 6 : #if FD_HAS_ZSTD
593 36 : for( ulong i=0UL; i<num_headers; i++ ) {
594 30 : if( FD_LIKELY( headers[ i ].name_len==22UL && !strncasecmp( headers[ i ].name, "Sec-WebSocket-Protocol", 22UL ) &&
595 30 : headers[ i ].value_len==13UL && !strncmp( headers[ i ].value, "compress-zstd", 13UL ) ) ) {
596 0 : compress_websocket = 1;
597 0 : }
598 30 : }
599 6 : #endif
600 :
601 6 : char const * sec_websocket_key = NULL;
602 24 : for( ulong i=0UL; i<num_headers; i++ ) {
603 24 : if( FD_LIKELY( headers[ i ].name_len==17UL && !strncasecmp( headers[ i ].name, "Sec-WebSocket-Key", 17UL ) ) ) {
604 6 : sec_websocket_key = headers[ i ].value;
605 6 : if( FD_UNLIKELY( headers[ i ].value_len!=24 ) ) {
606 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_BAD_KEY );
607 0 : return;
608 0 : }
609 : /* RFC 6455 s4.2.1: Sec-WebSocket-Key must base64-decode to
610 : exactly 16 bytes. fd_base64_decode also validates the
611 : alphabet and padding rules. */
612 6 : uchar decoded_key[ FD_BASE64_DEC_SZ( 24UL ) ];
613 6 : if( FD_UNLIKELY( 16L!=fd_base64_decode( decoded_key, sec_websocket_key, 24UL ) ) ) {
614 3 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_BAD_KEY );
615 3 : return;
616 3 : }
617 3 : break;
618 6 : }
619 24 : }
620 :
621 3 : char const * sec_websocket_version = NULL;
622 15 : for( ulong i=0UL; i<num_headers; i++ ) {
623 15 : if( FD_LIKELY( headers[ i ].name_len==21UL && !strncasecmp( headers[ i ].name, "Sec-Websocket-Version", 21UL ) ) ) {
624 3 : sec_websocket_version = headers[ i ].value;
625 3 : if( FD_UNLIKELY( headers[ i ].value_len!=2 || strncmp( sec_websocket_version, "13", 2UL ) ) ) {
626 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_UNEXPECTED_VERSION );
627 0 : return;
628 0 : }
629 3 : break;
630 3 : }
631 15 : }
632 :
633 3 : if( FD_UNLIKELY( !sec_websocket_key ) ) {
634 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_MISSING_KEY_HEADER );
635 0 : return;
636 0 : }
637 :
638 3 : if( FD_UNLIKELY( !sec_websocket_version ) ) {
639 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_MISSING_VERSION_HEADER );
640 0 : return;
641 0 : }
642 :
643 3 : conn->sec_websocket_key = sec_websocket_key;
644 :
645 : /* RFC 6455 s4.2.2: the client must not send WebSocket frames until
646 : after it receives the 101 response. */
647 3 : if( FD_UNLIKELY( conn->request_bytes_read>conn->request_bytes_len ) ) {
648 3 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_BAD_REQUEST );
649 3 : return;
650 3 : }
651 3 : }
652 :
653 0 : conn->state = FD_HTTP_SERVER_CONNECTION_STATE_WRITING_HEADER;
654 :
655 0 : fd_http_server_request_t request = {
656 0 : .connection_id = conn_idx,
657 :
658 0 : .method = method_enum,
659 0 : .path = path_nul_terminated,
660 :
661 0 : .ctx = http->callback_ctx,
662 :
663 0 : .headers.content_type = content_type_nul_terminated,
664 0 : .headers.accept_encoding = accept_encoding_nul_terminated,
665 0 : .headers.compress_websocket = compress_websocket,
666 0 : .headers.upgrade_websocket = conn->upgrade_websocket,
667 0 : };
668 :
669 0 : switch( method_enum ) {
670 0 : case FD_HTTP_SERVER_METHOD_POST:
671 0 : case FD_HTTP_SERVER_METHOD_PUT: {
672 0 : request.post.body = (uchar*)conn->request_bytes+result;
673 0 : request.post.body_len = content_len;
674 0 : } break;
675 0 : default: break;
676 0 : }
677 :
678 0 : fd_http_server_response_t response = http->callbacks.request( &request );
679 0 : if( FD_LIKELY( http->pollfds[ conn_idx ].fd==-1 ) ) return; /* Connection was closed by callback */
680 0 : conn->response = response;
681 :
682 : #if FD_HTTP_SERVER_DEBUG
683 : FD_LOG_NOTICE(( "Received %s request \"%s\" from %lu (fd=%d) response code %lu", fd_http_server_method_str( method_enum ), path_nul_terminated, conn_idx, http->pollfds[ conn_idx ].fd, conn->response.status ));
684 : #endif
685 :
686 0 : if( FD_LIKELY( !conn->response.static_body ) ) conn_treap_ele_insert( http->conn_treap, conn, http->conns );
687 0 : }
688 :
689 : static void
690 : read_conn_ws( fd_http_server_t * http,
691 0 : ulong conn_idx ) {
692 0 : struct fd_http_server_ws_connection * conn = &http->ws_conns[ conn_idx-http->max_conns ];
693 :
694 0 : long sz = read( http->pollfds[ conn_idx ].fd, conn->recv_bytes+conn->recv_bytes_parsed+conn->recv_bytes_read, http->max_ws_recv_frame_len-conn->recv_bytes_parsed-conn->recv_bytes_read );
695 0 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data to read, continue. */
696 0 : else if( FD_UNLIKELY( !sz || (-1==sz && is_expected_network_error( errno ) ) ) ) {
697 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
698 0 : return;
699 0 : }
700 0 : else if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "read failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
701 :
702 : /* New data was read... process it */
703 0 : conn->recv_bytes_read += (ulong)sz;
704 0 : http->metrics.bytes_read += (ulong)sz;
705 0 : again:
706 0 : if( FD_UNLIKELY( conn->recv_bytes_read<2UL ) ) return; /* Need at least 2 bytes to determine frame length */
707 :
708 0 : int is_mask_set = conn->recv_bytes[ conn->recv_bytes_parsed+1UL ] & 0x80;
709 0 : if( FD_UNLIKELY( !is_mask_set ) ) {
710 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_BAD_MASK );
711 0 : return;
712 0 : }
713 :
714 0 : int opcode = conn->recv_bytes[ conn->recv_bytes_parsed ] & 0x0F;
715 0 : if( FD_UNLIKELY( opcode!=0x0 && opcode!=0x1 && opcode!=0x2 && opcode!=0x8 && opcode!=0x9 && opcode!=0xA ) ) {
716 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_UNKNOWN_OPCODE );
717 0 : return;
718 0 : }
719 :
720 0 : ulong payload_len = conn->recv_bytes[ conn->recv_bytes_parsed+1UL ] & 0x7F;
721 0 : if( FD_UNLIKELY( (payload_len==126 || payload_len==127) && (opcode==0x8 || opcode==0x9 || opcode==0xA) ) ) {
722 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CONTROL_FRAME_TOO_LARGE );
723 0 : return;
724 0 : }
725 :
726 0 : ulong len_bytes;
727 0 : if( FD_LIKELY( payload_len<126UL ) ) {
728 0 : len_bytes = 1UL;
729 0 : } else if( FD_LIKELY( payload_len==126 ) ) {
730 0 : if( FD_UNLIKELY( conn->recv_bytes_read<4UL ) ) return; /* Need at least 4 bytes to determine frame length */
731 0 : payload_len = ((ulong)conn->recv_bytes[ conn->recv_bytes_parsed+2UL ]<<8UL) | (ulong)conn->recv_bytes[ conn->recv_bytes_parsed+3UL ];
732 0 : len_bytes = 3UL;
733 0 : } else if( FD_LIKELY( payload_len==127 ) ) {
734 0 : if( FD_UNLIKELY( conn->recv_bytes_read<10UL ) ) return; /* Need at least 10 bytes to determine frame length */
735 0 : payload_len = ((ulong)conn->recv_bytes[ conn->recv_bytes_parsed+2UL ]<<56UL) | ((ulong)conn->recv_bytes[ conn->recv_bytes_parsed+3UL ]<<48UL) | ((ulong)conn->recv_bytes[ conn->recv_bytes_parsed+4UL ]<<40UL) | ((ulong)conn->recv_bytes[ conn->recv_bytes_parsed+5UL ]<<32UL) |
736 0 : ((ulong)conn->recv_bytes[ conn->recv_bytes_parsed+6UL ]<<24UL) | ((ulong)conn->recv_bytes[ conn->recv_bytes_parsed+7UL ]<<16UL) | ((ulong)conn->recv_bytes[ conn->recv_bytes_parsed+8UL ]<<8UL ) | (ulong)conn->recv_bytes[ conn->recv_bytes_parsed+9UL ];
737 0 : len_bytes = 9UL;
738 0 : } else {
739 0 : FD_LOG_ERR(( "unexpected payload_len %lu", payload_len )); /* Silence clang sanitizer, not possible */
740 0 : }
741 :
742 0 : ulong header_len = 1UL+len_bytes+4UL;
743 0 : ulong frame_len = header_len+payload_len;
744 0 : if( FD_UNLIKELY( frame_len<header_len ) ) { /* Overflow */
745 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_OVERSIZE_FRAME );
746 0 : return;
747 0 : }
748 :
749 0 : if( FD_UNLIKELY( conn->recv_bytes_parsed+frame_len+1UL>http->max_ws_recv_frame_len ) ) {
750 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_OVERSIZE_FRAME );
751 0 : return;
752 0 : }
753 :
754 0 : if( FD_UNLIKELY( conn->recv_bytes_read<frame_len ) ) return; /* Need more data to read the full frame */
755 :
756 : /* Data frame, process it */
757 :
758 0 : int is_fin_set = conn->recv_bytes[ conn->recv_bytes_parsed+0UL ] & 0x80;
759 :
760 0 : uchar * mask = conn->recv_bytes+conn->recv_bytes_parsed+1UL+len_bytes;
761 0 : uchar mask_copy[ 4 ] = { mask[ 0 ], mask[ 1 ], mask[ 2 ], mask[ 3 ] }; /* Bytes will be overwritten by the memmove below */
762 :
763 0 : uchar * payload = conn->recv_bytes+conn->recv_bytes_parsed+header_len;
764 0 : for( ulong i=0UL; i<payload_len; i++ ) conn->recv_bytes[ conn->recv_bytes_parsed+i ] = payload[ i ] ^ mask_copy[ i % 4 ];
765 :
766 0 : http->metrics.frames_read++;
767 :
768 : /* Frame is complete, process it */
769 :
770 0 : if( FD_UNLIKELY( opcode==0x8 ) ) {
771 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
772 0 : return;
773 0 : } else if( FD_UNLIKELY( opcode==0x9 ) ) {
774 : /* Ping frame, queue pong unless we are already sending one */
775 0 : if( FD_LIKELY( conn->pong_state!=FD_HTTP_SERVER_PONG_STATE_WAITING ) ) {
776 0 : conn->pong_state = FD_HTTP_SERVER_PONG_STATE_WAITING;
777 0 : conn->pong_data_len = payload_len;
778 0 : FD_TEST( payload_len<=125UL );
779 0 : memcpy( conn->pong_data, conn->recv_bytes+conn->recv_bytes_parsed, payload_len );
780 0 : }
781 0 : if( FD_UNLIKELY( conn->recv_bytes_read-frame_len ) ) {
782 0 : memmove( conn->recv_bytes, conn->recv_bytes+conn->recv_bytes_parsed+frame_len, conn->recv_bytes_read-frame_len );
783 0 : }
784 0 : conn->recv_bytes_parsed = 0UL;
785 0 : conn->recv_bytes_read -= frame_len;
786 0 : return;
787 0 : } else if( FD_UNLIKELY( opcode==0xA ) ) {
788 : /* Pong frame, ignore */
789 0 : if( FD_UNLIKELY( conn->recv_bytes_read-frame_len ) ) {
790 0 : memmove( conn->recv_bytes, conn->recv_bytes+conn->recv_bytes_parsed+frame_len, conn->recv_bytes_read-frame_len );
791 0 : }
792 0 : conn->recv_bytes_parsed = 0UL;
793 0 : conn->recv_bytes_read -= frame_len;
794 0 : return;
795 0 : }
796 :
797 0 : if( FD_UNLIKELY( conn->recv_started_msg && opcode!=0x0 ) ) {
798 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_EXPECTED_CONT_OPCODE );
799 0 : return;
800 0 : }
801 :
802 0 : if( FD_UNLIKELY( !conn->recv_started_msg && opcode!=0x1 && opcode!=0x2 ) ) {
803 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_EXPECTED_TEXT_OPCODE );
804 0 : return;
805 0 : }
806 :
807 0 : if( FD_UNLIKELY( conn->recv_started_msg && opcode!=conn->recv_last_opcode ) ) {
808 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CHANGED_OPCODE );
809 0 : return;
810 0 : }
811 0 : conn->recv_last_opcode = opcode;
812 :
813 : /* Check if this is a complete message */
814 :
815 0 : if( FD_UNLIKELY( !is_fin_set ) ) {
816 0 : conn->recv_started_msg = 1;
817 0 : conn->recv_bytes_read -= frame_len;
818 0 : conn->recv_bytes_parsed += payload_len;
819 0 : return; /* Not a complete message yet */
820 0 : }
821 :
822 : /* Complete message, process it */
823 :
824 0 : uchar * trailing_data = conn->recv_bytes+conn->recv_bytes_parsed+frame_len;
825 0 : ulong trailing_data_len = conn->recv_bytes_read-frame_len;
826 :
827 0 : conn->recv_bytes_parsed += payload_len;
828 0 : conn->recv_bytes_read -= frame_len;
829 :
830 0 : uchar tmp = conn->recv_bytes[ conn->recv_bytes_parsed ];
831 0 : conn->recv_bytes[ conn->recv_bytes_parsed ] = 0; /* NUL terminate */
832 0 : http->callbacks.ws_message( conn_idx-http->max_conns, conn->recv_bytes, conn->recv_bytes_parsed, http->callback_ctx );
833 0 : if( FD_UNLIKELY( -1==http->pollfds[ conn_idx ].fd ) ) return; /* Connection was closed by callback */
834 0 : conn->recv_bytes[ conn->recv_bytes_parsed ] = tmp;
835 :
836 0 : conn->recv_started_msg = 0;
837 0 : conn->recv_bytes_parsed = 0UL;
838 0 : if( FD_UNLIKELY( trailing_data_len ) ) {
839 0 : memmove( conn->recv_bytes, trailing_data, trailing_data_len );
840 0 : goto again; /* Might be another message in the buffer to process */
841 0 : }
842 0 : }
843 :
844 : static void
845 : read_conn( fd_http_server_t * http,
846 15 : ulong conn_idx ) {
847 15 : if( FD_LIKELY( conn_idx<http->max_conns ) ) read_conn_http( http, conn_idx );
848 0 : else read_conn_ws( http, conn_idx );
849 15 : }
850 :
851 : static void
852 : write_conn_http( fd_http_server_t * http,
853 0 : ulong conn_idx ) {
854 0 : struct fd_http_server_connection * conn = &http->conns[ conn_idx ];
855 :
856 0 : char header_buf[ 1024 ];
857 :
858 0 : uchar const * response;
859 0 : ulong response_len;
860 0 : switch( conn->state ) {
861 0 : case FD_HTTP_SERVER_CONNECTION_STATE_READING:
862 0 : return; /* No data staged for write yet. */
863 0 : case FD_HTTP_SERVER_CONNECTION_STATE_WRITING_HEADER:
864 0 : switch( conn->response.status ) {
865 0 : case 200:
866 0 : if( FD_UNLIKELY( conn->response.upgrade_websocket ) ) {
867 0 : if( FD_UNLIKELY( !conn->upgrade_websocket ) ) {
868 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_MISSING_UPGRADE );
869 0 : return;
870 0 : }
871 :
872 0 : uchar sec_websocket_key[ 60 ];
873 0 : fd_memcpy( sec_websocket_key, conn->sec_websocket_key, 24 );
874 0 : fd_memcpy( sec_websocket_key+24, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", 36 );
875 :
876 0 : uchar sec_websocket_accept[ 20 ];
877 0 : fd_sha1_hash( sec_websocket_key, 60, sec_websocket_accept );
878 0 : char sec_websocket_accept_base64[ FD_BASE64_ENC_SZ( 20 ) ];
879 0 : ulong encoded_len = fd_base64_encode( sec_websocket_accept_base64, sec_websocket_accept, 20 );
880 0 : FD_TEST( fd_cstr_printf_check( header_buf, sizeof( header_buf ), &response_len, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %.*s\r\n", (int)encoded_len, sec_websocket_accept_base64 ) );
881 0 : } else {
882 0 : ulong body_len = conn->response.static_body ? conn->response.static_body_len : conn->response._body_len;
883 0 : FD_TEST( fd_cstr_printf_check( header_buf, sizeof( header_buf ), &response_len, "HTTP/1.1 200 OK\r\nContent-Length: %lu\r\nConnection: close\r\n", body_len ) );
884 0 : }
885 0 : break;
886 0 : case 204: {
887 0 : ulong body_len = conn->response.static_body ? conn->response.static_body_len : conn->response._body_len;
888 0 : FD_TEST( fd_cstr_printf_check( header_buf, sizeof( header_buf ), &response_len, "HTTP/1.1 204 No Content\r\nContent-Length: %lu\r\n", body_len ) );
889 0 : break;
890 0 : }
891 0 : case 400: {
892 0 : ulong body_len = conn->response.static_body ? conn->response.static_body_len : conn->response._body_len;
893 0 : FD_TEST( fd_cstr_printf_check( header_buf, sizeof( header_buf ), &response_len, "HTTP/1.1 400 Bad Request\r\nContent-Length: %lu\r\n", body_len ) );
894 0 : break;
895 0 : }
896 0 : case 404:
897 0 : FD_TEST( fd_cstr_printf_check( header_buf, sizeof( header_buf ), &response_len, "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n" ) );
898 0 : break;
899 0 : case 405:
900 0 : FD_TEST( fd_cstr_printf_check( header_buf, sizeof( header_buf ), &response_len, "HTTP/1.1 405 Method Not Allowed\r\nContent-Length: 0\r\n" ) );
901 0 : break;
902 0 : case 500:
903 0 : FD_TEST( fd_cstr_printf_check( header_buf, sizeof( header_buf ), &response_len, "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\n" ) );
904 0 : break;
905 0 : default:
906 0 : FD_TEST( fd_cstr_printf_check( header_buf, sizeof( header_buf ), &response_len, "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\n" ) );
907 0 : break;
908 0 : }
909 :
910 0 : if( FD_LIKELY( conn->response.compress_websocket ) ) {
911 0 : ulong compress_websocket_len;
912 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, &compress_websocket_len, "Sec-WebSocket-Protocol: compress-zstd\r\n" ) );
913 0 : response_len += compress_websocket_len;
914 0 : }
915 0 : if( FD_LIKELY( conn->response.content_type ) ) {
916 0 : ulong content_type_len;
917 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, &content_type_len, "Content-Type: %s\r\n", conn->response.content_type ) );
918 0 : response_len += content_type_len;
919 0 : }
920 0 : if( FD_LIKELY( conn->response.cache_control ) ) {
921 0 : ulong cache_control_len;
922 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, &cache_control_len, "Cache-Control: %s\r\n", conn->response.cache_control ) );
923 0 : response_len += cache_control_len;
924 0 : }
925 0 : if( FD_LIKELY( conn->response.content_encoding ) ) {
926 0 : ulong content_encoding_len;
927 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, &content_encoding_len, "Content-Encoding: %s\r\n", conn->response.content_encoding ) );
928 0 : response_len += content_encoding_len;
929 0 : }
930 0 : if( FD_LIKELY( conn->response.access_control_allow_origin ) ) {
931 0 : ulong access_control_allow_origin_len;
932 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, &access_control_allow_origin_len, "Access-Control-Allow-Origin: %s\r\n", conn->response.access_control_allow_origin ) );
933 0 : response_len += access_control_allow_origin_len;
934 0 : }
935 0 : if( FD_LIKELY( conn->response.access_control_allow_methods ) ) {
936 0 : ulong access_control_allow_methods_len;
937 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, &access_control_allow_methods_len, "Access-Control-Allow-Methods: %s\r\n", conn->response.access_control_allow_methods ) );
938 0 : response_len += access_control_allow_methods_len;
939 0 : }
940 0 : if( FD_LIKELY( conn->response.access_control_allow_headers ) ) {
941 0 : ulong access_control_allow_headers_len;
942 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, &access_control_allow_headers_len, "Access-Control-Allow-Headers: %s\r\n", conn->response.access_control_allow_headers ) );
943 0 : response_len += access_control_allow_headers_len;
944 0 : }
945 0 : if( FD_LIKELY( conn->response.access_control_max_age ) ) {
946 0 : ulong access_control_max_age_len;
947 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, &access_control_max_age_len, "Access-Control-Max-Age: %lu\r\n", conn->response.access_control_max_age ) );
948 0 : response_len += access_control_max_age_len;
949 0 : }
950 0 : FD_TEST( fd_cstr_printf_check( header_buf+response_len, sizeof( header_buf )-response_len, NULL, "\r\n" ) );
951 0 : response_len += 2UL;
952 :
953 0 : response = (uchar const *)header_buf;
954 0 : break;
955 0 : case FD_HTTP_SERVER_CONNECTION_STATE_WRITING_BODY:
956 0 : if( FD_UNLIKELY( conn->response.static_body ) ) {
957 0 : response = conn->response.static_body;
958 0 : response_len = conn->response.static_body_len;
959 0 : } else {
960 0 : response = http->oring+(conn->response._body_off%http->oring_sz);
961 0 : response_len = conn->response._body_len;
962 0 : }
963 0 : break;
964 0 : default:
965 0 : FD_LOG_ERR(( "invalid server state (%d)", conn->state ));
966 0 : }
967 :
968 0 : long sz = send( http->pollfds[ conn_idx ].fd, response+conn->response_bytes_written, response_len-conn->response_bytes_written, MSG_NOSIGNAL );
969 0 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data was written, continue. */
970 0 : if( FD_UNLIKELY( -1==sz && is_expected_network_error( errno ) ) ) {
971 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
972 0 : return;
973 0 : }
974 0 : if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "write failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
975 :
976 0 : http->metrics.bytes_written += (ulong)sz;
977 0 : conn->response_bytes_written += (ulong)sz;
978 0 : if( FD_UNLIKELY( conn->response_bytes_written==response_len ) ) {
979 0 : switch( conn->state ) {
980 0 : case FD_HTTP_SERVER_CONNECTION_STATE_WRITING_HEADER:
981 0 : if( FD_UNLIKELY( conn->response.upgrade_websocket ) ) {
982 0 : if( FD_UNLIKELY( !conn->upgrade_websocket ) ) {
983 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_MISSING_UPGRADE );
984 0 : return;
985 0 : }
986 :
987 0 : int fd = http->pollfds[ conn_idx ].fd;
988 0 : http->pollfds[ conn_idx ].fd = -1;
989 :
990 0 : struct fd_http_server_connection * conn = &http->conns[ conn_idx ];
991 :
992 0 : int ws_compress = conn->response.compress_websocket;
993 0 : ulong req_bytes_read = conn->request_bytes_read;
994 0 : ulong req_bytes_len = conn->request_bytes_len;
995 :
996 0 : if( FD_LIKELY( !conn->response.static_body ) ) conn_treap_ele_remove( http->conn_treap, conn, http->conns );
997 0 : conn_pool_ele_release( http->conns, conn );
998 :
999 0 : if( FD_UNLIKELY( !ws_conn_pool_free( http->ws_conns ) ) ) {
1000 0 : ws_conn_treap_rev_iter_t it = ws_conn_treap_rev_iter_init( http->ws_conn_treap, http->ws_conns );
1001 0 : if( FD_LIKELY( !ws_conn_treap_rev_iter_done( it ) ) ) {
1002 0 : ulong ws_conn_id = ws_conn_treap_rev_iter_idx( it );
1003 0 : close_conn( http, http->max_conns+ws_conn_id, FD_HTTP_SERVER_CONNECTION_CLOSE_EVICTED );
1004 0 : } else {
1005 0 : close_conn( http, http->max_conns+http->evict_ws_conn_id, FD_HTTP_SERVER_CONNECTION_CLOSE_EVICTED );
1006 0 : http->evict_ws_conn_id = (http->evict_ws_conn_id+1UL) % http->max_ws_conns;
1007 0 : }
1008 0 : }
1009 :
1010 0 : ulong ws_conn_id = ws_conn_pool_idx_acquire( http->ws_conns );
1011 0 : http->pollfds[ http->max_conns+ws_conn_id ].fd = fd;
1012 :
1013 0 : http->ws_conns[ ws_conn_id ].pong_state = FD_HTTP_SERVER_PONG_STATE_NONE;
1014 0 : http->ws_conns[ ws_conn_id ].send_frame_cnt = 0UL;
1015 0 : http->ws_conns[ ws_conn_id ].send_frame_state = FD_HTTP_SERVER_SEND_FRAME_STATE_HEADER;
1016 0 : http->ws_conns[ ws_conn_id ].send_frame_idx = 0UL;
1017 0 : http->ws_conns[ ws_conn_id ].recv_started_msg = 0;
1018 0 : http->ws_conns[ ws_conn_id ].recv_bytes_parsed = 0UL;
1019 0 : http->ws_conns[ ws_conn_id ].recv_bytes_read = 0UL;
1020 0 : http->ws_conns[ ws_conn_id ].send_frame_bytes_written = 0UL;
1021 0 : http->ws_conns[ ws_conn_id ].compress_websocket = ws_compress;
1022 :
1023 0 : http->metrics.connection_cnt--;
1024 0 : http->metrics.ws_connection_cnt++;
1025 :
1026 : /* Trailing data after the HTTP request was already rejected
1027 : in read_conn_http, so req_bytes_read==req_bytes_len. */
1028 0 : FD_TEST( req_bytes_read==req_bytes_len );
1029 :
1030 : #if FD_HTTP_SERVER_DEBUG
1031 : FD_LOG_WARNING(( "Upgraded connection %lu (fd=%d) to websocket connection %lu", conn_idx, fd, ws_conn_id ));
1032 : #endif
1033 :
1034 0 : if( FD_LIKELY( http->callbacks.ws_open ) ) http->callbacks.ws_open( ws_conn_id, http->callback_ctx );
1035 0 : } else {
1036 0 : conn->state = FD_HTTP_SERVER_CONNECTION_STATE_WRITING_BODY;
1037 0 : conn->response_bytes_written = 0UL;
1038 0 : }
1039 0 : break;
1040 0 : case FD_HTTP_SERVER_CONNECTION_STATE_WRITING_BODY:
1041 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_OK );
1042 0 : break;
1043 0 : }
1044 0 : }
1045 0 : }
1046 :
1047 : static int
1048 : maybe_write_pong( fd_http_server_t * http,
1049 0 : ulong conn_idx ) {
1050 0 : struct fd_http_server_ws_connection * conn = &http->ws_conns[ conn_idx-http->max_conns ];
1051 :
1052 : /* No need to pong if ....
1053 :
1054 : Client has not sent a ping */
1055 0 : if( FD_LIKELY( conn->pong_state==FD_HTTP_SERVER_PONG_STATE_NONE ) ) return 0;
1056 : /* We are in the middle of writing a data frame */
1057 0 : if( FD_LIKELY( conn->send_frame_cnt && (conn->send_frame_state==FD_HTTP_SERVER_SEND_FRAME_STATE_DATA || conn->send_frame_bytes_written ) ) ) return 0;
1058 :
1059 : /* Otherwise, we need to pong */
1060 0 : if( FD_LIKELY( conn->pong_state==FD_HTTP_SERVER_PONG_STATE_WAITING ) ) {
1061 0 : conn->pong_state = FD_HTTP_SERVER_PONG_STATE_WRITING;
1062 0 : conn->pong_bytes_written = 0UL;
1063 0 : }
1064 :
1065 0 : uchar frame[ 2UL+125UL ];
1066 0 : frame[ 0 ] = 0x80 | 0x0A; /* FIN, 0xA for pong. */
1067 0 : frame[ 1 ] = (uchar)conn->pong_data_len;
1068 0 : fd_memcpy( frame+2UL, conn->pong_data, conn->pong_data_len );
1069 :
1070 0 : long sz = send( http->pollfds[ conn_idx ].fd, frame+conn->pong_bytes_written, 2UL+conn->pong_data_len-conn->pong_bytes_written, MSG_NOSIGNAL );
1071 0 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return 1; /* No data was written, continue. */
1072 0 : else if( FD_UNLIKELY( -1==sz && is_expected_network_error( errno ) ) ) {
1073 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
1074 0 : return 1;
1075 0 : }
1076 0 : else if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "write failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
1077 :
1078 0 : http->metrics.bytes_written += (ulong)sz;
1079 0 : conn->pong_bytes_written += (ulong)sz;
1080 0 : if( FD_UNLIKELY( conn->pong_bytes_written==2UL+conn->pong_data_len ) ) {
1081 0 : conn->pong_state = FD_HTTP_SERVER_PONG_STATE_NONE;
1082 0 : return 0;
1083 0 : }
1084 :
1085 0 : return 1;
1086 0 : }
1087 :
1088 : static void
1089 : write_conn_ws( fd_http_server_t * http,
1090 0 : ulong conn_idx ) {
1091 0 : struct fd_http_server_ws_connection * conn = &http->ws_conns[ conn_idx-http->max_conns ];
1092 :
1093 0 : if( FD_UNLIKELY( maybe_write_pong( http, conn_idx ) ) ) return;
1094 0 : if( FD_UNLIKELY( !conn->send_frame_cnt ) ) return;
1095 :
1096 0 : struct iovec iovecs[ 512UL*2UL ];
1097 0 : uchar headers[ 512UL ][ 10UL ];
1098 :
1099 0 : ulong batch_cnt = fd_ulong_min( conn->send_frame_cnt, 512UL );
1100 0 : ulong out_idx = 0UL;
1101 0 : for( ulong i=0UL; i<batch_cnt; i++ ) {
1102 0 : fd_http_server_ws_frame_t * frame = &conn->send_frames[ (conn->send_frame_idx+i) % http->max_ws_send_frame_cnt ];
1103 0 : if( FD_UNLIKELY( i || conn->send_frame_state==FD_HTTP_SERVER_SEND_FRAME_STATE_HEADER ) ) {
1104 0 : ulong header_len;
1105 0 : headers[ i ][ 0 ] = 0x80 | fd_uchar_if(frame->compressed, 0x02, 0x01); /* FIN, 0x1 for text, 0x2 for binary */
1106 0 : if( FD_LIKELY( frame->len<126UL ) ) {
1107 0 : headers[ i ][ 1 ] = (uchar)frame->len;
1108 0 : header_len = 2UL;
1109 0 : } else if( FD_LIKELY( frame->len<65536UL ) ) {
1110 0 : headers[ i ][ 1 ] = 126;
1111 0 : headers[ i ][ 2 ] = (uchar)(frame->len>>8);
1112 0 : headers[ i ][ 3 ] = (uchar)(frame->len);
1113 0 : header_len = 4UL;
1114 0 : } else {
1115 0 : headers[ i ][ 1 ] = 127;
1116 0 : headers[ i ][ 2 ] = (uchar)(frame->len>>56);
1117 0 : headers[ i ][ 3 ] = (uchar)(frame->len>>48);
1118 0 : headers[ i ][ 4 ] = (uchar)(frame->len>>40);
1119 0 : headers[ i ][ 5 ] = (uchar)(frame->len>>32);
1120 0 : headers[ i ][ 6 ] = (uchar)(frame->len>>24);
1121 0 : headers[ i ][ 7 ] = (uchar)(frame->len>>16);
1122 0 : headers[ i ][ 8 ] = (uchar)(frame->len>>8);
1123 0 : headers[ i ][ 9 ] = (uchar)(frame->len);
1124 0 : header_len = 10UL;
1125 0 : }
1126 :
1127 0 : ulong header_bytes_written = fd_ulong_if( i==0UL, conn->send_frame_bytes_written, 0UL );
1128 :
1129 0 : iovecs[ out_idx ].iov_base = headers[ i ]+header_bytes_written;
1130 0 : iovecs[ out_idx ].iov_len = header_len-header_bytes_written;
1131 0 : out_idx++;
1132 0 : }
1133 :
1134 0 : ulong data_bytes_written = fd_ulong_if( i==0UL && conn->send_frame_state==FD_HTTP_SERVER_SEND_FRAME_STATE_DATA, conn->send_frame_bytes_written, 0UL );
1135 0 : iovecs[ out_idx ].iov_base = http->oring+(frame->off%http->oring_sz)+data_bytes_written;
1136 0 : iovecs[ out_idx ].iov_len = frame->len-data_bytes_written;
1137 0 : out_idx++;
1138 0 : }
1139 :
1140 0 : struct msghdr msg = {0};
1141 0 : msg.msg_iov = iovecs;
1142 0 : msg.msg_iovlen = out_idx;
1143 :
1144 0 : long sz = sendmsg( http->pollfds[ conn_idx ].fd, &msg, MSG_NOSIGNAL );
1145 0 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data was written, continue. */
1146 0 : else if( FD_UNLIKELY( -1==sz && is_expected_network_error( errno ) ) ) {
1147 0 : close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
1148 0 : return;
1149 0 : }
1150 0 : else if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "write failed (%i-%s)", errno, fd_io_strerror( errno ) )); /* Unexpected programmer error, abort */
1151 :
1152 0 : ulong sent = (ulong)sz;
1153 0 : http->metrics.bytes_written += sent;
1154 :
1155 0 : for( ulong i=0UL; i<out_idx; i++ ) {
1156 0 : ulong iov_len = iovecs[ i ].iov_len;
1157 0 : if( FD_LIKELY( sent>=iov_len ) ) {
1158 0 : conn->send_frame_bytes_written = 0UL;
1159 :
1160 0 : if( FD_LIKELY( conn->send_frame_state==FD_HTTP_SERVER_SEND_FRAME_STATE_DATA ) ) {
1161 0 : conn->send_frame_state = FD_HTTP_SERVER_SEND_FRAME_STATE_HEADER;
1162 0 : conn->send_frame_idx = (conn->send_frame_idx+1UL) % http->max_ws_send_frame_cnt;
1163 0 : conn->send_frame_cnt--;
1164 :
1165 0 : ws_conn_treap_ele_remove( http->ws_conn_treap, conn, http->ws_conns );
1166 0 : if( FD_LIKELY( conn->send_frame_cnt ) ) ws_conn_treap_ele_insert( http->ws_conn_treap, conn, http->ws_conns );
1167 :
1168 0 : http->metrics.frames_written++;
1169 0 : } else {
1170 0 : conn->send_frame_state = FD_HTTP_SERVER_SEND_FRAME_STATE_DATA;
1171 0 : }
1172 :
1173 0 : sent -= iov_len;
1174 0 : } else {
1175 0 : conn->send_frame_bytes_written += sent;
1176 0 : break;
1177 0 : }
1178 0 : }
1179 0 : }
1180 :
1181 : static void
1182 : write_conn( fd_http_server_t * http,
1183 0 : ulong conn_idx ) {
1184 0 : if( FD_LIKELY( conn_idx<http->max_conns ) ) write_conn_http( http, conn_idx );
1185 0 : else write_conn_ws( http, conn_idx );
1186 0 : }
1187 :
1188 : int
1189 : fd_http_server_poll( fd_http_server_t * http,
1190 30 : int poll_timeout ) {
1191 30 : int nfds = fd_syscall_poll( http->pollfds, (uint)( http->max_conns+http->max_ws_conns+1UL ), poll_timeout );
1192 30 : if( FD_UNLIKELY( 0==nfds ) ) return 0;
1193 30 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 0;
1194 30 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
1195 :
1196 : /* Always check the listener socket for new connections. */
1197 30 : ulong listener_idx = http->max_conns+http->max_ws_conns;
1198 30 : if( FD_UNLIKELY( http->pollfds[ listener_idx ].fd!=-1 && (http->pollfds[ listener_idx ].revents & POLLIN) ) ) {
1199 15 : accept_conns( http );
1200 15 : }
1201 :
1202 : /* Service existing connections in chunks of
1203 : FD_HTTP_SERVER_POLL_CHUNK_SZ to bound the amount of work done per
1204 : poll call. poll_conn_idx tracks where we left off so all
1205 : connections get serviced fairly over multiple calls. */
1206 30 : ulong conn_cnt = http->max_conns+http->max_ws_conns;
1207 30 : ulong start = http->poll_conn_idx;
1208 30 : ulong end = start+FD_HTTP_SERVER_POLL_CHUNK_SZ;
1209 30 : if( FD_UNLIKELY( end>conn_cnt ) ) end = conn_cnt;
1210 :
1211 84 : for( ulong i=start; i<end; i++ ) {
1212 54 : if( FD_UNLIKELY( -1==http->pollfds[ i ].fd ) ) continue;
1213 30 : if( FD_LIKELY( http->pollfds[ i ].revents & POLLIN ) ) read_conn( http, i );
1214 30 : if( FD_UNLIKELY( -1==http->pollfds[ i ].fd ) ) continue;
1215 15 : if( FD_LIKELY( http->pollfds[ i ].revents & POLLOUT ) ) write_conn( http, i );
1216 : /* No need to handle POLLHUP, read() will return 0 soon enough. */
1217 15 : }
1218 :
1219 30 : http->poll_conn_idx = fd_ulong_if( end>=conn_cnt, 0UL, end );
1220 :
1221 30 : return 1;
1222 30 : }
1223 :
1224 : static void
1225 : fd_http_server_evict_until( fd_http_server_t * http,
1226 87687 : ulong off ) {
1227 87687 : conn_treap_fwd_iter_t next;
1228 87687 : for( conn_treap_fwd_iter_t it=conn_treap_fwd_iter_init( http->conn_treap, http->conns ); !conn_treap_fwd_iter_done( it ); it=next ) {
1229 0 : next = conn_treap_fwd_iter_next( it, http->conns );
1230 0 : struct fd_http_server_connection * conn = conn_treap_fwd_iter_ele( it, http->conns );
1231 :
1232 0 : if( FD_UNLIKELY( conn->response._body_off<off ) ) {
1233 0 : close_conn( http, conn_treap_fwd_iter_idx( it ), FD_HTTP_SERVER_CONNECTION_CLOSE_EVICTED );
1234 0 : } else {
1235 0 : break;
1236 0 : }
1237 0 : }
1238 :
1239 87687 : ws_conn_treap_fwd_iter_t ws_next;
1240 87687 : for( ws_conn_treap_fwd_iter_t it=ws_conn_treap_fwd_iter_init( http->ws_conn_treap, http->ws_conns ); !ws_conn_treap_fwd_iter_done( it ); it=ws_next ) {
1241 0 : ws_next = ws_conn_treap_fwd_iter_next( it, http->ws_conns );
1242 0 : struct fd_http_server_ws_connection * conn = ws_conn_treap_fwd_iter_ele( it, http->ws_conns );
1243 :
1244 0 : if( FD_UNLIKELY( conn->send_frames[ conn->send_frame_idx ].off<off ) ) {
1245 0 : close_conn( http, ws_conn_treap_fwd_iter_idx( it )+http->max_conns, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CLIENT_TOO_SLOW );
1246 0 : } else {
1247 0 : break;
1248 0 : }
1249 0 : }
1250 87687 : }
1251 :
1252 : static void
1253 : fd_http_server_reserve( fd_http_server_t * http,
1254 87762 : ulong len ) {
1255 : /* fd_http_server_reserve should not be called after
1256 : fd_http_ws_compress_maybe */
1257 87762 : FD_TEST( http->stage_comp_len == 0 );
1258 :
1259 87762 : ulong remaining = http->oring_sz-((http->stage_off%http->oring_sz)+http->stage_len);
1260 87762 : if( FD_UNLIKELY( len>remaining ) ) {
1261 : /* Appending the format string into the hcache would go past the end
1262 : of the buffer... two cases, */
1263 15369 : if( FD_UNLIKELY( http->stage_len+len>http->oring_sz ) ) {
1264 : /* Case 1: The snap is going to be larger than the entire buffer,
1265 : there's no way to fit it even if we evict everything
1266 : else. Mark the hcache as errored and exit. */
1267 :
1268 75 : FD_LOG_WARNING(( "tried to reserve %lu bytes for an outgoing message which exceeds the entire data size", http->stage_len+len ));
1269 75 : FD_LOG_HEXDUMP_WARNING(( "start of message", http->oring+(http->stage_off%http->oring_sz), fd_ulong_min( 500UL, http->oring_sz-(http->stage_off%http->oring_sz) ) ));
1270 75 : FD_LOG_HEXDUMP_WARNING(( "start of buffer", http->oring, fd_ulong_min( 500UL, http->oring_sz ) ));
1271 75 : http->stage_err = 1;
1272 75 : return;
1273 15294 : } else {
1274 : /* Case 2: The snap can fit if we relocate it to the start of the
1275 : buffer and evict whatever was there. We also evict the
1276 : rest of the buffer behind where the snap was to
1277 : preserve the invariant that snaps are always evicted in
1278 : circular order. */
1279 :
1280 15294 : ulong stage_end = http->stage_off+remaining+http->stage_len+len;
1281 15294 : ulong clamp = fd_ulong_if( stage_end>=http->oring_sz, stage_end-http->oring_sz, 0UL );
1282 15294 : fd_http_server_evict_until( http, clamp );
1283 15294 : memmove( http->oring, http->oring+(http->stage_off%http->oring_sz), http->stage_len );
1284 15294 : http->stage_off += http->stage_len+remaining;
1285 15294 : }
1286 72393 : } else {
1287 : /* The snap can fit in the buffer, we just need to evict whatever
1288 : was there before. */
1289 72393 : ulong stage_end = http->stage_off+http->stage_len+len;
1290 72393 : ulong clamp = fd_ulong_if( stage_end>=http->oring_sz, stage_end-http->oring_sz, 0UL );
1291 72393 : fd_http_server_evict_until( http, clamp );
1292 72393 : }
1293 87762 : }
1294 :
1295 : static int
1296 0 : fd_http_ws_compress_maybe( fd_http_server_t * http ) {
1297 : /* we don't compress if the message is small, or if compression is
1298 : disabled in the config */
1299 0 : if( FD_LIKELY( !http->compress_websocket || http->stage_len <= 200 || http->stage_err ) ) return 0;
1300 :
1301 0 : #if FD_HAS_ZSTD
1302 0 : ulong worst_case_compressed_sz = ZSTD_compressBound( http->stage_len );
1303 0 : fd_http_server_reserve( http, worst_case_compressed_sz );
1304 :
1305 0 : if( FD_UNLIKELY( http->stage_err ) ) return 0;
1306 :
1307 0 : ulong compressed_sz = ZSTD_compress2( http->zstd_ctx, http->oring+(http->stage_off%http->oring_sz)+http->stage_len, worst_case_compressed_sz, http->oring+(http->stage_off%http->oring_sz), http->stage_len );
1308 0 : if( FD_UNLIKELY( ZSTD_isError( compressed_sz ) ) ) {
1309 0 : FD_LOG_WARNING(( "ZSTD_compress2 failed (%s)", ZSTD_getErrorName( compressed_sz ) ) );
1310 0 : http->stage_err = 1;
1311 0 : return 0;
1312 0 : }
1313 0 : FD_TEST( compressed_sz <= worst_case_compressed_sz );
1314 :
1315 0 : http->stage_comp_len = compressed_sz;
1316 :
1317 0 : return 1;
1318 : #else
1319 : return 0;
1320 : #endif
1321 0 : }
1322 :
1323 : uchar *
1324 : fd_http_server_append_start( fd_http_server_t * http,
1325 0 : ulong len ) {
1326 0 : fd_http_server_reserve( http, len );
1327 0 : if( FD_UNLIKELY( http->stage_err ) ) return NULL;
1328 0 : return http->oring+(http->stage_off%http->oring_sz)+http->stage_len;
1329 0 : }
1330 :
1331 : void
1332 : fd_http_server_append_end( fd_http_server_t * http,
1333 0 : ulong len ) {
1334 0 : http->stage_len += len;
1335 0 : }
1336 :
1337 : int
1338 : fd_http_server_ws_send( fd_http_server_t * http,
1339 0 : ulong ws_conn_id ) {
1340 0 : struct fd_http_server_ws_connection * conn = &http->ws_conns[ ws_conn_id ];
1341 0 : int compressed = conn->compress_websocket;
1342 0 : if( FD_LIKELY( compressed ) ) compressed = fd_http_ws_compress_maybe( http );
1343 :
1344 :
1345 0 : if( FD_UNLIKELY( http->stage_err ) ) {
1346 0 : http->stage_err = 0;
1347 0 : http->stage_len = 0;
1348 0 : http->stage_comp_len = 0;
1349 0 : return -1;
1350 0 : }
1351 :
1352 : /* It is possible that ws_conn_id has already been closed by
1353 : fd_http_server_reserve during staging. If the staging buffer is
1354 : full, the incoming frame is added to the beginning of the buffer,
1355 : and any connections that were previously using that allotted space
1356 : are closed. There is a small chance that ws_conn_id is one of
1357 : those connections, and has therefore already been closed. */
1358 0 : if( FD_LIKELY( http->pollfds[ http->max_conns+ws_conn_id ].fd==-1 ) ) {
1359 0 : http->stage_len = 0;
1360 0 : http->stage_comp_len = 0;
1361 0 : return 0;
1362 0 : }
1363 :
1364 0 : if( FD_UNLIKELY( conn->send_frame_cnt==http->max_ws_send_frame_cnt ) ) {
1365 0 : close_conn( http, ws_conn_id+http->max_conns, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CLIENT_TOO_SLOW );
1366 0 : http->stage_len = 0;
1367 0 : http->stage_comp_len = 0;
1368 0 : return 0;
1369 0 : }
1370 :
1371 : /* A frame is compressed only if the connection is configured to do
1372 : so, and if the compression step wasn't skipped (i.e. stage_len>200) */
1373 0 : fd_http_server_ws_frame_t frame = {
1374 0 : .off = fd_ulong_if(compressed, http->stage_off+http->stage_len, http->stage_off),
1375 0 : .len = fd_ulong_if(compressed, http->stage_comp_len, http->stage_len),
1376 0 : .compressed = compressed,
1377 0 : };
1378 :
1379 0 : conn->send_frames[ (conn->send_frame_idx+conn->send_frame_cnt) % http->max_ws_send_frame_cnt ] = frame;
1380 0 : conn->send_frame_cnt++;
1381 :
1382 0 : if( FD_LIKELY( conn->send_frame_cnt==1UL ) ) {
1383 0 : ws_conn_treap_ele_insert( http->ws_conn_treap, conn, http->ws_conns );
1384 0 : }
1385 :
1386 0 : http->stage_off += http->stage_len+http->stage_comp_len;
1387 0 : http->stage_len = 0;
1388 0 : http->stage_comp_len = 0;
1389 :
1390 0 : return 0;
1391 0 : }
1392 :
1393 : int
1394 0 : fd_http_server_ws_broadcast( fd_http_server_t * http ) {
1395 0 : int compressed = fd_http_ws_compress_maybe( http );
1396 :
1397 0 : if( FD_UNLIKELY( http->stage_err ) ) {
1398 0 : http->stage_err = 0;
1399 0 : http->stage_len = 0;
1400 0 : http->stage_comp_len = 0;
1401 0 : return -1;
1402 0 : }
1403 :
1404 0 : for( ulong i=0UL; i<http->max_ws_conns; i++ ) {
1405 0 : if( FD_LIKELY( http->pollfds[ http->max_conns+i ].fd==-1 ) ) continue;
1406 :
1407 0 : struct fd_http_server_ws_connection * conn = &http->ws_conns[ i ];
1408 0 : if( FD_UNLIKELY( conn->send_frame_cnt==http->max_ws_send_frame_cnt ) ) {
1409 0 : close_conn( http, i+http->max_conns, FD_HTTP_SERVER_CONNECTION_CLOSE_WS_CLIENT_TOO_SLOW );
1410 0 : continue;
1411 0 : }
1412 :
1413 0 : fd_http_server_ws_frame_t frame = {
1414 0 : .off = fd_ulong_if(conn->compress_websocket && compressed, http->stage_off+http->stage_len, http->stage_off),
1415 0 : .len = fd_ulong_if(conn->compress_websocket && compressed, http->stage_comp_len, http->stage_len),
1416 0 : .compressed = conn->compress_websocket && compressed,
1417 0 : };
1418 :
1419 0 : conn->send_frames[ (conn->send_frame_idx+conn->send_frame_cnt) % http->max_ws_send_frame_cnt ] = frame;
1420 0 : conn->send_frame_cnt++;
1421 :
1422 0 : if( FD_LIKELY( conn->send_frame_cnt==1UL ) ) {
1423 0 : ws_conn_treap_ele_insert( http->ws_conn_treap, conn, http->ws_conns );
1424 0 : }
1425 0 : }
1426 :
1427 0 : http->stage_off += http->stage_len+http->stage_comp_len;
1428 0 : http->stage_len = 0;
1429 0 : http->stage_comp_len = 0;
1430 :
1431 0 : return 0;
1432 0 : }
1433 :
1434 : void
1435 : fd_http_server_stage_trunc( fd_http_server_t * http,
1436 0 : ulong len ) {
1437 0 : http->stage_comp_len = 0;
1438 0 : http->stage_len = len;
1439 0 : }
1440 :
1441 : ulong
1442 0 : fd_http_server_stage_len( fd_http_server_t * http ) {
1443 0 : return http->stage_len;
1444 0 : }
1445 :
1446 : void
1447 : fd_http_server_printf( fd_http_server_t * http,
1448 : char const * fmt,
1449 88590 : ... ) {
1450 88590 : if( FD_UNLIKELY( http->stage_err ) ) return;
1451 :
1452 87762 : va_list ap;
1453 87762 : va_start( ap, fmt );
1454 87762 : ulong printed_len = (ulong)vsnprintf( NULL, 0UL, fmt, ap );
1455 87762 : va_end( ap );
1456 :
1457 : /* reserve enough for the NULL terminator */
1458 87762 : fd_http_server_reserve( http, printed_len+1UL );
1459 87762 : if( FD_UNLIKELY( http->stage_err ) ) return;
1460 :
1461 87762 : va_start( ap, fmt );
1462 87687 : vsnprintf( (char *)http->oring+(http->stage_off%http->oring_sz)+http->stage_len,
1463 87687 : INT_MAX, /* We already proved it's going to fit above */
1464 87687 : fmt,
1465 87687 : ap );
1466 87687 : va_end( ap );
1467 :
1468 87687 : http->stage_len += printed_len;
1469 87687 : }
1470 :
1471 : void
1472 : fd_http_server_memcpy( fd_http_server_t * http,
1473 : uchar const * data,
1474 0 : ulong data_len ) {
1475 0 : fd_http_server_reserve( http, data_len );
1476 0 : if( FD_UNLIKELY( http->stage_err ) ) return;
1477 :
1478 0 : fd_memcpy( (char *)http->oring+(http->stage_off%http->oring_sz)+http->stage_len,
1479 0 : data,
1480 0 : data_len );
1481 0 : http->stage_len += data_len;
1482 0 : }
1483 :
1484 : void
1485 6 : fd_http_server_unstage( fd_http_server_t * http ) {
1486 6 : http->stage_err = 0;
1487 6 : http->stage_len = 0UL;
1488 6 : http->stage_comp_len = 0UL;
1489 6 : }
1490 :
1491 : int
1492 : fd_http_server_stage_body( fd_http_server_t * http,
1493 21588 : fd_http_server_response_t * response ) {
1494 21588 : if( FD_UNLIKELY( http->stage_err ) ) {
1495 75 : http->stage_err = 0;
1496 75 : http->stage_len = 0UL;
1497 75 : http->stage_comp_len = 0UL;
1498 75 : return -1;
1499 75 : }
1500 :
1501 21513 : response->_body_off = http->stage_off;
1502 21513 : response->_body_len = http->stage_len;
1503 21513 : http->stage_off += http->stage_len;
1504 21513 : http->stage_len = 0;
1505 21513 : return 0;
1506 21588 : }
|