Line data Source code
1 : #include "fd_ssping.h"
2 :
3 : #include "../../../util/bits/fd_bits.h"
4 : #include "../../../util/log/fd_log.h"
5 :
6 : #include <poll.h>
7 : #include <errno.h>
8 : #include <unistd.h>
9 : #include <sys/socket.h>
10 : #include <netinet/in.h>
11 : #include <netinet/ip_icmp.h>
12 :
/* Lifecycle states of a tracked peer.  A peer is on exactly one of the
   per-state deadline lists of fd_ssping_private at any time. */

#define PEER_STATE_UNPINGED   0 /* Known, waiting for a ping socket to be opened */
#define PEER_STATE_PINGED     1 /* First ping in flight, no latency known yet */
#define PEER_STATE_VALID      2 /* Latency known, peer is in the score treap */
#define PEER_STATE_REFRESHING 3 /* Re-ping in flight, peer is still in the score treap */
#define PEER_STATE_INVALID    4 /* Ping failed or timed out, waiting to be retried */

/* How long (in nanoseconds) a peer may stay in a state before
   fd_ssping_advance moves it on. */

#define PEER_DEADLINE_NANOS_PING    (1000000000L)      /* 1 second  : max wait for an echo reply */
#define PEER_DEADLINE_NANOS_VALID   (120L*1000000000L) /* 2 minutes : age at which a latency is refreshed */
#define PEER_DEADLINE_NANOS_INVALID (300L*1000000000L) /* 5 minutes : back-off before re-pinging a failed peer */
22 :
23 : struct fd_ssping_peer {
24 : ulong refcnt;
25 : fd_ip4_port_t addr;
26 :
27 : struct {
28 : ulong next;
29 : } pool;
30 :
31 : struct {
32 : ulong next;
33 : ulong prev;
34 : } map;
35 :
36 : struct {
37 : ulong parent;
38 : ulong left;
39 : ulong right;
40 : ulong prio;
41 : } score_treap;
42 :
43 : struct {
44 : ulong next;
45 : ulong prev;
46 : } deadline;
47 :
48 : struct {
49 : ulong idx;
50 : } fd;
51 :
52 : int state;
53 : ulong latency_nanos;
54 : long deadline_nanos;
55 : };
56 :
57 : typedef struct fd_ssping_peer fd_ssping_peer_t;
58 :
59 : #define POOL_NAME peer_pool
60 0 : #define POOL_T fd_ssping_peer_t
61 : #define POOL_IDX_T ulong
62 0 : #define POOL_NEXT pool.next
63 : #include "../../../util/tmpl/fd_pool.c"
64 :
65 : #define MAP_NAME peer_map
66 0 : #define MAP_KEY addr
67 0 : #define MAP_ELE_T fd_ssping_peer_t
68 : #define MAP_KEY_T fd_ip4_port_t
69 0 : #define MAP_PREV map.prev
70 0 : #define MAP_NEXT map.next
71 0 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
72 0 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
73 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
74 : #include "../../../util/tmpl/fd_map_chain.c"
75 :
76 0 : #define COMPARE_WORSE(x,y) ( (x)->latency_nanos<(y)->latency_nanos )
77 :
78 : #define TREAP_T fd_ssping_peer_t
79 : #define TREAP_NAME score_treap
80 : #define TREAP_QUERY_T void * /* We don't use query ... */
81 : #define TREAP_CMP(a,b) (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
82 : implementation to cmp either */
83 0 : #define TREAP_IDX_T ulong
84 0 : #define TREAP_LT COMPARE_WORSE
85 0 : #define TREAP_PARENT score_treap.parent
86 0 : #define TREAP_LEFT score_treap.left
87 0 : #define TREAP_RIGHT score_treap.right
88 0 : #define TREAP_PRIO score_treap.prio
89 : #include "../../../util/tmpl/fd_treap.c"
90 :
91 : #define DLIST_NAME deadline_list
92 : #define DLIST_ELE_T fd_ssping_peer_t
93 0 : #define DLIST_PREV deadline.prev
94 0 : #define DLIST_NEXT deadline.next
95 :
96 : #include "../../../util/tmpl/fd_dlist.c"
97 :
98 : struct fd_ssping_private {
99 : fd_ssping_peer_t * pool;
100 : peer_map_t * map;
101 : score_treap_t * score_treap;
102 :
103 : deadline_list_t * unpinged;
104 : deadline_list_t * pinged;
105 : deadline_list_t * valid;
106 : deadline_list_t * refreshing;
107 : deadline_list_t * invalid;
108 :
109 : ulong fds_len;
110 : struct pollfd * fds;
111 : ulong * fds_idx;
112 :
113 : ulong magic; /* ==FD_SSPING_MAGIC */
114 : };
115 :
116 : FD_FN_CONST ulong
117 0 : fd_ssping_align( void ) {
118 0 : return FD_SSPING_ALIGN;
119 0 : }
120 :
121 : FD_FN_CONST ulong
122 0 : fd_ssping_footprint( ulong max_peers ) {
123 0 : ulong l;
124 0 : l = FD_LAYOUT_INIT;
125 0 : l = FD_LAYOUT_APPEND( l, FD_SSPING_ALIGN, sizeof(fd_ssping_t) );
126 0 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
127 0 : l = FD_LAYOUT_APPEND( l, peer_map_align(), peer_map_footprint( max_peers ) );
128 0 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
129 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
130 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
131 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
132 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
133 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
134 0 : l = FD_LAYOUT_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd) );
135 0 : l = FD_LAYOUT_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong) );
136 0 : return FD_LAYOUT_FINI( l, FD_SSPING_ALIGN );
137 0 : }
138 :
139 : void *
140 : fd_ssping_new( void * shmem,
141 : ulong max_peers,
142 0 : ulong seed ) {
143 0 : if( FD_UNLIKELY( !shmem ) ) {
144 0 : FD_LOG_WARNING(( "NULL shmem" ));
145 0 : return NULL;
146 0 : }
147 :
148 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ssping_align() ) ) ) {
149 0 : FD_LOG_WARNING(( "unaligned shmem" ));
150 0 : return NULL;
151 0 : }
152 :
153 0 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
154 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
155 0 : return NULL;
156 0 : }
157 :
158 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
159 0 : fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND( l, FD_SSPING_ALIGN, sizeof(fd_ssping_t) );
160 0 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
161 0 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( max_peers ) );
162 0 : void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
163 0 : void * _unpinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
164 0 : void * _pinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
165 0 : void * _valid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
166 0 : void * _refreshing = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
167 0 : void * _invalid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
168 0 : struct pollfd * fds = FD_SCRATCH_ALLOC_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd) );
169 0 : ulong * fds_idx = FD_SCRATCH_ALLOC_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong) );
170 :
171 0 : ssping->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
172 0 : ssping->map = peer_map_join( peer_map_new( _map, max_peers, seed ) );
173 0 : ssping->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) );
174 :
175 0 : ssping->unpinged = deadline_list_join( deadline_list_new( _unpinged ) );
176 0 : ssping->pinged = deadline_list_join( deadline_list_new( _pinged ) );
177 0 : ssping->valid = deadline_list_join( deadline_list_new( _valid ) );
178 0 : ssping->refreshing = deadline_list_join( deadline_list_new( _refreshing ) );
179 0 : ssping->invalid = deadline_list_join( deadline_list_new( _invalid ) );
180 :
181 0 : ssping->fds_len = 0UL;
182 0 : ssping->fds = fds;
183 0 : ssping->fds_idx = fds_idx;
184 :
185 0 : FD_COMPILER_MFENCE();
186 0 : FD_VOLATILE( ssping->magic ) = FD_SSPING_MAGIC;
187 0 : FD_COMPILER_MFENCE();
188 :
189 0 : return (void *)ssping;
190 0 : }
191 :
192 : fd_ssping_t *
193 0 : fd_ssping_join( void * shping ) {
194 0 : if( FD_UNLIKELY( !shping ) ) {
195 0 : FD_LOG_WARNING(( "NULL shping" ));
196 0 : return NULL;
197 0 : }
198 :
199 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shping, fd_ssping_align() ) ) ) {
200 0 : FD_LOG_WARNING(( "misaligned shping" ));
201 0 : return NULL;
202 0 : }
203 :
204 0 : fd_ssping_t * ssping = (fd_ssping_t *)shping;
205 :
206 0 : if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
207 0 : FD_LOG_WARNING(( "bad magic" ));
208 0 : return NULL;
209 0 : }
210 :
211 0 : return ssping;
212 0 : }
213 :
214 : void
215 : fd_ssping_add( fd_ssping_t * ssping,
216 0 : fd_ip4_port_t addr ) {
217 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
218 0 : if( FD_LIKELY( !peer ) ) {
219 0 : if( FD_UNLIKELY( !peer_pool_free( ssping->pool ) ) ) return;
220 0 : peer = peer_pool_ele_acquire( ssping->pool );
221 0 : FD_TEST( peer );
222 0 : peer->refcnt = 0UL;
223 0 : peer->state = PEER_STATE_UNPINGED;
224 0 : peer->addr = addr;
225 0 : peer_map_ele_insert( ssping->map, peer, ssping->pool );
226 0 : deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
227 0 : }
228 0 : peer->refcnt++;
229 0 : }
230 :
231 : static inline void
232 : remove_ping_fd( fd_ssping_t * ssping,
233 0 : ulong idx ) {
234 0 : FD_TEST( idx<ssping->fds_len );
235 :
236 0 : if( FD_UNLIKELY( ssping->fds_len==1UL ) ) {
237 0 : ssping->fds_len = 0UL;
238 0 : return;
239 0 : }
240 :
241 0 : ssping->fds[ idx ] = ssping->fds[ ssping->fds_len-1UL ];
242 0 : ssping->fds_idx[ idx ] = ssping->fds_idx[ ssping->fds_len-1UL ];
243 :
244 0 : fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ idx ] );
245 0 : peer->fd.idx = idx;
246 :
247 0 : ssping->fds_len--;
248 0 : }
249 :
250 : void
251 : fd_ssping_remove( fd_ssping_t * ssping,
252 0 : fd_ip4_port_t addr ) {
253 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
254 0 : FD_TEST( peer );
255 0 : FD_TEST( peer->refcnt );
256 0 : peer->refcnt--;
257 0 : if( FD_LIKELY( !peer->refcnt ) ) {
258 0 : switch( peer->state ) {
259 0 : case PEER_STATE_UNPINGED:
260 0 : deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
261 0 : break;
262 0 : case PEER_STATE_PINGED:
263 0 : remove_ping_fd( ssping, peer->fd.idx );
264 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
265 0 : break;
266 0 : case PEER_STATE_VALID:
267 0 : score_treap_ele_remove( ssping->score_treap, peer, ssping->pool );
268 0 : deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
269 0 : break;
270 0 : case PEER_STATE_REFRESHING:
271 0 : remove_ping_fd( ssping, peer->fd.idx );
272 0 : score_treap_ele_remove( ssping->score_treap, peer, ssping->pool );
273 0 : deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
274 0 : break;
275 0 : case PEER_STATE_INVALID:
276 0 : deadline_list_ele_remove( ssping->invalid, peer, ssping->pool );
277 0 : break;
278 0 : }
279 0 : peer_map_ele_remove_fast( ssping->map, peer, ssping->pool );
280 0 : peer_pool_ele_release( ssping->pool, peer );
281 0 : }
282 0 : }
283 :
284 : static inline void
285 : unping_peer( fd_ssping_t * ssping,
286 : fd_ssping_peer_t * peer,
287 0 : long now ) {
288 0 : FD_TEST( peer->state==PEER_STATE_PINGED || peer->state==PEER_STATE_REFRESHING );
289 :
290 0 : remove_ping_fd( ssping, peer->fd.idx );
291 0 : if( FD_UNLIKELY( peer->state==PEER_STATE_PINGED ) ) {
292 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
293 0 : } else if( FD_UNLIKELY( peer->state==PEER_STATE_REFRESHING ) ) {
294 0 : score_treap_ele_remove( ssping->score_treap, peer, ssping->pool );
295 0 : deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
296 0 : }
297 0 : peer->state = PEER_STATE_INVALID;
298 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
299 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
300 0 : }
301 :
302 : void
303 : fd_ssping_invalidate( fd_ssping_t * ssping,
304 : fd_ip4_port_t addr,
305 0 : long now ) {
306 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
307 0 : if( FD_UNLIKELY( !peer ) ) return;
308 :
309 0 : if( FD_UNLIKELY( peer->state==PEER_STATE_PINGED || peer->state==PEER_STATE_REFRESHING ) ) {
310 0 : unping_peer( ssping, peer, now );
311 0 : } else {
312 0 : FD_TEST( peer->state==PEER_STATE_UNPINGED || peer->state==PEER_STATE_VALID );
313 0 : if( FD_LIKELY( peer->state==PEER_STATE_UNPINGED ) ) {
314 0 : deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
315 0 : } else if( FD_UNLIKELY( peer->state==PEER_STATE_VALID ) ) {
316 0 : score_treap_ele_remove( ssping->score_treap, peer, ssping->pool );
317 0 : deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
318 0 : }
319 0 : peer->state = PEER_STATE_INVALID;
320 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
321 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
322 0 : }
323 0 : }
324 :
325 : static inline void
326 : poll_advance( fd_ssping_t * ssping,
327 0 : long now ) {
328 0 : if( FD_LIKELY( !ssping->fds_len ) ) return;
329 :
330 0 : int nfds = poll( ssping->fds, ssping->fds_len, 0 );
331 0 : if( FD_LIKELY( !nfds ) ) return;
332 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return;
333 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
334 :
335 0 : for( ulong i=0UL; i<ssping->fds_len; i++ ) {
336 0 : struct pollfd * pfd = &ssping->fds[ i ];
337 0 : if( FD_UNLIKELY( pfd->revents & (POLLERR|POLLHUP) ) ) {
338 0 : unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
339 0 : continue;
340 0 : }
341 :
342 0 : if( FD_LIKELY( pfd->revents & POLLOUT ) ) {
343 0 : struct icmphdr icmp_hdr = (struct icmphdr){
344 0 : .type = ICMP_ECHO,
345 0 : .code = 0,
346 0 : .un.echo.id = 0, /* Automatically set by kernel for a ping socket */
347 0 : .un.echo.sequence = 0, /* Only one ping goes out per socket, so nothing to change */
348 0 : .checksum = 0 /* Will be calculated by the kernel */
349 0 : };
350 :
351 0 : long result = send( pfd->fd, &icmp_hdr, sizeof(icmp_hdr), 0 );
352 0 : if( FD_UNLIKELY( !result ) ) continue;
353 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) continue;
354 0 : else if( FD_UNLIKELY( -1==result ) ) {
355 0 : unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
356 0 : continue;
357 0 : }
358 0 : pfd->revents &= ~POLLOUT;
359 0 : }
360 :
361 0 : if( FD_LIKELY( pfd->revents & POLLIN ) ) {
362 0 : struct icmphdr icmp_hdr;
363 0 : long result = recv( pfd->fd, &icmp_hdr, sizeof(icmp_hdr), 0 );
364 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) continue;
365 0 : else if( FD_UNLIKELY( -1==result || (ulong)result<sizeof(icmp_hdr) || icmp_hdr.type!=ICMP_ECHOREPLY ) ) {
366 0 : unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
367 0 : continue;
368 0 : }
369 :
370 0 : fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] );
371 0 : FD_TEST( peer->deadline_nanos>now );
372 0 : peer->latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now);
373 :
374 0 : if( FD_LIKELY( peer->state==PEER_STATE_REFRESHING ) ) {
375 0 : score_treap_ele_remove( ssping->score_treap, peer, ssping->pool );
376 0 : }
377 :
378 0 : FD_LOG_INFO(( "pinged " FD_IP4_ADDR_FMT ":%hu in %lu ns", FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), peer->addr.port, peer->latency_nanos ));
379 0 : peer->state = PEER_STATE_VALID;
380 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_VALID;
381 :
382 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
383 0 : deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool );
384 0 : score_treap_ele_insert( ssping->score_treap, peer, ssping->pool );
385 0 : remove_ping_fd( ssping, i );
386 0 : }
387 0 : }
388 0 : }
389 :
390 : static int
391 : peer_connect( fd_ssping_t * ssping,
392 0 : fd_ssping_peer_t * peer ) {
393 0 : int sockfd = socket( PF_INET, SOCK_DGRAM|SOCK_NONBLOCK, IPPROTO_ICMP );
394 0 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket failed (%i-%s)", errno, strerror( errno ) ));
395 :
396 0 : struct sockaddr_in addr = {
397 0 : .sin_family = AF_INET,
398 0 : .sin_port = fd_ushort_bswap( peer->addr.port ),
399 0 : .sin_addr = { .s_addr = peer->addr.addr }
400 0 : };
401 :
402 0 : if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
403 0 : if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
404 0 : return -1;
405 0 : }
406 :
407 0 : ssping->fds[ ssping->fds_len ] = (struct pollfd){
408 0 : .fd = sockfd,
409 0 : .events = POLLIN|POLLOUT,
410 0 : .revents = 0
411 0 : };
412 0 : ssping->fds_idx[ ssping->fds_len ] = peer_pool_idx( ssping->pool, peer );
413 0 : peer->fd.idx = ssping->fds_len;
414 0 : ssping->fds_len++;
415 :
416 0 : return 0;
417 0 : }
418 :
419 : void
420 : fd_ssping_advance( fd_ssping_t * ssping,
421 0 : long now ) {
422 0 : while( !deadline_list_is_empty( ssping->unpinged, ssping->pool ) ) {
423 0 : fd_ssping_peer_t * peer = deadline_list_ele_pop_head( ssping->unpinged, ssping->pool );
424 :
425 0 : FD_LOG_INFO(( "pinging " FD_IP4_ADDR_FMT ":%hu", FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), peer->addr.port ));
426 0 : int result = peer_connect( ssping, peer );
427 0 : if( FD_UNLIKELY( -1==result ) ) {
428 0 : peer->state = PEER_STATE_INVALID;
429 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
430 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
431 0 : } else {
432 0 : peer->state = PEER_STATE_PINGED;
433 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
434 0 : deadline_list_ele_push_tail( ssping->pinged, peer, ssping->pool );
435 0 : }
436 0 : }
437 :
438 0 : while( !deadline_list_is_empty( ssping->pinged, ssping->pool ) ) {
439 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->pinged, ssping->pool );
440 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
441 :
442 0 : deadline_list_ele_pop_head( ssping->pinged, ssping->pool );
443 :
444 0 : peer->state = PEER_STATE_INVALID;
445 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
446 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
447 0 : remove_ping_fd( ssping, peer->fd.idx );
448 0 : }
449 :
450 0 : while( !deadline_list_is_empty( ssping->valid, ssping->pool ) ) {
451 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->valid, ssping->pool );
452 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
453 :
454 0 : deadline_list_ele_pop_head( ssping->valid, ssping->pool );
455 :
456 0 : int result = peer_connect( ssping, peer );
457 0 : if( FD_UNLIKELY( -1==result ) ) {
458 0 : peer->state = PEER_STATE_INVALID;
459 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
460 0 : score_treap_ele_remove( ssping->score_treap, peer, ssping->pool );
461 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
462 0 : } else {
463 0 : peer->state = PEER_STATE_REFRESHING;
464 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
465 0 : deadline_list_ele_push_tail( ssping->refreshing, peer, ssping->pool );
466 0 : }
467 0 : }
468 :
469 0 : while( !deadline_list_is_empty( ssping->refreshing, ssping->pool ) ) {
470 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->refreshing, ssping->pool );
471 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
472 :
473 0 : deadline_list_ele_pop_head( ssping->refreshing, ssping->pool );
474 :
475 0 : peer->state = PEER_STATE_INVALID;
476 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
477 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
478 0 : score_treap_ele_remove( ssping->score_treap, peer, ssping->pool );
479 0 : remove_ping_fd( ssping, peer->fd.idx );
480 0 : }
481 :
482 0 : while( !deadline_list_is_empty( ssping->invalid, ssping->pool ) ) {
483 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->invalid, ssping->pool );
484 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
485 :
486 0 : deadline_list_ele_pop_head( ssping->invalid, ssping->pool );
487 :
488 0 : peer->state = PEER_STATE_UNPINGED;
489 0 : peer->deadline_nanos = 0L;
490 0 : deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
491 0 : }
492 :
493 0 : poll_advance( ssping, now );
494 0 : }
495 :
496 : fd_ip4_port_t
497 0 : fd_ssping_best( fd_ssping_t const * ssping ) {
498 0 : score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( ssping->score_treap, ssping->pool );
499 0 : if( FD_UNLIKELY( score_treap_fwd_iter_done( iter ) ) ) return (fd_ip4_port_t){ .l=0UL };
500 :
501 0 : fd_ssping_peer_t const * best = score_treap_fwd_iter_ele_const( iter, ssping->pool );
502 0 : return best->addr;
503 0 : }
|