Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_ipecho_server.h"
3 :
4 : #include "../../util/fd_util.h"
5 : #include "../../util/net/fd_ip4.h"
6 :
7 : #include <errno.h>
8 : #include <unistd.h>
9 : #include <sys/poll.h>
10 : #include <sys/socket.h>
11 : #include <netinet/in.h>
12 :
/* Connection state machine.  A connection is in STATE_READING from the
   moment it is accepted until a request has been received and a
   response buffered, after which it is in STATE_WRITING. */

#define STATE_READING       (0)
#define STATE_WRITING       (1)

/* Reasons handed to close_conn.  CLOSE_OK is the only value counted as
   a successful close; every other value is counted as an error close.

     CLOSE_EXPECTED_EOF   socket became readable after the request phase
     CLOSE_PEER_RESET     read or send failed with an expected network error
     CLOSE_LARGE_REQUEST  the peer filled the entire request buffer
     CLOSE_BAD_HEADER     the request did not start with four zero bytes
     CLOSE_BAD_TRAILER    the request was not terminated by a newline
     CLOSE_BAD_LENGTH     the peer hit EOF before sending a full request
     CLOSE_EVICTED        slot reclaimed to make room for a new connection */

#define CLOSE_OK            ( 0)
#define CLOSE_EXPECTED_EOF  (-1)
#define CLOSE_PEER_RESET    (-2)
#define CLOSE_LARGE_REQUEST (-3)
#define CLOSE_BAD_HEADER    (-4)
#define CLOSE_BAD_TRAILER   (-5)
#define CLOSE_BAD_LENGTH    (-6)
#define CLOSE_EVICTED       (-7)
24 :
25 : struct fd_ipecho_server_connection {
26 : int state;
27 :
28 : uint ipv4;
29 :
30 : ushort parent;
31 :
32 : ulong request_bytes_read;
33 : uchar request_bytes[ 22UL ];
34 : ulong response_bytes_written;
35 : uchar response_bytes[ 27UL ];
36 : };
37 :
38 : typedef struct fd_ipecho_server_connection fd_ipecho_server_connection_t;
39 :
40 : #define POOL_NAME conn_pool
41 0 : #define POOL_T fd_ipecho_server_connection_t
42 : #define POOL_IDX_T ushort
43 0 : #define POOL_NEXT parent
44 : #include "../../util/tmpl/fd_pool.c"
45 :
46 : struct fd_ipecho_server {
47 : int sockfd;
48 :
49 : ushort shred_version;
50 :
51 : ulong evict_idx;
52 : ulong max_connection_cnt;
53 :
54 : fd_ipecho_server_connection_t * pool;
55 : struct pollfd * pollfds;
56 :
57 : fd_ipecho_server_metrics_t metrics[ 1 ];
58 :
59 : ulong magic;
60 : };
61 :
62 : FD_FN_CONST ulong
63 0 : fd_ipecho_server_align( void ) {
64 0 : return 128UL;
65 0 : }
66 :
67 : FD_FN_CONST ulong
68 0 : fd_ipecho_server_footprint( ulong max_connection_cnt ) {
69 0 : ulong l = FD_LAYOUT_INIT;
70 0 : l = FD_LAYOUT_APPEND( l, fd_ipecho_server_align(), sizeof(fd_ipecho_server_t) );
71 0 : l = FD_LAYOUT_APPEND( l, conn_pool_align(), conn_pool_footprint( max_connection_cnt ) );
72 0 : l = FD_LAYOUT_APPEND( l, alignof(struct pollfd), (1UL+max_connection_cnt)*sizeof(struct pollfd) );
73 0 : return FD_LAYOUT_FINI( l, fd_ipecho_server_align() );
74 0 : }
75 :
76 : void *
77 : fd_ipecho_server_new( void * shmem,
78 0 : ulong max_connection_cnt ) {
79 0 : if( FD_UNLIKELY( !shmem ) ) {
80 0 : FD_LOG_WARNING(( "NULL shmem" ));
81 0 : return NULL;
82 0 : }
83 :
84 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ipecho_server_align() ) ) ) {
85 0 : FD_LOG_WARNING(( "misaligned shmem" ));
86 0 : return NULL;
87 0 : }
88 :
89 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
90 0 : fd_ipecho_server_t * server = FD_SCRATCH_ALLOC_APPEND( l, fd_ipecho_server_align(), sizeof(fd_ipecho_server_t) );
91 0 : void * pool = FD_SCRATCH_ALLOC_APPEND( l, conn_pool_align(), conn_pool_footprint( max_connection_cnt ) );
92 0 : server->pollfds = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct pollfd), (1UL+max_connection_cnt)*sizeof(struct pollfd) );
93 :
94 0 : server->pool = conn_pool_join( conn_pool_new( pool, max_connection_cnt ) );
95 0 : FD_TEST( server->pool );
96 :
97 0 : for( ulong i=0UL; i<max_connection_cnt; i++ ) {
98 0 : server->pollfds[ i ].fd = -1;
99 0 : server->pollfds[ i ].events = POLLIN | POLLOUT;
100 0 : }
101 :
102 0 : server->evict_idx = 0UL;
103 0 : server->max_connection_cnt = max_connection_cnt;
104 :
105 0 : memset( &server->metrics, 0, sizeof(server->metrics) );
106 :
107 0 : FD_COMPILER_MFENCE();
108 0 : FD_VOLATILE( server->magic ) = FD_IPECHO_SERVER_MAGIC;
109 0 : FD_COMPILER_MFENCE();
110 :
111 0 : return server;
112 0 : }
113 :
114 : fd_ipecho_server_t *
115 0 : fd_ipecho_server_join( void * shipe ) {
116 0 : if( FD_UNLIKELY( !shipe ) ) {
117 0 : FD_LOG_WARNING(( "NULL shipe" ));
118 0 : return NULL;
119 0 : }
120 :
121 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shipe, fd_ipecho_server_align() ) ) ) {
122 0 : FD_LOG_WARNING(( "misaligned shipe" ));
123 0 : return NULL;
124 0 : }
125 :
126 0 : fd_ipecho_server_t * server = (fd_ipecho_server_t *)shipe;
127 :
128 0 : if( FD_UNLIKELY( server->magic!=FD_IPECHO_SERVER_MAGIC ) ) {
129 0 : FD_LOG_WARNING(( "bad magic" ));
130 0 : return NULL;
131 0 : }
132 :
133 0 : return server;
134 0 : }
135 :
136 : void
137 : fd_ipecho_server_init( fd_ipecho_server_t * server,
138 : uint address,
139 : ushort port,
140 0 : ushort shred_version ) {
141 :
142 : /* If the shred version is 0 that means that the shred version has not
143 : been set yet. */
144 0 : server->shred_version = shred_version;
145 :
146 0 : server->sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
147 0 : if( FD_UNLIKELY( -1==server->sockfd ) ) FD_LOG_ERR(( "socket() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
148 :
149 0 : int optval = 1;
150 0 : if( FD_UNLIKELY( -1==setsockopt( server->sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof( optval ) ) ) )
151 0 : FD_LOG_ERR(( "setsockopt failed (%i-%s)", errno, strerror( errno ) ));
152 :
153 0 : struct sockaddr_in addr = {
154 0 : .sin_family = AF_INET,
155 0 : .sin_port = fd_ushort_bswap( port ),
156 0 : .sin_addr.s_addr = address,
157 0 : };
158 :
159 0 : if( FD_UNLIKELY( -1==bind( server->sockfd, fd_type_pun( &addr ), sizeof( addr ) ) ) ) {
160 0 : FD_LOG_ERR(( "bind(%i,AF_INET," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
161 0 : server->sockfd, FD_IP4_ADDR_FMT_ARGS( address ), port,
162 0 : errno, fd_io_strerror( errno ) ));
163 0 : }
164 0 : if( FD_UNLIKELY( -1==listen( server->sockfd, (int)server->max_connection_cnt ) ) ) FD_LOG_ERR(( "listen() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
165 :
166 0 : server->pollfds[ server->max_connection_cnt ] = (struct pollfd){ .fd = server->sockfd, .events = POLLIN, .revents = 0 };
167 0 : }
168 :
169 : void
170 : fd_ipecho_server_set_shred_version( fd_ipecho_server_t * server,
171 0 : ushort shred_version ) {
172 0 : server->shred_version = shred_version;
173 0 : }
174 :
/* is_expected_network_error returns 1 if err is one of the errno values
   this server treats as a routine network failure rather than a fatal
   error, and 0 otherwise. */

static inline int
is_expected_network_error( int err ) {
  switch( err ) {
    case ENETDOWN:
    case EPROTO:
    case ENOPROTOOPT:
    case EHOSTDOWN:
    case ENONET:
    case EHOSTUNREACH:
    case EOPNOTSUPP:
    case ENETUNREACH:
    case ETIMEDOUT:
    case ENETRESET:
    case ECONNABORTED:
    case ECONNRESET:
    case EPIPE:
      return 1;
    default:
      return 0;
  }
}
192 :
193 : static void
194 : close_conn( fd_ipecho_server_t * server,
195 : ulong conn_idx,
196 0 : int reason ) {
197 0 : (void)reason;
198 0 : FD_TEST( server->pollfds[ conn_idx ].fd!=-1 );
199 :
200 0 : if( FD_UNLIKELY( -1==close( server->pollfds[ conn_idx ].fd ) ) ) FD_LOG_ERR(( "close failed (%i-%s)", errno, strerror( errno ) ));
201 0 : server->pollfds[ conn_idx ].fd = -1;
202 0 : conn_pool_ele_release( server->pool, &server->pool[ conn_idx ] );
203 :
204 0 : FD_TEST( server->metrics->connection_cnt );
205 0 : server->metrics->connection_cnt--;
206 0 : if( FD_UNLIKELY( reason==CLOSE_OK ) ) server->metrics->connections_closed_ok++;
207 0 : else server->metrics->connections_closed_error++;
208 0 : }
209 :
210 : static void
211 0 : accept_conns( fd_ipecho_server_t * server ) {
212 0 : for(;;) {
213 0 : struct sockaddr_in addr;
214 0 : socklen_t addr_len = sizeof(addr);
215 0 : int fd = accept4( server->pollfds[ server->max_connection_cnt ].fd, fd_type_pun( &addr ), &addr_len, SOCK_NONBLOCK|SOCK_CLOEXEC );
216 :
217 0 : if( FD_UNLIKELY( -1==fd ) ) {
218 0 : if( FD_LIKELY( EAGAIN==errno ) ) break;
219 0 : else if( FD_LIKELY( is_expected_network_error( errno ) ) ) continue;
220 0 : else FD_LOG_ERR(( "accept4() failed (%i-%s)", errno, strerror( errno ) ));
221 0 : }
222 :
223 0 : if( FD_UNLIKELY( !conn_pool_free( server->pool ) ) ) {
224 0 : close_conn( server, server->evict_idx, CLOSE_EVICTED );
225 0 : server->evict_idx = (server->evict_idx+1UL) % server->max_connection_cnt;
226 0 : }
227 0 : ulong conn_id = conn_pool_idx_acquire( server->pool );
228 :
229 0 : server->pollfds[ conn_id ].fd = fd;
230 0 : server->pool[ conn_id ].ipv4 = addr.sin_addr.s_addr;
231 0 : server->pool[ conn_id ].state = STATE_READING;
232 0 : server->pool[ conn_id ].request_bytes_read = 0UL;
233 0 : server->pool[ conn_id ].response_bytes_written = 0UL;
234 :
235 0 : server->metrics->connection_cnt++;
236 0 : }
237 0 : }
238 :
239 : static void
240 : read_conn( fd_ipecho_server_t * server,
241 0 : ulong conn_idx ) {
242 0 : fd_ipecho_server_connection_t * conn = &server->pool[ conn_idx ];
243 :
244 0 : if( FD_UNLIKELY( conn->state!=STATE_READING ) ) {
245 0 : close_conn( server, conn_idx, CLOSE_EXPECTED_EOF );
246 0 : return;
247 0 : }
248 :
249 0 : long sz = read( server->pollfds[ conn_idx ].fd, conn->request_bytes+conn->request_bytes_read, sizeof(conn->request_bytes)-conn->request_bytes_read );
250 0 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data to read, continue. */
251 0 : else if( -1==sz && is_expected_network_error( errno ) ) {
252 0 : close_conn( server, conn_idx, CLOSE_PEER_RESET );
253 0 : return;
254 0 : }
255 0 : else if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "read failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
256 :
257 0 : if( FD_UNLIKELY( !sz && conn->request_bytes_read!=21UL ) ) {
258 0 : close_conn( server, conn_idx, CLOSE_BAD_LENGTH );
259 0 : return;
260 0 : }
261 :
262 : /* New data was read... process it */
263 0 : server->metrics->bytes_read += (ulong)sz;
264 0 : conn->request_bytes_read += (ulong)sz;
265 0 : if( FD_UNLIKELY( conn->request_bytes_read==sizeof(conn->request_bytes) ) ) {
266 0 : close_conn( server, conn_idx, CLOSE_LARGE_REQUEST );
267 0 : return;
268 0 : }
269 :
270 0 : if( FD_UNLIKELY( memcmp( conn->request_bytes, "\0\0\0\0", 4UL ) ) ) {
271 0 : close_conn( server, conn_idx, CLOSE_BAD_HEADER );
272 0 : return;
273 0 : }
274 :
275 0 : if( FD_UNLIKELY( conn->request_bytes[ 20UL ]!='\n' ) ) {
276 0 : close_conn( server, conn_idx, CLOSE_BAD_TRAILER );
277 0 : return;
278 0 : }
279 :
280 0 : uchar response[ 27UL ] = {
281 0 : 0, 0, 0, 0, /* Magic */
282 0 : 0, 0, 0, 0, /* IP address variant */
283 0 : 0, 0, 0, 0, /* IP address */
284 0 : 1, /* Shred version option variant */
285 0 : 0, 0, /* Shred version */
286 0 : 0, /* [...] 12 bytes of trailing garbage, as in Agave */
287 0 : };
288 :
289 0 : FD_STORE( uint, response+8UL, conn->ipv4 );
290 0 : FD_STORE( ushort, response+13UL, server->shred_version );
291 :
292 : /* Now have a complete request ... buffer response */
293 0 : conn->state = STATE_WRITING;
294 0 : conn->response_bytes_written = 0UL;
295 0 : memcpy( conn->response_bytes, response, sizeof(response) );
296 0 : }
297 :
298 : static void
299 : write_conn( fd_ipecho_server_t * server,
300 0 : ulong conn_idx ) {
301 0 : fd_ipecho_server_connection_t * conn = &server->pool[ conn_idx ];
302 :
303 0 : if( FD_LIKELY( conn->state==STATE_READING ) ) return;
304 :
305 0 : long sz = sendto( server->pollfds[ conn_idx ].fd, conn->response_bytes+conn->response_bytes_written, sizeof(conn->response_bytes)-conn->response_bytes_written, MSG_NOSIGNAL, NULL, 0 );
306 0 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data was written, continue. */
307 0 : if( FD_UNLIKELY( -1==sz && is_expected_network_error( errno ) ) ) {
308 0 : close_conn( server, conn_idx, CLOSE_PEER_RESET );
309 0 : return;
310 0 : }
311 0 : if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "write failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
312 :
313 0 : server->metrics->bytes_written += (ulong)sz;
314 0 : conn->response_bytes_written += (ulong)sz;
315 0 : if( FD_UNLIKELY( conn->response_bytes_written<sizeof(conn->response_bytes) ) ) return;
316 :
317 0 : close_conn( server, conn_idx, CLOSE_OK );
318 0 : }
319 :
320 : void
321 : fd_ipecho_server_poll( fd_ipecho_server_t * server,
322 : int * charge_busy,
323 0 : int timeout_ms ) {
324 :
325 : /* If the shred version is 0 that means that the shred version just
326 : has not been set yet. Don't try to accept connections yet. */
327 0 : if( FD_UNLIKELY( server->shred_version==0U ) ) return;
328 :
329 0 : int nfds = fd_syscall_poll( server->pollfds, (uint)(server->max_connection_cnt+1UL), timeout_ms );
330 0 : if( FD_UNLIKELY( 0==nfds ) ) return;
331 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return;
332 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll() failed (%i-%s)", errno, strerror( errno ) ));
333 :
334 0 : *charge_busy = 1;
335 :
336 0 : for( ulong i=0UL; i<server->max_connection_cnt+1UL; i++ ) {
337 0 : if( FD_UNLIKELY( -1==server->pollfds[ i ].fd ) ) continue;
338 0 : if( FD_UNLIKELY( i==server->max_connection_cnt ) ) {
339 0 : accept_conns( server );
340 0 : } else {
341 0 : if( FD_LIKELY( server->pollfds[ i ].revents & POLLIN ) ) read_conn( server, i );
342 0 : if( FD_UNLIKELY( -1==server->pollfds[ i ].fd ) ) continue;
343 0 : if( FD_LIKELY( server->pollfds[ i ].revents & POLLOUT ) ) write_conn( server, i );
344 : /* No need to handle POLLHUP, read() will return 0 soon enough. */
345 0 : }
346 0 : }
347 0 : }
348 :
349 : fd_ipecho_server_metrics_t *
350 0 : fd_ipecho_server_metrics( fd_ipecho_server_t * server ) {
351 0 : return server->metrics;
352 0 : }
353 :
354 : int
355 0 : fd_ipecho_server_sockfd( fd_ipecho_server_t * server ) {
356 0 : return server->sockfd;
357 0 : }
|