Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_http_resolver.h"
3 : #include "fd_ssresolve.h"
4 :
5 : #include "../../../util/log/fd_log.h"
6 : #include "../../../util/fd_util.h"
7 :
8 : #include <poll.h>
9 : #include <errno.h>
10 : #include <unistd.h>
11 : #include <sys/socket.h>
12 : #include <netinet/in.h>
13 : #include <netinet/tcp.h>
14 :
15 0 : #define PEER_STATE_UNRESOLVED (0)
16 0 : #define PEER_STATE_REFRESHING (1)
17 0 : #define PEER_STATE_VALID (2)
18 0 : #define PEER_STATE_INVALID (3)
19 :
20 0 : #define PEER_DEADLINE_NANOS_VALID (5L*1000L*1000L*1000L) /* 5 seconds */
21 0 : #define PEER_DEADLINE_NANOS_RESOLVE (1L*1000L*1000L*1000L) /* 1 second */
22 0 : #define PEER_DEADLINE_NANOS_INVALID (5L*1000L*1000L*1000L) /* 5 seconds */
23 :
24 : struct fd_ssresolve_peer {
25 : fd_ip4_port_t addr;
26 : fd_ssinfo_t ssinfo;
27 :
28 : fd_ssresolve_t * full_ssresolve;
29 : fd_ssresolve_t * inc_ssresolve;
30 :
31 : struct {
32 : ulong next;
33 : } pool;
34 :
35 : struct {
36 : ulong next;
37 : ulong prev;
38 : } deadline;
39 :
40 : struct {
41 : ulong idx;
42 : } fd;
43 :
44 : int state;
45 : long deadline_nanos;
46 : };
47 : typedef struct fd_ssresolve_peer fd_ssresolve_peer_t;
48 :
49 : #define POOL_NAME peer_pool
50 0 : #define POOL_T fd_ssresolve_peer_t
51 : #define POOL_IDX_T ulong
52 0 : #define POOL_NEXT pool.next
53 : #include "../../../util/tmpl/fd_pool.c"
54 :
55 : #define DLIST_NAME deadline_list
56 : #define DLIST_ELE_T fd_ssresolve_peer_t
57 0 : #define DLIST_PREV deadline.prev
58 0 : #define DLIST_NEXT deadline.next
59 : #include "../../../util/tmpl/fd_dlist.c"
60 :
61 : struct fd_http_resolver_private {
62 : fd_ssresolve_peer_t * pool;
63 : deadline_list_t * unresolved;
64 : deadline_list_t * resolving;
65 : deadline_list_t * valid;
66 : deadline_list_t * invalid;
67 :
68 : ulong fds_len;
69 : struct pollfd * fds;
70 : ulong * fds_idx;
71 :
72 : int incremental_snapshot_fetch;
73 :
74 : void * cb_arg;
75 : fd_http_resolver_on_resolve_fn_t on_resolve_cb;
76 :
77 : ulong magic; /* ==FD_HTTP_RESOLVER_MAGIC */
78 : };
79 :
80 : FD_FN_CONST ulong
81 0 : fd_http_resolver_align( void ) {
82 0 : return fd_ulong_max( alignof(fd_http_resolver_t), fd_ulong_max( peer_pool_align(), fd_ulong_max( deadline_list_align(), fd_ulong_max( alignof(struct pollfd), alignof(ulong) ) ) ) );
83 0 : }
84 :
85 : FD_FN_CONST ulong
86 0 : fd_http_resolver_footprint( ulong peers_cnt ) {
87 0 : ulong l;
88 0 : l = FD_LAYOUT_INIT;
89 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_http_resolver_t), sizeof(fd_http_resolver_t) );
90 0 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( peers_cnt ) );
91 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
92 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
93 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
94 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
95 0 : l = FD_LAYOUT_APPEND( l, alignof(struct pollfd), 2UL*peers_cnt*sizeof(struct pollfd) );
96 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), 2UL*peers_cnt*sizeof(ulong) );
97 :
98 0 : for( ulong i=0UL; i<peers_cnt*2UL; i++ ) {
99 0 : l = FD_LAYOUT_APPEND( l, fd_ssresolve_align(), fd_ssresolve_footprint() );
100 0 : }
101 0 : return FD_LAYOUT_FINI( l, fd_http_resolver_align() );
102 0 : }
103 :
104 : void *
105 : fd_http_resolver_new( void * shmem,
106 : ulong peers_cnt,
107 : int incremental_snapshot_fetch,
108 : fd_http_resolver_on_resolve_fn_t on_resolve_cb,
109 0 : void * cb_arg ) {
110 0 : if( FD_UNLIKELY( !shmem ) ) {
111 0 : FD_LOG_WARNING(( "NULL shmem" ));
112 0 : return NULL;
113 0 : }
114 :
115 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_http_resolver_align() ) ) ) {
116 0 : FD_LOG_WARNING(( "unaligned shmem" ));
117 0 : return NULL;
118 0 : }
119 :
120 0 : if( FD_UNLIKELY( peers_cnt<1UL ) ) {
121 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
122 0 : return NULL;
123 0 : }
124 :
125 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
126 0 : fd_http_resolver_t * resolver = FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(), sizeof(fd_http_resolver_t) );
127 0 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( peers_cnt ) );
128 0 : void * _unresolved = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
129 0 : void * _resolving = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
130 0 : void * _invalid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
131 0 : void * _valid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
132 0 : struct pollfd * fds = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct pollfd), 2UL*peers_cnt*sizeof(struct pollfd) );
133 0 : ulong * fds_idx = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), 2UL*peers_cnt*sizeof(ulong) );
134 :
135 0 : resolver->pool = peer_pool_join( peer_pool_new( _pool, peers_cnt ) );
136 0 : resolver->unresolved = deadline_list_join( deadline_list_new( _unresolved ) );
137 0 : resolver->resolving = deadline_list_join( deadline_list_new( _resolving ) );
138 0 : resolver->invalid = deadline_list_join( deadline_list_new( _invalid ) );
139 0 : resolver->valid = deadline_list_join( deadline_list_new( _valid ) );
140 :
141 0 : resolver->fds_len = 0UL;
142 0 : resolver->fds = fds;
143 0 : resolver->fds_idx = fds_idx;
144 :
145 0 : for( ulong i=0UL; i<peer_pool_max( resolver->pool ); i++ ) {
146 0 : void * _full_ssresolve = FD_SCRATCH_ALLOC_APPEND( l, fd_ssresolve_align(), fd_ssresolve_footprint() );
147 0 : void * _inc_ssresolve = FD_SCRATCH_ALLOC_APPEND( l, fd_ssresolve_align(), fd_ssresolve_footprint() );
148 0 : resolver->pool[ i ].full_ssresolve = fd_ssresolve_join( fd_ssresolve_new( _full_ssresolve ) );
149 0 : resolver->pool[ i ].inc_ssresolve = fd_ssresolve_join( fd_ssresolve_new( _inc_ssresolve ) );
150 0 : }
151 :
152 0 : resolver->incremental_snapshot_fetch = incremental_snapshot_fetch;
153 0 : resolver->cb_arg = cb_arg;
154 0 : resolver->on_resolve_cb = on_resolve_cb;
155 :
156 0 : FD_COMPILER_MFENCE();
157 0 : FD_VOLATILE( resolver->magic ) = FD_HTTP_RESOLVER_MAGIC;
158 0 : FD_COMPILER_MFENCE();
159 :
160 0 : return (void *)resolver;
161 0 : }
162 :
163 : fd_http_resolver_t *
164 0 : fd_http_resolver_join( void * shresolver ) {
165 0 : if( FD_UNLIKELY( !shresolver ) ) {
166 0 : FD_LOG_WARNING(( "NULL shresolver" ));
167 0 : return NULL;
168 0 : }
169 :
170 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shresolver, fd_http_resolver_align() ) ) ) {
171 0 : FD_LOG_WARNING(( "misaligned shresolver" ));
172 0 : return NULL;
173 0 : }
174 :
175 0 : fd_http_resolver_t * resolver = (fd_http_resolver_t *)shresolver;
176 :
177 0 : if( FD_UNLIKELY( resolver->magic!=FD_HTTP_RESOLVER_MAGIC ) ) {
178 0 : FD_LOG_WARNING(( "bad magic" ));
179 0 : return NULL;
180 0 : }
181 :
182 0 : return resolver;
183 0 : }
184 :
185 : void
186 : fd_http_resolver_add( fd_http_resolver_t * resolver,
187 0 : fd_ip4_port_t addr ) {
188 0 : fd_ssresolve_peer_t * peer = peer_pool_ele_acquire( resolver->pool );
189 0 : FD_TEST( peer );
190 0 : peer->state = PEER_STATE_UNRESOLVED;
191 0 : peer->addr = addr;
192 0 : peer->fd.idx = ULONG_MAX;
193 0 : peer->ssinfo.full.slot = ULONG_MAX;
194 0 : peer->ssinfo.incremental.base_slot = ULONG_MAX;
195 0 : peer->ssinfo.incremental.slot = ULONG_MAX;
196 0 : deadline_list_ele_push_tail( resolver->unresolved, peer, resolver->pool );
197 0 : }
198 :
199 : static int
200 : create_socket( fd_http_resolver_t * resolver,
201 0 : fd_ssresolve_peer_t * peer ) {
202 0 : int sockfd = socket( PF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
203 0 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket failed (%i-%s)", errno, strerror( errno ) ));
204 :
205 0 : int optval = 1;
206 0 : if( FD_UNLIKELY( -1==setsockopt( sockfd, SOL_TCP, TCP_NODELAY, &optval, sizeof(int) ) ) ) {
207 0 : FD_LOG_ERR(( "setsockopt() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
208 0 : }
209 :
210 0 : struct sockaddr_in addr = {
211 0 : .sin_family = AF_INET,
212 0 : .sin_port = fd_ushort_bswap( peer->addr.port ),
213 0 : .sin_addr = { .s_addr = peer->addr.addr }
214 0 : };
215 :
216 0 : if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
217 0 : if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
218 0 : return -1;
219 0 : }
220 :
221 0 : resolver->fds[ resolver->fds_len ] = (struct pollfd){
222 0 : .fd = sockfd,
223 0 : .events = POLLIN|POLLOUT,
224 0 : .revents = 0
225 0 : };
226 :
227 0 : return 0;
228 0 : }
229 :
230 : static int
231 : peer_connect( fd_http_resolver_t * resolver,
232 0 : fd_ssresolve_peer_t * peer ) {
233 0 : int err;
234 0 : err = create_socket( resolver, peer ); /* full */
235 0 : if( FD_UNLIKELY( err ) ) return err;
236 0 : resolver->fds_idx[ resolver->fds_len ] = peer_pool_idx( resolver->pool, peer );
237 0 : peer->fd.idx = resolver->fds_len;
238 0 : resolver->fds_len++;
239 0 : fd_ssresolve_init( peer->full_ssresolve, peer->addr, resolver->fds[ peer->fd.idx ].fd, 1 );
240 :
241 0 : if( FD_LIKELY( resolver->incremental_snapshot_fetch ) ) {
242 0 : err = create_socket( resolver, peer ); /* incremental */
243 0 : if( FD_UNLIKELY( err ) ) return err;
244 0 : resolver->fds_idx[ resolver->fds_len ] = peer_pool_idx( resolver->pool, peer );
245 0 : resolver->fds_len++;
246 0 : fd_ssresolve_init( peer->inc_ssresolve, peer->addr, resolver->fds[ peer->fd.idx+1UL ].fd, 0 );
247 0 : } else {
248 0 : resolver->fds[ resolver->fds_len ] = (struct pollfd) {
249 0 : .fd = -1,
250 0 : .events = 0,
251 0 : .revents = 0
252 0 : };
253 0 : resolver->fds_idx[ resolver->fds_len ] = ULONG_MAX;
254 0 : resolver->fds_len++;
255 0 : }
256 :
257 0 : return 0;
258 0 : }
259 :
260 : static inline void
261 : remove_peer( fd_http_resolver_t * resolver,
262 0 : ulong idx ) {
263 0 : FD_TEST( idx<resolver->fds_len );
264 :
265 0 : if( FD_UNLIKELY( resolver->fds_len==2UL ) ) {
266 0 : resolver->fds_len = 0UL;
267 0 : return;
268 0 : }
269 :
270 0 : resolver->fds[ idx ] = resolver->fds[ resolver->fds_len-2UL ];
271 0 : resolver->fds_idx[ idx ] = resolver->fds_idx[ resolver->fds_len-2UL ];
272 :
273 0 : resolver->fds[ idx+1UL ] = resolver->fds[ resolver->fds_len-1UL ];
274 0 : resolver->fds_idx[ idx+1UL ] = resolver->fds_idx[ resolver->fds_len-1UL ];
275 :
276 0 : fd_ssresolve_peer_t * peer = peer_pool_ele( resolver->pool, resolver->fds_idx[ idx ] );
277 0 : peer->fd.idx = idx;
278 :
279 0 : resolver->fds_len -= 2UL;
280 0 : }
281 :
282 : static inline void
283 : unresolve_peer( fd_http_resolver_t * resolver,
284 : fd_ssresolve_peer_t * peer,
285 0 : long now ) {
286 0 : FD_TEST( peer->state==PEER_STATE_UNRESOLVED || peer->state==PEER_STATE_REFRESHING );
287 0 : remove_peer( resolver, peer->fd.idx );
288 0 : deadline_list_ele_remove( resolver->resolving, peer, resolver->pool );
289 0 : peer->state = PEER_STATE_INVALID;
290 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
291 0 : deadline_list_ele_push_tail( resolver->invalid, peer, resolver->pool );
292 0 : }
293 :
294 : static inline int
295 : poll_resolve( fd_http_resolver_t * resolver,
296 : struct pollfd * pfd,
297 : fd_ssresolve_peer_t * peer,
298 : fd_ssresolve_t * ssresolve,
299 : ulong idx,
300 0 : long now ) {
301 0 : if( FD_LIKELY( pfd->revents & POLLOUT ) ) {
302 0 : int res = fd_ssresolve_advance_poll_out( ssresolve );
303 :
304 0 : if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_ERROR ) ) {
305 0 : unresolve_peer( resolver, peer_pool_ele( resolver->pool, resolver->fds_idx[ idx ] ), now );
306 0 : return -1;
307 0 : }
308 0 : }
309 :
310 0 : if( FD_LIKELY( pfd->revents & POLLIN ) ) {
311 0 : fd_ssresolve_result_t resolve_result;
312 0 : int res = fd_ssresolve_advance_poll_in( ssresolve, &resolve_result );
313 :
314 0 : if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_ERROR ) ) {
315 0 : unresolve_peer( resolver, peer_pool_ele( resolver->pool, resolver->fds_idx[ idx ] ), now );
316 0 : return -1;
317 0 : } else if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_AGAIN ) ) {
318 0 : return -1;
319 0 : } else { /* FD_SSRESOLVE_ADVANCE_SUCCESS */
320 0 : FD_TEST( peer->deadline_nanos>now );
321 :
322 0 : if( resolve_result.base_slot==ULONG_MAX ) {
323 0 : peer->ssinfo.full.slot = resolve_result.slot;
324 0 : } else {
325 0 : peer->ssinfo.incremental.base_slot = resolve_result.base_slot;
326 0 : peer->ssinfo.incremental.slot = resolve_result.slot;
327 0 : }
328 0 : }
329 0 : }
330 :
331 0 : return 0;
332 0 : }
333 :
334 : static inline void
335 : poll_advance( fd_http_resolver_t * resolver,
336 0 : long now ) {
337 0 : if( FD_LIKELY( !resolver->fds_len ) ) return;
338 :
339 0 : int nfds = fd_syscall_poll( resolver->fds, (uint)resolver->fds_len, 0 );
340 0 : if( FD_LIKELY( !nfds ) ) return;
341 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return;
342 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
343 :
344 0 : for( ulong i=0UL; i<resolver->fds_len; i++) {
345 :
346 0 : struct pollfd * pfd = &resolver->fds[ i ];
347 0 : if( FD_UNLIKELY( pfd->fd==-1 ) ) continue;
348 0 : if( FD_UNLIKELY( pfd->revents & (POLLERR|POLLHUP) ) ) {
349 0 : unresolve_peer( resolver, peer_pool_ele( resolver->pool, resolver->fds_idx[ i ] ), now );
350 0 : continue;
351 0 : }
352 :
353 0 : fd_ssresolve_peer_t * peer = peer_pool_ele( resolver->pool, resolver->fds_idx[ i ] );
354 0 : int full = i&1UL ? 0 : 1; /* even indices are full, odd indices are incremental */
355 0 : fd_ssresolve_t * ssresolve = full ? peer->full_ssresolve : peer->inc_ssresolve;
356 :
357 0 : if( FD_LIKELY( !fd_ssresolve_is_done( ssresolve ) ) ) {
358 0 : int res = poll_resolve( resolver, pfd, peer, ssresolve, i, now );
359 0 : if( FD_UNLIKELY( res ) ) continue;
360 0 : }
361 :
362 : /* Once both the full and incremental snapshots are resolved, we can
363 : mark the peer valid and remove the peer from the list of peers to
364 : ping. */
365 0 : if( FD_LIKELY( fd_ssresolve_is_done( peer->full_ssresolve ) &&
366 0 : (!resolver->incremental_snapshot_fetch || fd_ssresolve_is_done( peer->inc_ssresolve ) ) ) ) {
367 0 : peer->state = PEER_STATE_VALID;
368 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_VALID;
369 :
370 0 : deadline_list_ele_remove( resolver->resolving, peer, resolver->pool );
371 0 : deadline_list_ele_push_tail( resolver->valid, peer, resolver->pool );
372 0 : remove_peer( resolver, peer->fd.idx );
373 :
374 0 : resolver->on_resolve_cb( resolver->cb_arg, peer->addr, &peer->ssinfo );
375 0 : }
376 0 : }
377 0 : }
378 :
379 : void
380 : fd_http_resolver_advance( fd_http_resolver_t * resolver,
381 : long now,
382 0 : fd_sspeer_selector_t * selector ) {
383 0 : while( !deadline_list_is_empty( resolver->unresolved, resolver->pool ) ) {
384 0 : fd_ssresolve_peer_t * peer = deadline_list_ele_pop_head( resolver->unresolved, resolver->pool );
385 :
386 0 : FD_LOG_INFO(( "resolving " FD_IP4_ADDR_FMT ":%hu", FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), peer->addr.port ));
387 0 : int result = peer_connect( resolver, peer );
388 0 : if( FD_UNLIKELY( -1==result ) ) {
389 0 : peer->state = PEER_STATE_INVALID;
390 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
391 0 : deadline_list_ele_push_tail( resolver->invalid, peer, resolver->pool );
392 0 : } else {
393 0 : peer->state = PEER_STATE_REFRESHING;
394 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_RESOLVE;
395 0 : deadline_list_ele_push_tail( resolver->resolving, peer, resolver->pool );
396 0 : }
397 0 : }
398 :
399 0 : while( !deadline_list_is_empty( resolver->resolving, resolver->pool ) ) {
400 0 : fd_ssresolve_peer_t * peer = deadline_list_ele_peek_head( resolver->resolving, resolver->pool );
401 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
402 :
403 0 : deadline_list_ele_pop_head( resolver->resolving, resolver->pool );
404 0 : peer->state = PEER_STATE_INVALID;
405 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
406 0 : deadline_list_ele_push_tail( resolver->invalid, peer, resolver->pool );
407 0 : remove_peer( resolver, peer->fd.idx );
408 :
409 0 : fd_sspeer_selector_remove( selector, peer->addr );
410 0 : }
411 :
412 0 : while( !deadline_list_is_empty( resolver->invalid, resolver->pool ) ) {
413 0 : fd_ssresolve_peer_t * peer = deadline_list_ele_peek_head( resolver->invalid, resolver->pool );
414 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
415 :
416 0 : deadline_list_ele_pop_head( resolver->invalid, resolver->pool );
417 :
418 0 : peer->state = PEER_STATE_UNRESOLVED;
419 0 : peer->deadline_nanos = 0L;
420 0 : deadline_list_ele_push_tail( resolver->unresolved, peer, resolver->pool );
421 0 : }
422 :
423 0 : while( !deadline_list_is_empty( resolver->valid, resolver->pool ) ) {
424 0 : fd_ssresolve_peer_t * peer = deadline_list_ele_peek_head( resolver->valid, resolver->pool );
425 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
426 :
427 0 : deadline_list_ele_pop_head( resolver->valid, resolver->pool );
428 :
429 0 : int result = peer_connect( resolver, peer );
430 0 : if( FD_UNLIKELY( -1==result ) ) {
431 0 : peer->state = PEER_STATE_INVALID;
432 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
433 0 : deadline_list_ele_push_tail( resolver->invalid, peer, resolver->pool );
434 0 : fd_sspeer_selector_remove( selector, peer->addr );
435 0 : } else {
436 0 : peer->state = PEER_STATE_REFRESHING;
437 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_RESOLVE;
438 0 : deadline_list_ele_push_tail( resolver->resolving, peer, resolver->pool );
439 0 : }
440 0 : }
441 :
442 0 : poll_advance( resolver, now );
443 0 : }
|