Line data Source code
1 : #define _GNU_SOURCE /* ppoll */
2 : #include "fd_ssping.h"
3 : #include "fd_sspeer_selector.h"
4 :
5 : #include "../../../util/fd_util.h"
6 : #include "../../../util/bits/fd_bits.h"
7 : #include "../../../util/log/fd_log.h"
8 :
9 : #include <errno.h>
10 : #include <fcntl.h>
11 : #include <unistd.h>
12 : #include <sys/socket.h>
13 : #include <netinet/in.h>
14 : #include <netinet/tcp.h>
15 : #include <poll.h>
16 :
17 48 : #define PEER_STATE_UNPINGED 0
18 0 : #define PEER_STATE_PINGED 1
19 0 : #define PEER_STATE_VALID 2
20 0 : #define PEER_STATE_REFRESHING 3
21 54 : #define PEER_STATE_INVALID 4
22 :
23 0 : #define PEER_DEADLINE_NANOS_PING (1L*1000L*1000L*1000L) /* 1 second */
24 0 : #define PEER_DEADLINE_NANOS_VALID (2L*60L*1000L*1000L*1000L) /* 2 minutes */
25 21 : #define PEER_DEADLINE_NANOS_INVALID (5L*60L*1000L*1000L*1000L) /* 5 minutes */
26 :
27 0 : #define PING_BURST_MAX (16UL) /* Limit how many pings we can burst at once. */
28 :
29 : struct fd_ssping_peer {
30 : ulong refcnt;
31 : fd_ip4_port_t addr;
32 :
33 : struct {
34 : ulong next;
35 : } pool;
36 :
37 : struct {
38 : ulong next;
39 : ulong prev;
40 : } map;
41 :
42 : struct {
43 : ulong next;
44 : ulong prev;
45 : } deadline;
46 :
47 : int state;
48 : ulong latency_nanos;
49 : long deadline_nanos;
50 : ulong used_fd_idx;
51 : };
52 :
53 : typedef struct fd_ssping_peer fd_ssping_peer_t;
54 :
55 : #define POOL_NAME peer_pool
56 6 : #define POOL_T fd_ssping_peer_t
57 : #define POOL_IDX_T ulong
58 96 : #define POOL_NEXT pool.next
59 : #include "../../../util/tmpl/fd_pool.c"
60 :
61 : #define MAP_NAME peer_map
62 48 : #define MAP_KEY addr
63 24 : #define MAP_ELE_T fd_ssping_peer_t
64 : #define MAP_KEY_T fd_ip4_port_t
65 24 : #define MAP_PREV map.prev
66 48 : #define MAP_NEXT map.next
67 57 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
68 129 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
69 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
70 : #include "../../../util/tmpl/fd_map_chain.c"
71 :
72 : #define DLIST_NAME deadline_list
73 : #define DLIST_ELE_T fd_ssping_peer_t
74 90 : #define DLIST_PREV deadline.prev
75 90 : #define DLIST_NEXT deadline.next
76 : #include "../../../util/tmpl/fd_dlist.c"
77 :
78 : struct fd_ssping_private {
79 : fd_ssping_peer_t * pool;
80 : peer_map_t * map;
81 :
82 : deadline_list_t * unpinged;
83 : deadline_list_t * pinged;
84 : deadline_list_t * valid;
85 : deadline_list_t * refreshing;
86 : deadline_list_t * invalid;
87 :
88 : fd_ssping_on_ping_fn_t on_ping_cb;
89 : void * cb_arg;
90 :
91 : ulong magic; /* ==FD_SSPING_MAGIC */
92 :
93 : /* Invariant: The pool elements with an associated file descriptor are
94 : exactly those that are PINGED or REFRESHING. */
95 : ulong used_fd_cnt;
96 : struct pollfd used_fds[ FD_SSPING_FD_CNT ]; /* indexed [0, used_fd_cnt) */
97 : int idle_fds[ FD_SSPING_FD_CNT ]; /* indexed [0, FD_SSPING_FD_CNT-used_fd_cnt) */
98 : /* ping_to_pool[ i ]==x means that used_fds[ i ].fd is in use for
99 : pinging the peer in pool[ x ]. */
100 : ulong ping_to_pool[ FD_SSPING_FD_CNT ]; /* indexed [0, used_fd_cnt) */
101 : };
102 :
103 :
104 : FD_FN_CONST ulong
105 231 : fd_ssping_align( void ) {
106 231 : return fd_ulong_max( alignof(fd_ssping_t),
107 231 : fd_ulong_max( peer_pool_align(),
108 231 : fd_ulong_max( peer_map_align(),
109 231 : deadline_list_align() ) ) );
110 231 : }
111 :
112 : FD_FN_CONST ulong
113 33 : fd_ssping_footprint( ulong max_peers ) {
114 33 : ulong l;
115 33 : l = FD_LAYOUT_INIT;
116 33 : l = FD_LAYOUT_APPEND( l, alignof(fd_ssping_t), sizeof(fd_ssping_t) );
117 33 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
118 33 : l = FD_LAYOUT_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( max_peers ) ) );
119 33 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
120 33 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
121 33 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
122 33 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
123 33 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
124 33 : return FD_LAYOUT_FINI( l, fd_ssping_align() );
125 33 : }
126 :
127 : void *
128 : fd_ssping_new( void * shmem,
129 : ulong max_peers,
130 : ulong seed,
131 : fd_ssping_on_ping_fn_t on_ping_cb,
132 3 : void * cb_arg ) {
133 3 : if( FD_UNLIKELY( !shmem ) ) {
134 0 : FD_LOG_WARNING(( "NULL shmem" ));
135 0 : return NULL;
136 0 : }
137 :
138 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ssping_align() ) ) ) {
139 0 : FD_LOG_WARNING(( "unaligned shmem" ));
140 0 : return NULL;
141 0 : }
142 :
143 3 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
144 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
145 0 : return NULL;
146 0 : }
147 :
148 3 : FD_SCRATCH_ALLOC_INIT( l, shmem );
149 3 : fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_ssping_t), sizeof(fd_ssping_t) );
150 3 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
151 3 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( max_peers ) ) );
152 3 : void * _unpinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
153 3 : void * _pinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
154 3 : void * _valid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
155 3 : void * _refreshing = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
156 3 : void * _invalid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
157 :
158 3 : ssping->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
159 3 : ssping->map = peer_map_join( peer_map_new( _map, peer_map_chain_cnt_est( max_peers ), seed ) );
160 :
161 3 : ssping->unpinged = deadline_list_join( deadline_list_new( _unpinged ) );
162 3 : ssping->pinged = deadline_list_join( deadline_list_new( _pinged ) );
163 3 : ssping->valid = deadline_list_join( deadline_list_new( _valid ) );
164 3 : ssping->refreshing = deadline_list_join( deadline_list_new( _refreshing ) );
165 3 : ssping->invalid = deadline_list_join( deadline_list_new( _invalid ) );
166 :
167 : /* Allocate a contiguous range of file descriptors */
168 3 : int next_fd = FD_SSPING_FD_MIN;
169 :
170 750 : for( ulong i=0UL; i<FD_SSPING_FD_CNT; i++, next_fd++ ) {
171 747 : int fd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, IPPROTO_TCP );
172 747 : if( FD_UNLIKELY( -1==fd ) ) FD_LOG_ERR(( "socket(SOCK_STREAM,IPPROTO_TCP) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
173 :
174 747 : int actual_fd = fcntl( fd, F_DUPFD_CLOEXEC, next_fd );
175 747 : if( FD_UNLIKELY( actual_fd<0 ) ) {
176 0 : FD_LOG_ERR(( "fcntl(F_DUPFD_CLOEXEC,%d) failed (%i-%s)", next_fd, errno, fd_io_strerror( errno ) ));
177 0 : }
178 747 : if( FD_UNLIKELY( actual_fd!=next_fd ) ) {
179 0 : FD_LOG_ERR(( "file descriptor collision at %d", next_fd ));
180 0 : }
181 747 : if( FD_UNLIKELY( 0!=close( fd ) ) ) {
182 0 : FD_LOG_ERR(( "close(%d) failed (%i-%s)", fd, errno, fd_io_strerror( errno ) ));
183 0 : }
184 :
185 747 : int tcp_nodelay = 1;
186 747 : if( FD_UNLIKELY( setsockopt( next_fd, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
187 0 : FD_LOG_ERR(( "setsockopt(SOL_TCP,TCP_NODELAY,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
188 0 : }
189 747 : ssping->idle_fds[ i ] = next_fd;
190 :
191 747 : ssping->used_fds[ i ].fd = -1;
192 747 : ssping->used_fds[ i ].events = POLLOUT|POLLRDHUP|POLLPRI;
193 747 : ssping->used_fds[ i ].revents = 0;
194 747 : }
195 :
196 3 : ssping->used_fd_cnt = 0UL;
197 :
198 3 : ssping->on_ping_cb = on_ping_cb;
199 3 : ssping->cb_arg = cb_arg;
200 :
201 3 : FD_COMPILER_MFENCE();
202 3 : FD_VOLATILE( ssping->magic ) = FD_SSPING_MAGIC;
203 3 : FD_COMPILER_MFENCE();
204 :
205 3 : return (void *)ssping;
206 3 : }
207 :
208 : fd_ssping_t *
209 3 : fd_ssping_join( void * shping ) {
210 3 : if( FD_UNLIKELY( !shping ) ) {
211 0 : FD_LOG_WARNING(( "NULL shping" ));
212 0 : return NULL;
213 0 : }
214 :
215 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shping, fd_ssping_align() ) ) ) {
216 0 : FD_LOG_WARNING(( "misaligned shping" ));
217 0 : return NULL;
218 0 : }
219 :
220 3 : fd_ssping_t * ssping = (fd_ssping_t *)shping;
221 :
222 3 : if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
223 0 : FD_LOG_WARNING(( "bad magic" ));
224 0 : return NULL;
225 0 : }
226 :
227 3 : return ssping;
228 3 : }
229 :
230 : void *
231 3 : fd_ssping_leave( fd_ssping_t * ssping ) {
232 3 : if( FD_UNLIKELY( !ssping ) ) {
233 0 : FD_LOG_WARNING(( "NULL ssping" ));
234 0 : return NULL;
235 0 : }
236 :
237 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ssping, fd_ssping_align() ) ) ) {
238 0 : FD_LOG_WARNING(( "misaligned ssping" ));
239 0 : return NULL;
240 0 : }
241 :
242 3 : if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
243 0 : FD_LOG_WARNING(( "bad magic" ));
244 0 : return NULL;
245 0 : }
246 :
247 3 : ssping->pool = peer_pool_leave( ssping->pool );
248 3 : ssping->map = peer_map_leave( ssping->map );
249 3 : ssping->unpinged = deadline_list_leave( ssping->unpinged );
250 3 : ssping->pinged = deadline_list_leave( ssping->pinged );
251 3 : ssping->valid = deadline_list_leave( ssping->valid );
252 3 : ssping->refreshing = deadline_list_leave( ssping->refreshing );
253 3 : ssping->invalid = deadline_list_leave( ssping->invalid );
254 :
255 3 : return (void *)ssping;
256 3 : }
257 :
258 : void *
259 3 : fd_ssping_delete( void * shping ) {
260 3 : if( FD_UNLIKELY( !shping ) ) {
261 0 : FD_LOG_WARNING(( "NULL shping" ));
262 0 : return NULL;
263 0 : }
264 :
265 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shping, fd_ssping_align() ) ) ) {
266 0 : FD_LOG_WARNING(( "misaligned shping" ));
267 0 : return NULL;
268 0 : }
269 :
270 3 : fd_ssping_t * ssping = (fd_ssping_t *)shping;
271 :
272 3 : if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
273 0 : FD_LOG_WARNING(( "bad magic" ));
274 0 : return NULL;
275 0 : }
276 :
277 : /* Close all file descriptors opened by fd_ssping_new. */
278 3 : for( ulong i=0UL; i<ssping->used_fd_cnt; i++ ) {
279 0 : close( ssping->used_fds[ i ].fd );
280 0 : }
281 750 : for( ulong i=0UL; i<FD_SSPING_FD_CNT-ssping->used_fd_cnt; i++ ) {
282 747 : close( ssping->idle_fds[ i ] );
283 747 : }
284 :
285 3 : FD_COMPILER_MFENCE();
286 3 : FD_VOLATILE( ssping->magic ) = 0UL;
287 3 : FD_COMPILER_MFENCE();
288 :
289 3 : return (void *)ssping;
290 3 : }
291 :
292 : void
293 : fd_ssping_add( fd_ssping_t * ssping,
294 24 : fd_ip4_port_t addr ) {
295 24 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
296 24 : if( FD_LIKELY( !peer ) ) {
297 24 : if( FD_UNLIKELY( !peer_pool_free( ssping->pool ) ) ) {
298 0 : FD_LOG_WARNING(( "ping peer pool exhausted" ));
299 0 : return;
300 0 : }
301 24 : peer = peer_pool_ele_acquire( ssping->pool );
302 24 : memset( peer, 0, sizeof(fd_ssping_peer_t) );
303 24 : peer->refcnt = 0UL;
304 24 : peer->state = PEER_STATE_UNPINGED;
305 24 : peer->addr = addr;
306 24 : peer->latency_nanos = ULONG_MAX;
307 24 : peer->used_fd_idx = ULONG_MAX;
308 24 : peer_map_ele_insert( ssping->map, peer, ssping->pool );
309 24 : deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
310 24 : }
311 24 : peer->refcnt++;
312 24 : }
313 :
314 : static void
315 : remove_fdesc_idx( fd_ssping_t * ssping,
316 0 : ulong fdesc_idx ) {
317 0 : FD_TEST( fdesc_idx<FD_SSPING_FD_CNT );
318 0 : FD_TEST( fdesc_idx<ssping->used_fd_cnt );
319 0 : ulong pool_idx = ssping->ping_to_pool[ fdesc_idx ];
320 :
321 0 : int fdesc = ssping->used_fds[ fdesc_idx ].fd;
322 : /* Abort the connection attempt or close the connection by connecting
323 : to AF_UNSPEC. */
324 0 : struct sockaddr_in addr[1] = {{
325 0 : .sin_family = AF_UNSPEC,
326 0 : .sin_addr = { .s_addr = 0U },
327 0 : .sin_port = 0
328 0 : }};
329 0 : if( FD_UNLIKELY( connect( fdesc, fd_type_pun_const( addr ), sizeof(addr) ) ) ) FD_LOG_ERR(( "connect(AF_UNSPEC) failed (%d-%s)", errno, fd_io_strerror( errno ) ));
330 :
331 : /* Mark that the pool element no longer has an associated index. */
332 0 : ssping->pool[ pool_idx ].used_fd_idx = ULONG_MAX;
333 :
334 : /* Now swap the last used_fd into this position, updating all the
335 : relevant bookkeeping info. */
336 0 : ulong last = ssping->used_fd_cnt-1UL;
337 0 : if( FD_LIKELY( fdesc_idx!=last ) ) {
338 0 : ssping->used_fds[ fdesc_idx ] = ssping->used_fds[ last ];
339 0 : ulong last_pool_idx = ssping->ping_to_pool[ fdesc_idx ] = ssping->ping_to_pool[ last ];
340 0 : ssping->pool[ last_pool_idx ].used_fd_idx = fdesc_idx;
341 0 : }
342 :
343 0 : ssping->idle_fds[ FD_SSPING_FD_CNT - ssping->used_fd_cnt ] = fdesc;
344 0 : ssping->used_fd_cnt--;
345 0 : }
346 :
347 : int
348 : fd_ssping_remove( fd_ssping_t * ssping,
349 24 : fd_ip4_port_t addr ) {
350 24 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
351 24 : if( FD_UNLIKELY( !peer ) ) return 0;
352 24 : if( FD_UNLIKELY( !peer->refcnt ) ) return 0;
353 24 : peer->refcnt--;
354 24 : if( FD_LIKELY( !peer->refcnt ) ) {
355 24 : switch( peer->state ) {
356 3 : case PEER_STATE_UNPINGED:
357 3 : deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
358 3 : break;
359 0 : case PEER_STATE_PINGED:
360 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
361 0 : remove_fdesc_idx( ssping, peer->used_fd_idx );
362 0 : break;
363 0 : case PEER_STATE_VALID:
364 0 : deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
365 0 : break;
366 0 : case PEER_STATE_REFRESHING:
367 0 : deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
368 0 : remove_fdesc_idx( ssping, peer->used_fd_idx );
369 0 : break;
370 21 : case PEER_STATE_INVALID:
371 21 : deadline_list_ele_remove( ssping->invalid, peer, ssping->pool );
372 21 : break;
373 24 : }
374 24 : peer_map_ele_remove_fast( ssping->map, peer, ssping->pool );
375 24 : peer_pool_ele_release( ssping->pool, peer );
376 24 : return 1;
377 24 : }
378 0 : return 0;
379 24 : }
380 :
381 : void
382 : fd_ssping_invalidate( fd_ssping_t * ssping,
383 : fd_ip4_port_t addr,
384 24 : long now ) {
385 24 : if( FD_UNLIKELY( !ssping ) ) return;
386 24 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
387 24 : if( FD_UNLIKELY( !peer ) ) return;
388 24 : switch( peer->state ) {
389 21 : case PEER_STATE_UNPINGED:
390 21 : deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
391 21 : break;
392 0 : case PEER_STATE_PINGED:
393 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
394 0 : remove_fdesc_idx( ssping, peer->used_fd_idx );
395 0 : break;
396 0 : case PEER_STATE_VALID:
397 0 : deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
398 0 : break;
399 0 : case PEER_STATE_REFRESHING:
400 0 : deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
401 0 : remove_fdesc_idx( ssping, peer->used_fd_idx );
402 0 : break;
403 3 : case PEER_STATE_INVALID:
404 3 : return;
405 24 : }
406 21 : peer->state = PEER_STATE_INVALID;
407 21 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
408 21 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
409 21 : }
410 :
411 : static inline void
412 : recv_pings( fd_ssping_t * ssping,
413 0 : fd_sspeer_selector_t * selector) {
414 0 : int pollv = fd_syscall_poll( ssping->used_fds, (uint)ssping->used_fd_cnt, 0 );
415 0 : if( FD_UNLIKELY( pollv<0 ) ) {
416 0 : FD_LOG_WARNING(( "poll(used_fds,%lu,0) failed (%d-%s)", ssping->used_fd_cnt, errno, fd_io_strerror( errno ) ));
417 0 : return;
418 0 : }
419 0 : long now = fd_log_wallclock();
420 0 : ulong processed = 0UL;
421 0 : ulong processed_idx[ PING_BURST_MAX ];
422 0 : for( ulong i=0UL; i<ssping->used_fd_cnt; i++ ) {
423 0 : if( FD_UNLIKELY( processed >= fd_ulong_min( (ulong)pollv, PING_BURST_MAX ) ) ) break;
424 0 : if( FD_UNLIKELY( ssping->used_fds[ i ].revents ) ) {
425 0 : ulong pool_idx = ssping->ping_to_pool[ i ];
426 0 : fd_ssping_peer_t * peer = ssping->pool+pool_idx;
427 :
428 0 : FD_TEST( peer->state==PEER_STATE_PINGED || peer->state==PEER_STATE_REFRESHING );
429 :
430 :
431 0 : deadline_list_ele_remove( peer->state==PEER_STATE_PINGED ? ssping->pinged : ssping->refreshing, peer, ssping->pool );
432 0 : int is_err = ssping->used_fds[ i ].revents & (POLLRDHUP|POLLERR|POLLHUP);
433 0 : if( FD_LIKELY( !is_err ) ) {
434 0 : peer->latency_nanos = (ulong)fd_long_max( now - (peer->deadline_nanos - PEER_DEADLINE_NANOS_PING), 1L );
435 0 : peer->state = PEER_STATE_VALID;
436 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_VALID;
437 0 : deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool );
438 :
439 0 : FD_LOG_INFO(( "pinged " FD_IP4_ADDR_FMT ":%hu in %lu nanos",
440 0 : FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ), peer->latency_nanos ));
441 0 : ssping->on_ping_cb( ssping->cb_arg, peer->addr, peer->latency_nanos );
442 0 : } else {
443 : /* This is pretty unlikely, but the host could respond with an
444 : RST packet I suppose. */
445 0 : peer->state = PEER_STATE_INVALID;
446 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
447 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
448 0 : fd_sspeer_selector_remove_by_addr( selector, peer->addr );
449 0 : }
450 0 : processed_idx[ processed ] = i;
451 0 : processed++;
452 0 : }
453 0 : }
454 : /* Now we need to call remove_fdesc_idx on the processed ones in
455 : reverse order (largest to smallest) so that we don't trip on
456 : ourself as we shuffle the array. */
457 0 : while( processed ) remove_fdesc_idx( ssping, processed_idx[ --processed ] );
458 0 : }
459 :
460 : static uint
461 : send_pings( fd_ssping_t * ssping,
462 : deadline_list_t * list,
463 0 : long until ) {
464 0 : uint msg_cnt = 0U;
465 0 : for( deadline_list_iter_t iter = deadline_list_iter_fwd_init( list, ssping->pool );
466 0 : msg_cnt<PING_BURST_MAX && ssping->used_fd_cnt<FD_SSPING_FD_CNT && !deadline_list_iter_done( iter, list, ssping->pool );
467 0 : iter = deadline_list_iter_fwd_next( iter, list, ssping->pool ) ) {
468 0 : ulong peer_idx = deadline_list_iter_idx( iter, list, ssping->pool );
469 0 : fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, peer_idx );
470 0 : if( peer->deadline_nanos>until ) break;
471 :
472 0 : int fdesc = ssping->idle_fds[ FD_SSPING_FD_CNT-ssping->used_fd_cnt-1UL ];
473 :
474 0 : struct sockaddr_in addr[1] = {{
475 0 : .sin_family = AF_INET,
476 0 : .sin_addr = { .s_addr = peer->addr.addr },
477 0 : .sin_port = peer->addr.port
478 0 : }};
479 :
480 0 : if( FD_UNLIKELY( connect( fdesc, fd_type_pun_const( addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
481 0 : FD_LOG_WARNING(( "connect(" FD_IP4_ADDR_FMT ":%hu) failed (%d-%s)", FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ), errno, fd_io_strerror( errno ) ));
482 : /* Nothing to do. It will get "reaped" later. */
483 0 : }
484 :
485 0 : ssping->used_fds [ ssping->used_fd_cnt ].fd = fdesc;
486 0 : ssping->ping_to_pool[ ssping->used_fd_cnt ] = peer_idx;
487 0 : peer->used_fd_idx = ssping->used_fd_cnt;
488 0 : ssping->used_fd_cnt++;
489 0 : msg_cnt++;
490 0 : }
491 :
492 0 : if( msg_cnt==0U ) return 0U;
493 0 : return (uint)msg_cnt;
494 0 : }
495 :
496 :
497 : void
498 : fd_ssping_advance( fd_ssping_t * ssping,
499 : long now,
500 0 : fd_sspeer_selector_t * selector) {
501 0 : uint sent = send_pings( ssping, ssping->unpinged, LONG_MAX );
502 0 : for( uint i=0U; i<sent; i++ ) {
503 0 : fd_ssping_peer_t * peer = deadline_list_ele_pop_head( ssping->unpinged, ssping->pool );
504 0 : FD_TEST( peer );
505 0 : peer->state = PEER_STATE_PINGED;
506 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
507 0 : deadline_list_ele_push_tail( ssping->pinged, peer, ssping->pool );
508 0 : }
509 :
510 0 : while( !deadline_list_is_empty( ssping->pinged, ssping->pool ) ) {
511 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->pinged, ssping->pool );
512 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
513 :
514 0 : deadline_list_ele_pop_head( ssping->pinged, ssping->pool );
515 :
516 0 : remove_fdesc_idx( ssping, peer->used_fd_idx );
517 :
518 0 : peer->state = PEER_STATE_INVALID;
519 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
520 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
521 0 : fd_sspeer_selector_remove_by_addr( selector, peer->addr );
522 0 : }
523 :
524 0 : sent = send_pings( ssping, ssping->valid, now );
525 0 : for( uint i=0U; i<sent; i++ ) {
526 0 : fd_ssping_peer_t * peer = deadline_list_ele_pop_head( ssping->valid, ssping->pool );
527 0 : FD_TEST( peer );
528 0 : peer->state = PEER_STATE_REFRESHING;
529 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
530 0 : deadline_list_ele_push_tail( ssping->refreshing, peer, ssping->pool );
531 0 : }
532 :
533 0 : while( !deadline_list_is_empty( ssping->refreshing, ssping->pool ) ) {
534 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->refreshing, ssping->pool );
535 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
536 :
537 0 : deadline_list_ele_pop_head( ssping->refreshing, ssping->pool );
538 :
539 0 : remove_fdesc_idx( ssping, peer->used_fd_idx );
540 :
541 0 : peer->state = PEER_STATE_INVALID;
542 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
543 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
544 0 : fd_sspeer_selector_remove_by_addr( selector, peer->addr );
545 0 : }
546 :
547 0 : while( !deadline_list_is_empty( ssping->invalid, ssping->pool ) ) {
548 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->invalid, ssping->pool );
549 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
550 :
551 0 : deadline_list_ele_pop_head( ssping->invalid, ssping->pool );
552 :
553 0 : peer->state = PEER_STATE_UNPINGED;
554 0 : peer->deadline_nanos = 0L;
555 0 : deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
556 0 : }
557 :
558 0 : recv_pings( ssping, selector );
559 0 : }
560 :
561 : int
562 : fd_ssping_is_invalidated( fd_ssping_t * ssping,
563 9 : fd_ip4_port_t addr ) {
564 9 : if( FD_UNLIKELY( !ssping ) ) return 0;
565 9 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
566 9 : if( FD_UNLIKELY( !peer ) ) return 0;
567 9 : return peer->state==PEER_STATE_INVALID;
568 9 : }
|