LCOV - code coverage report
Current view: top level - discof/restore/utils - fd_ssping.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 165 374 44.1 %
Date: 2026-06-29 05:51:35 Functions: 10 14 71.4 %

          Line data    Source code
       1             : #define _GNU_SOURCE /* ppoll */
       2             : #include "fd_ssping.h"
       3             : #include "fd_sspeer_selector.h"
       4             : 
       5             : #include "../../../util/fd_util.h"
       6             : #include "../../../util/bits/fd_bits.h"
       7             : #include "../../../util/log/fd_log.h"
       8             : 
       9             : #include <errno.h>
      10             : #include <fcntl.h>
      11             : #include <unistd.h>
      12             : #include <sys/socket.h>
      13             : #include <netinet/in.h>
      14             : #include <netinet/tcp.h>
      15             : #include <poll.h>
      16             : 
      17          48 : #define PEER_STATE_UNPINGED   0
      18           0 : #define PEER_STATE_PINGED     1
      19           0 : #define PEER_STATE_VALID      2
      20           0 : #define PEER_STATE_REFRESHING 3
      21          54 : #define PEER_STATE_INVALID    4
      22             : 
      23           0 : #define PEER_DEADLINE_NANOS_PING    (1L*1000L*1000L*1000L)     /* 1 second */
      24           0 : #define PEER_DEADLINE_NANOS_VALID   (2L*60L*1000L*1000L*1000L) /* 2 minutes */
      25          21 : #define PEER_DEADLINE_NANOS_INVALID (5L*60L*1000L*1000L*1000L) /* 5 minutes */
      26             : 
      27           0 : #define PING_BURST_MAX (16UL) /* Limit how many pings we can burst at once. */
      28             : 
      29             : struct fd_ssping_peer {
      30             :   ulong         refcnt;
      31             :   fd_ip4_port_t addr;
      32             : 
      33             :   struct {
      34             :     ulong next;
      35             :   } pool;
      36             : 
      37             :   struct {
      38             :     ulong next;
      39             :     ulong prev;
      40             :   } map;
      41             : 
      42             :   struct {
      43             :     ulong next;
      44             :     ulong prev;
      45             :   } deadline;
      46             : 
      47             :   int   state;
      48             :   ulong latency_nanos;
      49             :   long  deadline_nanos;
      50             :   ulong used_fd_idx;
      51             : };
      52             : 
      53             : typedef struct fd_ssping_peer fd_ssping_peer_t;
      54             : 
      55             : #define POOL_NAME  peer_pool
      56           6 : #define POOL_T     fd_ssping_peer_t
      57             : #define POOL_IDX_T ulong
      58          96 : #define POOL_NEXT  pool.next
      59             : #include "../../../util/tmpl/fd_pool.c"
      60             : 
      61             : #define MAP_NAME               peer_map
      62          48 : #define MAP_KEY                addr
      63          24 : #define MAP_ELE_T              fd_ssping_peer_t
      64             : #define MAP_KEY_T              fd_ip4_port_t
      65          24 : #define MAP_PREV               map.prev
      66          48 : #define MAP_NEXT               map.next
      67          57 : #define MAP_KEY_EQ(k0,k1)      ((k0)->l==(k1)->l)
      68         129 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
      69             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
      70             : #include "../../../util/tmpl/fd_map_chain.c"
      71             : 
      72             : #define DLIST_NAME  deadline_list
      73             : #define DLIST_ELE_T fd_ssping_peer_t
      74          90 : #define DLIST_PREV  deadline.prev
      75          90 : #define DLIST_NEXT  deadline.next
      76             : #include "../../../util/tmpl/fd_dlist.c"
      77             : 
      78             : struct fd_ssping_private {
      79             :   fd_ssping_peer_t *       pool;
      80             :   peer_map_t *             map;
      81             : 
      82             :   deadline_list_t *        unpinged;
      83             :   deadline_list_t *        pinged;
      84             :   deadline_list_t *        valid;
      85             :   deadline_list_t *        refreshing;
      86             :   deadline_list_t *        invalid;
      87             : 
      88             :   fd_ssping_on_ping_fn_t   on_ping_cb;
      89             :   void *                   cb_arg;
      90             : 
      91             :   ulong                    magic; /* ==FD_SSPING_MAGIC */
      92             : 
      93             :   /* Invariant: The pool elements with an associated file descriptor are
      94             :      exactly those that are PINGED or REFRESHING. */
      95             :   ulong                    used_fd_cnt;
      96             :   struct pollfd            used_fds[ FD_SSPING_FD_CNT ]; /* indexed [0, used_fd_cnt) */
      97             :   int                      idle_fds[ FD_SSPING_FD_CNT ]; /* indexed [0, FD_SSPING_FD_CNT-used_fd_cnt) */
      98             :   /* ping_to_pool[ i ]==x means that used_fds[ i ].fd is in use for
      99             :      pinging the peer in pool[ x ]. */
     100             :   ulong                    ping_to_pool[ FD_SSPING_FD_CNT ]; /* indexed [0, used_fd_cnt) */
     101             : };
     102             : 
     103             : 
     104             : FD_FN_CONST ulong
     105         231 : fd_ssping_align( void ) {
     106         231 :   return fd_ulong_max( alignof(fd_ssping_t),
     107         231 :          fd_ulong_max( peer_pool_align(),
     108         231 :          fd_ulong_max( peer_map_align(),
     109         231 :                        deadline_list_align() ) ) );
     110         231 : }
     111             : 
     112             : FD_FN_CONST ulong
     113          33 : fd_ssping_footprint( ulong max_peers ) {
     114          33 :   ulong l;
     115          33 :   l = FD_LAYOUT_INIT;
     116          33 :   l = FD_LAYOUT_APPEND( l, alignof(fd_ssping_t),  sizeof(fd_ssping_t) );
     117          33 :   l = FD_LAYOUT_APPEND( l, peer_pool_align(),     peer_pool_footprint( max_peers ) );
     118          33 :   l = FD_LAYOUT_APPEND( l, peer_map_align(),      peer_map_footprint( peer_map_chain_cnt_est( max_peers ) ) );
     119          33 :   l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     120          33 :   l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     121          33 :   l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     122          33 :   l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     123          33 :   l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     124          33 :   return FD_LAYOUT_FINI( l, fd_ssping_align() );
     125          33 : }
     126             : 
     127             : void *
     128             : fd_ssping_new( void *                 shmem,
     129             :                ulong                  max_peers,
     130             :                ulong                  seed,
     131             :                fd_ssping_on_ping_fn_t on_ping_cb,
     132           3 :                void *                 cb_arg ) {
     133           3 :   if( FD_UNLIKELY( !shmem ) ) {
     134           0 :     FD_LOG_WARNING(( "NULL shmem" ));
     135           0 :     return NULL;
     136           0 :   }
     137             : 
     138           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ssping_align() ) ) ) {
     139           0 :     FD_LOG_WARNING(( "unaligned shmem" ));
     140           0 :     return NULL;
     141           0 :   }
     142             : 
     143           3 :   if( FD_UNLIKELY( max_peers < 1UL ) ) {
     144           0 :     FD_LOG_WARNING(( "max_peers must be at least 1" ));
     145           0 :     return NULL;
     146           0 :   }
     147             : 
     148           3 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
     149           3 :   fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_ssping_t),  sizeof(fd_ssping_t) );
     150           3 :   void * _pool         = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(),     peer_pool_footprint( max_peers ) );
     151           3 :   void * _map          = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(),      peer_map_footprint( peer_map_chain_cnt_est( max_peers ) ) );
     152           3 :   void * _unpinged     = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     153           3 :   void * _pinged       = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     154           3 :   void * _valid        = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     155           3 :   void * _refreshing   = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     156           3 :   void * _invalid      = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
     157             : 
     158           3 :   ssping->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
     159           3 :   ssping->map  = peer_map_join( peer_map_new( _map, peer_map_chain_cnt_est( max_peers ), seed ) );
     160             : 
     161           3 :   ssping->unpinged   = deadline_list_join( deadline_list_new( _unpinged ) );
     162           3 :   ssping->pinged     = deadline_list_join( deadline_list_new( _pinged ) );
     163           3 :   ssping->valid      = deadline_list_join( deadline_list_new( _valid ) );
     164           3 :   ssping->refreshing = deadline_list_join( deadline_list_new( _refreshing ) );
     165           3 :   ssping->invalid    = deadline_list_join( deadline_list_new( _invalid ) );
     166             : 
     167             :   /* Allocate a contiguous range of file descriptors */
     168           3 :   int next_fd = FD_SSPING_FD_MIN;
     169             : 
     170         750 :   for( ulong i=0UL; i<FD_SSPING_FD_CNT; i++, next_fd++ ) {
     171         747 :     int fd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, IPPROTO_TCP );
     172         747 :     if( FD_UNLIKELY( -1==fd ) ) FD_LOG_ERR(( "socket(SOCK_STREAM,IPPROTO_TCP) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     173             : 
     174         747 :     int actual_fd = fcntl( fd, F_DUPFD_CLOEXEC, next_fd );
     175         747 :     if( FD_UNLIKELY( actual_fd<0 ) ) {
     176           0 :       FD_LOG_ERR(( "fcntl(F_DUPFD_CLOEXEC,%d) failed (%i-%s)", next_fd, errno, fd_io_strerror( errno ) ));
     177           0 :     }
     178         747 :     if( FD_UNLIKELY( actual_fd!=next_fd ) ) {
     179           0 :       FD_LOG_ERR(( "file descriptor collision at %d", next_fd ));
     180           0 :     }
     181         747 :     if( FD_UNLIKELY( 0!=close( fd ) ) ) {
     182           0 :       FD_LOG_ERR(( "close(%d) failed (%i-%s)", fd, errno, fd_io_strerror( errno ) ));
     183           0 :     }
     184             : 
     185         747 :     int tcp_nodelay = 1;
     186         747 :     if( FD_UNLIKELY( setsockopt( next_fd, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
     187           0 :       FD_LOG_ERR(( "setsockopt(SOL_TCP,TCP_NODELAY,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     188           0 :     }
     189         747 :     ssping->idle_fds[ i ] = next_fd;
     190             : 
     191         747 :     ssping->used_fds[ i ].fd      = -1;
     192         747 :     ssping->used_fds[ i ].events  = POLLOUT|POLLRDHUP|POLLPRI;
     193         747 :     ssping->used_fds[ i ].revents = 0;
     194         747 :   }
     195             : 
     196           3 :   ssping->used_fd_cnt = 0UL;
     197             : 
     198           3 :   ssping->on_ping_cb = on_ping_cb;
     199           3 :   ssping->cb_arg     = cb_arg;
     200             : 
     201           3 :   FD_COMPILER_MFENCE();
     202           3 :   FD_VOLATILE( ssping->magic ) = FD_SSPING_MAGIC;
     203           3 :   FD_COMPILER_MFENCE();
     204             : 
     205           3 :   return (void *)ssping;
     206           3 : }
     207             : 
     208             : fd_ssping_t *
     209           3 : fd_ssping_join( void * shping ) {
     210           3 :   if( FD_UNLIKELY( !shping ) ) {
     211           0 :     FD_LOG_WARNING(( "NULL shping" ));
     212           0 :     return NULL;
     213           0 :   }
     214             : 
     215           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shping, fd_ssping_align() ) ) ) {
     216           0 :     FD_LOG_WARNING(( "misaligned shping" ));
     217           0 :     return NULL;
     218           0 :   }
     219             : 
     220           3 :   fd_ssping_t * ssping = (fd_ssping_t *)shping;
     221             : 
     222           3 :   if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
     223           0 :     FD_LOG_WARNING(( "bad magic" ));
     224           0 :     return NULL;
     225           0 :   }
     226             : 
     227           3 :   return ssping;
     228           3 : }
     229             : 
     230             : void *
     231           3 : fd_ssping_leave( fd_ssping_t * ssping ) {
     232           3 :   if( FD_UNLIKELY( !ssping ) ) {
     233           0 :     FD_LOG_WARNING(( "NULL ssping" ));
     234           0 :     return NULL;
     235           0 :   }
     236             : 
     237           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ssping, fd_ssping_align() ) ) ) {
     238           0 :     FD_LOG_WARNING(( "misaligned ssping" ));
     239           0 :     return NULL;
     240           0 :   }
     241             : 
     242           3 :   if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
     243           0 :     FD_LOG_WARNING(( "bad magic" ));
     244           0 :     return NULL;
     245           0 :   }
     246             : 
     247           3 :   ssping->pool       = peer_pool_leave( ssping->pool );
     248           3 :   ssping->map        = peer_map_leave( ssping->map );
     249           3 :   ssping->unpinged   = deadline_list_leave( ssping->unpinged );
     250           3 :   ssping->pinged     = deadline_list_leave( ssping->pinged );
     251           3 :   ssping->valid      = deadline_list_leave( ssping->valid );
     252           3 :   ssping->refreshing = deadline_list_leave( ssping->refreshing );
     253           3 :   ssping->invalid    = deadline_list_leave( ssping->invalid );
     254             : 
     255           3 :   return (void *)ssping;
     256           3 : }
     257             : 
     258             : void *
     259           3 : fd_ssping_delete( void * shping ) {
     260           3 :   if( FD_UNLIKELY( !shping ) ) {
     261           0 :     FD_LOG_WARNING(( "NULL shping" ));
     262           0 :     return NULL;
     263           0 :   }
     264             : 
     265           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shping, fd_ssping_align() ) ) ) {
     266           0 :     FD_LOG_WARNING(( "misaligned shping" ));
     267           0 :     return NULL;
     268           0 :   }
     269             : 
     270           3 :   fd_ssping_t * ssping = (fd_ssping_t *)shping;
     271             : 
     272           3 :   if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
     273           0 :     FD_LOG_WARNING(( "bad magic" ));
     274           0 :     return NULL;
     275           0 :   }
     276             : 
     277             :   /* Close all file descriptors opened by fd_ssping_new. */
     278           3 :   for( ulong i=0UL; i<ssping->used_fd_cnt; i++ ) {
     279           0 :     close( ssping->used_fds[ i ].fd );
     280           0 :   }
     281         750 :   for( ulong i=0UL; i<FD_SSPING_FD_CNT-ssping->used_fd_cnt; i++ ) {
     282         747 :     close( ssping->idle_fds[ i ] );
     283         747 :   }
     284             : 
     285           3 :   FD_COMPILER_MFENCE();
     286           3 :   FD_VOLATILE( ssping->magic ) = 0UL;
     287           3 :   FD_COMPILER_MFENCE();
     288             : 
     289           3 :   return (void *)ssping;
     290           3 : }
     291             : 
     292             : void
     293             : fd_ssping_add( fd_ssping_t * ssping,
     294          24 :                fd_ip4_port_t addr ) {
     295          24 :   fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
     296          24 :   if( FD_LIKELY( !peer ) ) {
     297          24 :     if( FD_UNLIKELY( !peer_pool_free( ssping->pool ) ) ) {
     298           0 :       FD_LOG_WARNING(( "ping peer pool exhausted" ));
     299           0 :       return;
     300           0 :     }
     301          24 :     peer = peer_pool_ele_acquire( ssping->pool );
     302          24 :     memset( peer, 0, sizeof(fd_ssping_peer_t) );
     303          24 :     peer->refcnt        = 0UL;
     304          24 :     peer->state         = PEER_STATE_UNPINGED;
     305          24 :     peer->addr          = addr;
     306          24 :     peer->latency_nanos = ULONG_MAX;
     307          24 :     peer->used_fd_idx   = ULONG_MAX;
     308          24 :     peer_map_ele_insert( ssping->map, peer, ssping->pool );
     309          24 :     deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
     310          24 :   }
     311          24 :   peer->refcnt++;
     312          24 : }
     313             : 
     314             : static void
     315             : remove_fdesc_idx( fd_ssping_t * ssping,
     316           0 :                   ulong         fdesc_idx ) {
     317           0 :   FD_TEST( fdesc_idx<FD_SSPING_FD_CNT );
     318           0 :   FD_TEST( fdesc_idx<ssping->used_fd_cnt );
     319           0 :   ulong pool_idx = ssping->ping_to_pool[ fdesc_idx ];
     320             : 
     321           0 :   int fdesc = ssping->used_fds[ fdesc_idx ].fd;
     322             :   /* Abort the connection attempt or close the connection by connecting
     323             :      to AF_UNSPEC. */
     324           0 :   struct sockaddr_in addr[1] = {{
     325           0 :     .sin_family = AF_UNSPEC,
     326           0 :     .sin_addr   = { .s_addr = 0U },
     327           0 :     .sin_port   = 0
     328           0 :   }};
     329           0 :   if( FD_UNLIKELY( connect( fdesc, fd_type_pun_const( addr ), sizeof(addr) ) ) ) FD_LOG_ERR(( "connect(AF_UNSPEC) failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     330             : 
     331             :   /* Mark that the pool element no longer has an associated index. */
     332           0 :   ssping->pool[ pool_idx ].used_fd_idx = ULONG_MAX;
     333             : 
     334             :   /* Now swap the last used_fd into this position, updating all the
     335             :      relevant bookkeeping info. */
     336           0 :   ulong last = ssping->used_fd_cnt-1UL;
     337           0 :   if( FD_LIKELY( fdesc_idx!=last ) ) {
     338           0 :     ssping->used_fds[ fdesc_idx ] = ssping->used_fds[ last ];
     339           0 :     ulong last_pool_idx = ssping->ping_to_pool[ fdesc_idx ] = ssping->ping_to_pool[ last ];
     340           0 :     ssping->pool[ last_pool_idx ].used_fd_idx = fdesc_idx;
     341           0 :   }
     342             : 
     343           0 :   ssping->idle_fds[ FD_SSPING_FD_CNT - ssping->used_fd_cnt ] = fdesc;
     344           0 :   ssping->used_fd_cnt--;
     345           0 : }
     346             : 
     347             : int
     348             : fd_ssping_remove( fd_ssping_t * ssping,
     349          24 :                   fd_ip4_port_t addr ) {
     350          24 :   fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
     351          24 :   if( FD_UNLIKELY( !peer ) ) return 0;
     352          24 :   if( FD_UNLIKELY( !peer->refcnt ) ) return 0;
     353          24 :   peer->refcnt--;
     354          24 :   if( FD_LIKELY( !peer->refcnt ) ) {
     355          24 :     switch( peer->state ) {
     356           3 :       case PEER_STATE_UNPINGED:
     357           3 :         deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
     358           3 :         break;
     359           0 :       case PEER_STATE_PINGED:
     360           0 :         deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
     361           0 :         remove_fdesc_idx( ssping, peer->used_fd_idx );
     362           0 :         break;
     363           0 :       case PEER_STATE_VALID:
     364           0 :         deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
     365           0 :         break;
     366           0 :       case PEER_STATE_REFRESHING:
     367           0 :         deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
     368           0 :         remove_fdesc_idx( ssping, peer->used_fd_idx );
     369           0 :         break;
     370          21 :       case PEER_STATE_INVALID:
     371          21 :         deadline_list_ele_remove( ssping->invalid, peer, ssping->pool );
     372          21 :         break;
     373          24 :     }
     374          24 :     peer_map_ele_remove_fast( ssping->map, peer, ssping->pool );
     375          24 :     peer_pool_ele_release( ssping->pool, peer );
     376          24 :     return 1;
     377          24 :   }
     378           0 :   return 0;
     379          24 : }
     380             : 
     381             : void
     382             : fd_ssping_invalidate( fd_ssping_t * ssping,
     383             :                       fd_ip4_port_t addr,
     384          24 :                       long          now ) {
     385          24 :   if( FD_UNLIKELY( !ssping ) ) return;
     386          24 :   fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
     387          24 :   if( FD_UNLIKELY( !peer ) ) return;
     388          24 :   switch( peer->state ) {
     389          21 :     case PEER_STATE_UNPINGED:
     390          21 :       deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
     391          21 :       break;
     392           0 :     case PEER_STATE_PINGED:
     393           0 :       deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
     394           0 :       remove_fdesc_idx( ssping, peer->used_fd_idx );
     395           0 :       break;
     396           0 :     case PEER_STATE_VALID:
     397           0 :       deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
     398           0 :       break;
     399           0 :     case PEER_STATE_REFRESHING:
     400           0 :       deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
     401           0 :       remove_fdesc_idx( ssping, peer->used_fd_idx );
     402           0 :       break;
     403           3 :     case PEER_STATE_INVALID:
     404           3 :       return;
     405          24 :   }
     406          21 :   peer->state = PEER_STATE_INVALID;
     407          21 :   peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
     408          21 :   deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
     409          21 : }
     410             : 
     411             : static inline void
     412             : recv_pings( fd_ssping_t * ssping,
     413           0 :             fd_sspeer_selector_t * selector) {
     414           0 :   int pollv = fd_syscall_poll( ssping->used_fds, (uint)ssping->used_fd_cnt, 0 );
     415           0 :   if( FD_UNLIKELY( pollv<0 ) ) {
     416           0 :     FD_LOG_WARNING(( "poll(used_fds,%lu,0) failed (%d-%s)", ssping->used_fd_cnt, errno, fd_io_strerror( errno ) ));
     417           0 :     return;
     418           0 :   }
     419           0 :   long now = fd_log_wallclock();
     420           0 :   ulong processed = 0UL;
     421           0 :   ulong processed_idx[ PING_BURST_MAX ];
     422           0 :   for( ulong i=0UL; i<ssping->used_fd_cnt; i++ ) {
     423           0 :     if( FD_UNLIKELY( processed >= fd_ulong_min( (ulong)pollv, PING_BURST_MAX ) ) ) break;
     424           0 :     if( FD_UNLIKELY( ssping->used_fds[ i ].revents ) ) {
     425           0 :       ulong pool_idx = ssping->ping_to_pool[ i ];
     426           0 :       fd_ssping_peer_t * peer = ssping->pool+pool_idx;
     427             : 
     428           0 :       FD_TEST( peer->state==PEER_STATE_PINGED || peer->state==PEER_STATE_REFRESHING );
     429             : 
     430             : 
     431           0 :       deadline_list_ele_remove( peer->state==PEER_STATE_PINGED ? ssping->pinged : ssping->refreshing, peer, ssping->pool );
     432           0 :       int is_err = ssping->used_fds[ i ].revents & (POLLRDHUP|POLLERR|POLLHUP);
     433           0 :       if( FD_LIKELY( !is_err ) ) {
     434           0 :         peer->latency_nanos  = (ulong)fd_long_max( now - (peer->deadline_nanos - PEER_DEADLINE_NANOS_PING), 1L );
     435           0 :         peer->state          = PEER_STATE_VALID;
     436           0 :         peer->deadline_nanos = now + PEER_DEADLINE_NANOS_VALID;
     437           0 :         deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool );
     438             : 
     439           0 :         FD_LOG_INFO(( "pinged " FD_IP4_ADDR_FMT ":%hu in %lu nanos",
     440           0 :               FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ), peer->latency_nanos ));
     441           0 :         ssping->on_ping_cb( ssping->cb_arg, peer->addr, peer->latency_nanos );
     442           0 :       } else {
     443             :         /* This is pretty unlikely, but the host could respond with an
     444             :            RST packet I suppose. */
     445           0 :         peer->state = PEER_STATE_INVALID;
     446           0 :         peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
     447           0 :         deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
     448           0 :         fd_sspeer_selector_remove_by_addr( selector, peer->addr );
     449           0 :       }
     450           0 :       processed_idx[ processed ] = i;
     451           0 :       processed++;
     452           0 :     }
     453           0 :   }
     454             :   /* Now we need to call remove_fdesc_idx on the processed ones in
     455             :      reverse order (largest to smallest) so that we don't trip on
     456             :      ourself as we shuffle the array. */
     457           0 :   while( processed ) remove_fdesc_idx( ssping, processed_idx[ --processed ] );
     458           0 : }
     459             : 
     460             : static uint
     461             : send_pings( fd_ssping_t *     ssping,
     462             :             deadline_list_t * list,
     463           0 :             long              until ) {
     464           0 :   uint msg_cnt = 0U;
     465           0 :   for( deadline_list_iter_t iter = deadline_list_iter_fwd_init( list, ssping->pool );
     466           0 :        msg_cnt<PING_BURST_MAX && ssping->used_fd_cnt<FD_SSPING_FD_CNT && !deadline_list_iter_done( iter, list, ssping->pool );
     467           0 :        iter = deadline_list_iter_fwd_next( iter, list, ssping->pool ) ) {
     468           0 :     ulong peer_idx = deadline_list_iter_idx( iter, list, ssping->pool );
     469           0 :     fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, peer_idx );
     470           0 :     if( peer->deadline_nanos>until ) break;
     471             : 
     472           0 :     int fdesc =  ssping->idle_fds[ FD_SSPING_FD_CNT-ssping->used_fd_cnt-1UL ];
     473             : 
     474           0 :     struct sockaddr_in addr[1] = {{
     475           0 :       .sin_family = AF_INET,
     476           0 :       .sin_addr   = { .s_addr = peer->addr.addr },
     477           0 :       .sin_port   = peer->addr.port
     478           0 :     }};
     479             : 
     480           0 :     if( FD_UNLIKELY( connect( fdesc, fd_type_pun_const( addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
     481           0 :       FD_LOG_WARNING(( "connect(" FD_IP4_ADDR_FMT ":%hu) failed (%d-%s)", FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ), errno, fd_io_strerror( errno ) ));
     482             :       /* Nothing to do.  It will get "reaped" later. */
     483           0 :     }
     484             : 
     485           0 :     ssping->used_fds    [ ssping->used_fd_cnt ].fd = fdesc;
     486           0 :     ssping->ping_to_pool[ ssping->used_fd_cnt ]    = peer_idx;
     487           0 :     peer->used_fd_idx = ssping->used_fd_cnt;
     488           0 :     ssping->used_fd_cnt++;
     489           0 :     msg_cnt++;
     490           0 :   }
     491             : 
     492           0 :   if( msg_cnt==0U ) return 0U;
     493           0 :   return (uint)msg_cnt;
     494           0 : }
     495             : 
     496             : 
     497             : void
     498             : fd_ssping_advance( fd_ssping_t *          ssping,
     499             :                    long                   now,
     500           0 :                    fd_sspeer_selector_t * selector) {
     501           0 :   uint sent = send_pings( ssping, ssping->unpinged, LONG_MAX );
     502           0 :   for( uint i=0U; i<sent; i++ ) {
     503           0 :     fd_ssping_peer_t * peer = deadline_list_ele_pop_head( ssping->unpinged, ssping->pool );
     504           0 :     FD_TEST( peer );
     505           0 :     peer->state = PEER_STATE_PINGED;
     506           0 :     peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
     507           0 :     deadline_list_ele_push_tail( ssping->pinged, peer, ssping->pool );
     508           0 :   }
     509             : 
     510           0 :   while( !deadline_list_is_empty( ssping->pinged, ssping->pool ) ) {
     511           0 :     fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->pinged, ssping->pool );
     512           0 :     if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
     513             : 
     514           0 :     deadline_list_ele_pop_head( ssping->pinged, ssping->pool );
     515             : 
     516           0 :     remove_fdesc_idx( ssping, peer->used_fd_idx );
     517             : 
     518           0 :     peer->state = PEER_STATE_INVALID;
     519           0 :     peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
     520           0 :     deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
     521           0 :     fd_sspeer_selector_remove_by_addr( selector, peer->addr );
     522           0 :   }
     523             : 
     524           0 :   sent = send_pings( ssping, ssping->valid, now );
     525           0 :   for( uint i=0U; i<sent; i++ ) {
     526           0 :     fd_ssping_peer_t * peer = deadline_list_ele_pop_head( ssping->valid, ssping->pool );
     527           0 :     FD_TEST( peer );
     528           0 :     peer->state = PEER_STATE_REFRESHING;
     529           0 :     peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
     530           0 :     deadline_list_ele_push_tail( ssping->refreshing, peer, ssping->pool );
     531           0 :   }
     532             : 
     533           0 :   while( !deadline_list_is_empty( ssping->refreshing, ssping->pool ) ) {
     534           0 :     fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->refreshing, ssping->pool );
     535           0 :     if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
     536             : 
     537           0 :     deadline_list_ele_pop_head( ssping->refreshing, ssping->pool );
     538             : 
     539           0 :     remove_fdesc_idx( ssping, peer->used_fd_idx );
     540             : 
     541           0 :     peer->state = PEER_STATE_INVALID;
     542           0 :     peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
     543           0 :     deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
     544           0 :     fd_sspeer_selector_remove_by_addr( selector, peer->addr );
     545           0 :   }
     546             : 
     547           0 :   while( !deadline_list_is_empty( ssping->invalid, ssping->pool ) ) {
     548           0 :     fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->invalid, ssping->pool );
     549           0 :     if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
     550             : 
     551           0 :     deadline_list_ele_pop_head( ssping->invalid, ssping->pool );
     552             : 
     553           0 :     peer->state = PEER_STATE_UNPINGED;
     554           0 :     peer->deadline_nanos = 0L;
     555           0 :     deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
     556           0 :   }
     557             : 
     558           0 :   recv_pings( ssping, selector );
     559           0 : }
     560             : 
     561             : int
     562             : fd_ssping_is_invalidated( fd_ssping_t * ssping,
     563           9 :                           fd_ip4_port_t addr ) {
     564           9 :   if( FD_UNLIKELY( !ssping ) ) return 0;
     565           9 :   fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
     566           9 :   if( FD_UNLIKELY( !peer ) ) return 0;
     567           9 :   return peer->state==PEER_STATE_INVALID;
     568           9 : }

Generated by: LCOV version 1.14