Line data Source code
1 : #include "fd_ssping.h"
2 : #include "fd_sspeer_selector.h"
3 :
4 : #include "../../../util/fd_util.h"
5 : #include "../../../util/bits/fd_bits.h"
6 : #include "../../../util/log/fd_log.h"
7 :
8 : #include <poll.h>
9 : #include <errno.h>
10 : #include <unistd.h>
11 : #include <sys/socket.h>
12 : #include <netinet/in.h>
13 : #include <netinet/ip_icmp.h>
14 :
15 0 : #define PEER_STATE_UNPINGED 0
16 0 : #define PEER_STATE_PINGED 1
17 0 : #define PEER_STATE_VALID 2
18 0 : #define PEER_STATE_REFRESHING 3
19 0 : #define PEER_STATE_INVALID 4
20 :
21 0 : #define PEER_DEADLINE_NANOS_PING (1L*1000L*1000L*1000L) /* 1 second */
22 0 : #define PEER_DEADLINE_NANOS_VALID (2L*60L*1000L*1000L*1000L) /* 2 minutes */
23 0 : #define PEER_DEADLINE_NANOS_INVALID (5L*60L*1000L*1000L*1000L) /* 5 minutes */
24 :
25 0 : #define FD_SSPING_RATE_LIMIT (64UL) /* Only ping up to 64 peers at a time */
26 :
27 : struct fd_ssping_peer {
28 : ulong refcnt;
29 : fd_ip4_port_t addr;
30 :
31 : struct {
32 : ulong next;
33 : } pool;
34 :
35 : struct {
36 : ulong next;
37 : ulong prev;
38 : } map;
39 :
40 : struct {
41 : ulong next;
42 : ulong prev;
43 : } deadline;
44 :
45 : struct {
46 : ulong idx;
47 : } fd;
48 :
49 : int state;
50 : ulong latency_nanos;
51 : long deadline_nanos;
52 : };
53 :
54 : typedef struct fd_ssping_peer fd_ssping_peer_t;
55 :
56 : #define POOL_NAME peer_pool
57 0 : #define POOL_T fd_ssping_peer_t
58 : #define POOL_IDX_T ulong
59 0 : #define POOL_NEXT pool.next
60 : #include "../../../util/tmpl/fd_pool.c"
61 :
62 : #define MAP_NAME peer_map
63 0 : #define MAP_KEY addr
64 0 : #define MAP_ELE_T fd_ssping_peer_t
65 : #define MAP_KEY_T fd_ip4_port_t
66 0 : #define MAP_PREV map.prev
67 0 : #define MAP_NEXT map.next
68 0 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
69 0 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
70 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
71 : #include "../../../util/tmpl/fd_map_chain.c"
72 :
73 : #define DLIST_NAME deadline_list
74 : #define DLIST_ELE_T fd_ssping_peer_t
75 0 : #define DLIST_PREV deadline.prev
76 0 : #define DLIST_NEXT deadline.next
77 :
78 : #include "../../../util/tmpl/fd_dlist.c"
79 :
80 : struct fd_ssping_private {
81 : fd_ssping_peer_t * pool;
82 : peer_map_t * map;
83 :
84 : deadline_list_t * unpinged;
85 : deadline_list_t * pinged;
86 : deadline_list_t * valid;
87 : deadline_list_t * refreshing;
88 : deadline_list_t * invalid;
89 :
90 : ulong fds_len;
91 : struct pollfd * fds;
92 : ulong * fds_idx;
93 : ulong poll_idx;
94 :
95 : fd_ssping_on_ping_fn_t on_ping_cb;
96 : void * cb_arg;
97 :
98 : ulong magic; /* ==FD_SSPING_MAGIC */
99 : };
100 :
101 : FD_FN_CONST ulong
102 0 : fd_ssping_align( void ) {
103 0 : return FD_SSPING_ALIGN;
104 0 : }
105 :
106 : FD_FN_CONST ulong
107 0 : fd_ssping_footprint( ulong max_peers ) {
108 0 : ulong l;
109 0 : l = FD_LAYOUT_INIT;
110 0 : l = FD_LAYOUT_APPEND( l, FD_SSPING_ALIGN, sizeof(fd_ssping_t) );
111 0 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
112 0 : l = FD_LAYOUT_APPEND( l, peer_map_align(), peer_map_footprint( max_peers ) );
113 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
114 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
115 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
116 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
117 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
118 0 : l = FD_LAYOUT_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd) );
119 0 : l = FD_LAYOUT_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong) );
120 0 : return FD_LAYOUT_FINI( l, FD_SSPING_ALIGN );
121 0 : }
122 :
123 : void *
124 : fd_ssping_new( void * shmem,
125 : ulong max_peers,
126 : ulong seed,
127 : fd_ssping_on_ping_fn_t on_ping_cb,
128 0 : void * cb_arg ) {
129 0 : if( FD_UNLIKELY( !shmem ) ) {
130 0 : FD_LOG_WARNING(( "NULL shmem" ));
131 0 : return NULL;
132 0 : }
133 :
134 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ssping_align() ) ) ) {
135 0 : FD_LOG_WARNING(( "unaligned shmem" ));
136 0 : return NULL;
137 0 : }
138 :
139 0 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
140 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
141 0 : return NULL;
142 0 : }
143 :
144 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
145 0 : fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND( l, FD_SSPING_ALIGN, sizeof(fd_ssping_t) );
146 0 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
147 0 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( max_peers ) );
148 0 : void * _unpinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
149 0 : void * _pinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
150 0 : void * _valid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
151 0 : void * _refreshing = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
152 0 : void * _invalid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
153 0 : struct pollfd * fds = FD_SCRATCH_ALLOC_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd) );
154 0 : ulong * fds_idx = FD_SCRATCH_ALLOC_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong) );
155 :
156 0 : ssping->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
157 0 : ssping->map = peer_map_join( peer_map_new( _map, max_peers, seed ) );
158 :
159 0 : ssping->unpinged = deadline_list_join( deadline_list_new( _unpinged ) );
160 0 : ssping->pinged = deadline_list_join( deadline_list_new( _pinged ) );
161 0 : ssping->valid = deadline_list_join( deadline_list_new( _valid ) );
162 0 : ssping->refreshing = deadline_list_join( deadline_list_new( _refreshing ) );
163 0 : ssping->invalid = deadline_list_join( deadline_list_new( _invalid ) );
164 :
165 0 : ssping->fds_len = 0UL;
166 0 : ssping->fds = fds;
167 0 : ssping->fds_idx = fds_idx;
168 0 : ssping->poll_idx = 0UL;
169 :
170 0 : ssping->on_ping_cb = on_ping_cb;
171 0 : ssping->cb_arg = cb_arg;
172 :
173 0 : FD_COMPILER_MFENCE();
174 0 : FD_VOLATILE( ssping->magic ) = FD_SSPING_MAGIC;
175 0 : FD_COMPILER_MFENCE();
176 :
177 0 : return (void *)ssping;
178 0 : }
179 :
180 : fd_ssping_t *
181 0 : fd_ssping_join( void * shping ) {
182 0 : if( FD_UNLIKELY( !shping ) ) {
183 0 : FD_LOG_WARNING(( "NULL shping" ));
184 0 : return NULL;
185 0 : }
186 :
187 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shping, fd_ssping_align() ) ) ) {
188 0 : FD_LOG_WARNING(( "misaligned shping" ));
189 0 : return NULL;
190 0 : }
191 :
192 0 : fd_ssping_t * ssping = (fd_ssping_t *)shping;
193 :
194 0 : if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
195 0 : FD_LOG_WARNING(( "bad magic" ));
196 0 : return NULL;
197 0 : }
198 :
199 0 : return ssping;
200 0 : }
201 :
202 : void
203 : fd_ssping_add( fd_ssping_t * ssping,
204 0 : fd_ip4_port_t addr ) {
205 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
206 0 : if( FD_LIKELY( !peer ) ) {
207 0 : if( FD_UNLIKELY( !peer_pool_free( ssping->pool ) ) ) return;
208 0 : peer = peer_pool_ele_acquire( ssping->pool );
209 0 : FD_TEST( peer );
210 0 : peer->refcnt = 0UL;
211 0 : peer->state = PEER_STATE_UNPINGED;
212 0 : peer->addr = addr;
213 0 : peer->latency_nanos = ULONG_MAX;
214 0 : peer_map_ele_insert( ssping->map, peer, ssping->pool );
215 0 : deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
216 0 : }
217 0 : peer->refcnt++;
218 0 : }
219 :
220 : static inline void
221 : remove_ping_fd( fd_ssping_t * ssping,
222 0 : ulong idx ) {
223 0 : FD_TEST( idx<ssping->fds_len );
224 :
225 0 : if( FD_UNLIKELY( ssping->fds_len==1UL ) ) {
226 0 : ssping->fds_len = 0UL;
227 0 : return;
228 0 : }
229 :
230 0 : ssping->fds[ idx ] = ssping->fds[ ssping->fds_len-1UL ];
231 0 : ssping->fds_idx[ idx ] = ssping->fds_idx[ ssping->fds_len-1UL ];
232 :
233 0 : fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ idx ] );
234 0 : peer->fd.idx = idx;
235 :
236 0 : ssping->fds_len--;
237 0 : }
238 :
239 : int
240 : fd_ssping_remove( fd_ssping_t * ssping,
241 0 : fd_ip4_port_t addr ) {
242 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
243 0 : FD_TEST( peer );
244 0 : FD_TEST( peer->refcnt );
245 0 : peer->refcnt--;
246 0 : if( FD_LIKELY( !peer->refcnt ) ) {
247 0 : switch( peer->state ) {
248 0 : case PEER_STATE_UNPINGED:
249 0 : deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
250 0 : break;
251 0 : case PEER_STATE_PINGED:
252 0 : remove_ping_fd( ssping, peer->fd.idx );
253 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
254 0 : break;
255 0 : case PEER_STATE_VALID:
256 0 : deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
257 0 : break;
258 0 : case PEER_STATE_REFRESHING:
259 0 : remove_ping_fd( ssping, peer->fd.idx );
260 0 : deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
261 0 : break;
262 0 : case PEER_STATE_INVALID:
263 0 : deadline_list_ele_remove( ssping->invalid, peer, ssping->pool );
264 0 : break;
265 0 : }
266 0 : peer_map_ele_remove_fast( ssping->map, peer, ssping->pool );
267 0 : peer_pool_ele_release( ssping->pool, peer );
268 0 : return 1;
269 0 : }
270 0 : return 0;
271 0 : }
272 :
273 : static inline void
274 : unping_peer( fd_ssping_t * ssping,
275 : fd_ssping_peer_t * peer,
276 0 : long now ) {
277 0 : FD_TEST( peer->state==PEER_STATE_PINGED || peer->state==PEER_STATE_REFRESHING );
278 :
279 0 : remove_ping_fd( ssping, peer->fd.idx );
280 0 : if( FD_UNLIKELY( peer->state==PEER_STATE_PINGED ) ) {
281 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
282 0 : } else if( FD_UNLIKELY( peer->state==PEER_STATE_REFRESHING ) ) {
283 0 : deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
284 0 : }
285 0 : peer->state = PEER_STATE_INVALID;
286 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
287 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
288 0 : }
289 :
290 : void
291 : fd_ssping_invalidate( fd_ssping_t * ssping,
292 : fd_ip4_port_t addr,
293 0 : long now ) {
294 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
295 0 : if( FD_UNLIKELY( !peer ) ) return;
296 :
297 0 : if( FD_UNLIKELY( peer->state==PEER_STATE_PINGED || peer->state==PEER_STATE_REFRESHING ) ) {
298 0 : unping_peer( ssping, peer, now );
299 0 : } else {
300 0 : FD_TEST( peer->state==PEER_STATE_UNPINGED || peer->state==PEER_STATE_VALID );
301 0 : if( FD_LIKELY( peer->state==PEER_STATE_UNPINGED ) ) {
302 0 : deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
303 0 : } else if( FD_UNLIKELY( peer->state==PEER_STATE_VALID ) ) {
304 0 : deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
305 0 : }
306 0 : peer->state = PEER_STATE_INVALID;
307 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
308 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
309 0 : }
310 0 : }
311 :
312 : static inline void
313 : poll_advance( fd_ssping_t * ssping,
314 0 : long now ) {
315 0 : if( FD_LIKELY( !ssping->fds_len ) ) return;
316 :
317 0 : ulong next_poll_idx = fd_ulong_min( ssping->poll_idx+FD_SSPING_RATE_LIMIT, ssping->fds_len );
318 0 : ulong fds_cnt = fd_ulong_min( next_poll_idx - ssping->poll_idx, FD_SSPING_RATE_LIMIT );
319 0 : int nfds = fd_syscall_poll( ssping->fds+ssping->poll_idx, (uint)fds_cnt, 0 );
320 0 : if( FD_LIKELY( !nfds ) ) return;
321 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return;
322 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
323 :
324 0 : for( ulong i=ssping->poll_idx; i<next_poll_idx; i++ ) {
325 0 : struct pollfd * pfd = &ssping->fds[ i ];
326 0 : if( FD_UNLIKELY( pfd->revents & (POLLERR|POLLHUP) ) ) {
327 0 : unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
328 0 : continue;
329 0 : }
330 :
331 0 : if( FD_LIKELY( pfd->revents & POLLOUT ) ) {
332 0 : struct icmphdr icmp_hdr = (struct icmphdr){
333 0 : .type = ICMP_ECHO,
334 0 : .code = 0,
335 0 : .un.echo.id = 0, /* Automatically set by kernel for a ping socket */
336 0 : .un.echo.sequence = 0, /* Only one ping goes out per socket, so nothing to change */
337 0 : .checksum = 0 /* Will be calculated by the kernel */
338 0 : };
339 :
340 0 : long result = sendto( pfd->fd, &icmp_hdr, sizeof(icmp_hdr), 0, NULL, 0 );
341 0 : if( FD_UNLIKELY( !result ) ) continue;
342 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) continue;
343 0 : else if( FD_UNLIKELY( -1==result ) ) {
344 0 : unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
345 0 : continue;
346 0 : }
347 0 : pfd->revents &= ~POLLOUT;
348 0 : }
349 :
350 0 : if( FD_LIKELY( pfd->revents & POLLIN ) ) {
351 0 : struct icmphdr icmp_hdr;
352 0 : long result = recvfrom( pfd->fd, &icmp_hdr, sizeof(icmp_hdr), 0, NULL, NULL );
353 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) continue;
354 0 : else if( FD_UNLIKELY( -1==result || (ulong)result<sizeof(icmp_hdr) || icmp_hdr.type!=ICMP_ECHOREPLY ) ) {
355 0 : unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
356 0 : continue;
357 0 : }
358 :
359 0 : fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] );
360 0 : FD_TEST( peer->deadline_nanos>now );
361 0 : peer->latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now);
362 0 : peer->state = PEER_STATE_VALID;
363 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_VALID;
364 :
365 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
366 0 : deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool );
367 0 : remove_ping_fd( ssping, i );
368 :
369 0 : ssping->on_ping_cb( ssping->cb_arg, peer->addr, peer->latency_nanos );
370 :
371 : /* fds_len could have decreased if we removed a ping fd */
372 0 : next_poll_idx = fd_ulong_min( next_poll_idx, ssping->fds_len );
373 0 : }
374 0 : }
375 :
376 0 : if( FD_UNLIKELY( next_poll_idx>=ssping->fds_len ) ) ssping->poll_idx = 0UL;
377 0 : else ssping->poll_idx = next_poll_idx;
378 0 : }
379 :
380 : static int
381 : peer_connect( fd_ssping_t * ssping,
382 0 : fd_ssping_peer_t * peer ) {
383 0 : int sockfd = socket( PF_INET, SOCK_DGRAM|SOCK_NONBLOCK, IPPROTO_ICMP );
384 0 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket failed (%i-%s)", errno, strerror( errno ) ));
385 :
386 0 : struct sockaddr_in addr = {
387 0 : .sin_family = AF_INET,
388 0 : .sin_port = fd_ushort_bswap( peer->addr.port ),
389 0 : .sin_addr = { .s_addr = peer->addr.addr }
390 0 : };
391 :
392 0 : if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
393 0 : if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
394 0 : return -1;
395 0 : }
396 :
397 0 : ssping->fds[ ssping->fds_len ] = (struct pollfd){
398 0 : .fd = sockfd,
399 0 : .events = POLLIN|POLLOUT,
400 0 : .revents = 0
401 0 : };
402 0 : ssping->fds_idx[ ssping->fds_len ] = peer_pool_idx( ssping->pool, peer );
403 0 : peer->fd.idx = ssping->fds_len;
404 0 : ssping->fds_len++;
405 :
406 0 : return 0;
407 0 : }
408 :
409 : void
410 : fd_ssping_advance( fd_ssping_t * ssping,
411 : long now,
412 0 : fd_sspeer_selector_t * selector) {
413 0 : while( !deadline_list_is_empty( ssping->unpinged, ssping->pool ) ) {
414 0 : fd_ssping_peer_t * peer = deadline_list_ele_pop_head( ssping->unpinged, ssping->pool );
415 :
416 0 : FD_LOG_INFO(( "pinging " FD_IP4_ADDR_FMT ":%hu", FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), peer->addr.port ));
417 0 : int result = peer_connect( ssping, peer );
418 0 : if( FD_UNLIKELY( -1==result ) ) {
419 0 : peer->state = PEER_STATE_INVALID;
420 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
421 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
422 0 : } else {
423 0 : peer->state = PEER_STATE_PINGED;
424 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
425 0 : deadline_list_ele_push_tail( ssping->pinged, peer, ssping->pool );
426 0 : }
427 0 : }
428 :
429 0 : while( !deadline_list_is_empty( ssping->pinged, ssping->pool ) ) {
430 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->pinged, ssping->pool );
431 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
432 :
433 0 : deadline_list_ele_pop_head( ssping->pinged, ssping->pool );
434 :
435 0 : peer->state = PEER_STATE_INVALID;
436 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
437 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
438 0 : remove_ping_fd( ssping, peer->fd.idx );
439 0 : fd_sspeer_selector_remove( selector, peer->addr );
440 0 : }
441 :
442 0 : while( !deadline_list_is_empty( ssping->valid, ssping->pool ) ) {
443 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->valid, ssping->pool );
444 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
445 :
446 0 : deadline_list_ele_pop_head( ssping->valid, ssping->pool );
447 :
448 0 : int result = peer_connect( ssping, peer );
449 0 : if( FD_UNLIKELY( -1==result ) ) {
450 0 : peer->state = PEER_STATE_INVALID;
451 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
452 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
453 0 : fd_sspeer_selector_remove( selector, peer->addr );
454 0 : } else {
455 0 : peer->state = PEER_STATE_REFRESHING;
456 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
457 0 : deadline_list_ele_push_tail( ssping->refreshing, peer, ssping->pool );
458 0 : }
459 0 : }
460 :
461 0 : while( !deadline_list_is_empty( ssping->refreshing, ssping->pool ) ) {
462 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->refreshing, ssping->pool );
463 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
464 :
465 0 : deadline_list_ele_pop_head( ssping->refreshing, ssping->pool );
466 :
467 0 : peer->state = PEER_STATE_INVALID;
468 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
469 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
470 0 : remove_ping_fd( ssping, peer->fd.idx );
471 0 : fd_sspeer_selector_remove( selector, peer->addr );
472 0 : }
473 :
474 0 : while( !deadline_list_is_empty( ssping->invalid, ssping->pool ) ) {
475 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->invalid, ssping->pool );
476 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
477 :
478 0 : deadline_list_ele_pop_head( ssping->invalid, ssping->pool );
479 :
480 0 : peer->state = PEER_STATE_UNPINGED;
481 0 : peer->deadline_nanos = 0L;
482 0 : deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
483 0 : }
484 :
485 0 : poll_advance( ssping, now );
486 0 : }
|