Line data Source code
1 : #define _GNU_SOURCE /* sendmmsg */
2 : #include "fd_ssping.h"
3 : #include "fd_sspeer_selector.h"
4 :
5 : #include "../../../util/fd_util.h"
6 : #include "../../../util/bits/fd_bits.h"
7 : #include "../../../util/log/fd_log.h"
8 :
9 : #include <errno.h>
10 : #include <unistd.h>
11 : #include <sys/socket.h>
12 : #include <netinet/in.h>
13 : #include <netinet/ip_icmp.h>
14 :
/* Peer lifecycle states.  A tracked peer is in exactly one state at a
   time and is linked into the deadline list of fd_ssping_t that
   matches it.  fd_ssping_advance and recv_pings drive the transitions:

     UNPINGED   -> PINGED      first echo request accepted by the kernel
     PINGED     -> VALID       echo reply received
     PINGED     -> INVALID     no reply within PEER_DEADLINE_NANOS_PING
     VALID      -> REFRESHING  PEER_DEADLINE_NANOS_VALID elapsed, refresh request sent
     REFRESHING -> VALID       echo reply received
     REFRESHING -> INVALID     no reply within PEER_DEADLINE_NANOS_PING
     INVALID    -> UNPINGED    PEER_DEADLINE_NANOS_INVALID elapsed

   fd_ssping_invalidate can additionally force any state to INVALID. */
#define PEER_STATE_UNPINGED 0
#define PEER_STATE_PINGED 1
#define PEER_STATE_VALID 2
#define PEER_STATE_REFRESHING 3
#define PEER_STATE_INVALID 4

/* Time allowed for an echo reply before the peer is invalidated. */
#define PEER_DEADLINE_NANOS_PING (1L*1000L*1000L*1000L) /* 1 second */
/* Time after a reply before the peer is pinged again to refresh its latency. */
#define PEER_DEADLINE_NANOS_VALID (2L*60L*1000L*1000L*1000L) /* 2 minutes */
/* Time an invalid peer waits before it is queued to be pinged again. */
#define PEER_DEADLINE_NANOS_INVALID (5L*60L*1000L*1000L*1000L) /* 5 minutes */

/* Bounds both the echo requests sent by one send_pings call and the
   datagrams read by one recv_pings call. */
#define PING_BURST_MAX (16UL) /* Limit how many pings we can burst at once. */
26 :
27 : /* FIXME: This code uses fd_ip4_port_t as the key for peers, but it
28 : should really just use uint (IPv4 address) as port has no meaning
29 : for ICMP pings. Making this change however requires some significant
30 : changes in snapct as we are also effectively storing peer invalidation
31 : state in this data structure. The number of distinct peers with
32 : the same IP address but different ports will be low, so this is fine
33 : for now. */
34 :
35 : /* FIXME: Properly set and track sequence numbers for repeated pings. */
36 :
/* fd_ssping_peer_t is one tracked ping target.  Every in-use element is
   simultaneously linked into the peer map (keyed by addr) and into the
   one deadline list selected by state; free elements sit on the pool's
   free list. */
struct fd_ssping_peer {
  ulong refcnt;        /* number of fd_ssping_add calls not yet matched by
                          fd_ssping_remove; the peer is released at 0 */
  fd_ip4_port_t addr;  /* map key: IPv4 address plus port */

  struct {
    ulong next;        /* pool free-list link */
  } pool;

  struct {
    ulong next;        /* map chain links */
    ulong prev;
  } map;

  struct {
    ulong next;        /* links inside the deadline list for state */
    ulong prev;
  } deadline;

  int state;           /* one of PEER_STATE_* */
  ulong latency_nanos; /* last measured round-trip time; ULONG_MAX until
                          the first reply */
  long deadline_nanos; /* PINGED/REFRESHING: request send time plus
                          PEER_DEADLINE_NANOS_PING (recv_pings derives the
                          send time from this).  VALID: when to refresh.
                          INVALID: when to retry.  UNPINGED: unused (0). */
};

typedef struct fd_ssping_peer fd_ssping_peer_t;
61 :
62 : #define POOL_NAME peer_pool
63 0 : #define POOL_T fd_ssping_peer_t
64 : #define POOL_IDX_T ulong
65 0 : #define POOL_NEXT pool.next
66 : #include "../../../util/tmpl/fd_pool.c"
67 :
68 : #define MAP_NAME peer_map
69 0 : #define MAP_KEY addr
70 0 : #define MAP_ELE_T fd_ssping_peer_t
71 : #define MAP_KEY_T fd_ip4_port_t
72 0 : #define MAP_PREV map.prev
73 0 : #define MAP_NEXT map.next
74 0 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
75 0 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
76 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
77 : #include "../../../util/tmpl/fd_map_chain.c"
78 :
79 : #define DLIST_NAME deadline_list
80 : #define DLIST_ELE_T fd_ssping_peer_t
81 0 : #define DLIST_PREV deadline.prev
82 0 : #define DLIST_NEXT deadline.next
83 : #include "../../../util/tmpl/fd_dlist.c"
84 :
/* fd_ssping_private is the object behind fd_ssping_t.  fd_ssping_new
   places it at the start of the region sized by fd_ssping_footprint.
   The pool, map and list pointers below refer to sub-regions carved
   out of that same region right after it. */
struct fd_ssping_private {
  fd_ssping_peer_t * pool;   /* peer storage, max_peers elements */
  peer_map_t * map;          /* addr -> peer lookup */

  /* One deadline list per PEER_STATE_*.  Peers are only ever appended
     with a deadline of now plus a constant, so each list is ordered by
     deadline_nanos (assuming now never decreases between calls). */
  deadline_list_t * unpinged;
  deadline_list_t * pinged;
  deadline_list_t * valid;
  deadline_list_t * refreshing;
  deadline_list_t * invalid;

  int sockfd;                /* non-blocking SOCK_DGRAM/IPPROTO_ICMP socket */

  fd_ssping_on_ping_fn_t on_ping_cb; /* called by recv_pings with
                                        (cb_arg, addr, latency_nanos)
                                        for every accepted reply */
  void * cb_arg;

  ulong magic; /* ==FD_SSPING_MAGIC */
};
102 :
/* ssping_pkt is the datagram sent (as an ICMP echo request) and
   expected back (as an echo reply) on the ICMP socket.

   Peers are keyed on IP address AND port, but ICMP echo has no notion
   of a port.  We therefore append the peer's port to the echo payload.
   Echo replies reflect the request payload, so recv_pings can read the
   port back and find the right peer.

   NOTE(review): icmphdr is 4-byte aligned, so this struct likely has
   trailing padding after port.  Senders should zero the whole struct
   before filling it in. */

struct ssping_pkt {
  struct icmphdr icmp;
  ushort port;   /* peer port, as stored in fd_ip4_port_t */
};
113 :
114 : FD_FN_CONST ulong
115 0 : fd_ssping_align( void ) {
116 0 : return fd_ulong_max( alignof(fd_ssping_t),
117 0 : fd_ulong_max( peer_pool_align(),
118 0 : fd_ulong_max( peer_map_align(),
119 0 : deadline_list_align() ) ) );
120 0 : }
121 :
122 : FD_FN_CONST ulong
123 0 : fd_ssping_footprint( ulong max_peers ) {
124 0 : ulong l;
125 0 : l = FD_LAYOUT_INIT;
126 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_ssping_t), sizeof(fd_ssping_t) );
127 0 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
128 0 : l = FD_LAYOUT_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( max_peers ) ) );
129 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
130 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
131 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
132 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
133 0 : l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
134 0 : return FD_LAYOUT_FINI( l, fd_ssping_align() );
135 0 : }
136 :
137 : void *
138 : fd_ssping_new( void * shmem,
139 : ulong max_peers,
140 : ulong seed,
141 : fd_ssping_on_ping_fn_t on_ping_cb,
142 0 : void * cb_arg ) {
143 0 : if( FD_UNLIKELY( !shmem ) ) {
144 0 : FD_LOG_WARNING(( "NULL shmem" ));
145 0 : return NULL;
146 0 : }
147 :
148 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ssping_align() ) ) ) {
149 0 : FD_LOG_WARNING(( "unaligned shmem" ));
150 0 : return NULL;
151 0 : }
152 :
153 0 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
154 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
155 0 : return NULL;
156 0 : }
157 :
158 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
159 0 : fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_ssping_t), sizeof(fd_ssping_t) );
160 0 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
161 0 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( max_peers ) ) );
162 0 : void * _unpinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
163 0 : void * _pinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
164 0 : void * _valid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
165 0 : void * _refreshing = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
166 0 : void * _invalid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
167 :
168 0 : ssping->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
169 0 : ssping->map = peer_map_join( peer_map_new( _map, peer_map_chain_cnt_est( max_peers ), seed ) );
170 :
171 0 : ssping->unpinged = deadline_list_join( deadline_list_new( _unpinged ) );
172 0 : ssping->pinged = deadline_list_join( deadline_list_new( _pinged ) );
173 0 : ssping->valid = deadline_list_join( deadline_list_new( _valid ) );
174 0 : ssping->refreshing = deadline_list_join( deadline_list_new( _refreshing ) );
175 0 : ssping->invalid = deadline_list_join( deadline_list_new( _invalid ) );
176 :
177 : /* Note: This uses an obscure feature of Linux called ICMP datagram
178 : sockets or unprivileged ping sockets. Normally one would have to
179 : use SOCK_RAW sockets, but with this special feature any user can
180 : send & receive ICMP echo packets. */
181 0 : ssping->sockfd = socket( AF_INET, SOCK_DGRAM|SOCK_NONBLOCK, IPPROTO_ICMP );
182 0 : if( FD_UNLIKELY( -1==ssping->sockfd ) ) FD_LOG_ERR(( "socket(SOCK_DGRAM,IPPROTO_ICMP) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
183 :
184 0 : ssping->on_ping_cb = on_ping_cb;
185 0 : ssping->cb_arg = cb_arg;
186 :
187 0 : FD_COMPILER_MFENCE();
188 0 : FD_VOLATILE( ssping->magic ) = FD_SSPING_MAGIC;
189 0 : FD_COMPILER_MFENCE();
190 :
191 0 : return (void *)ssping;
192 0 : }
193 :
194 : fd_ssping_t *
195 0 : fd_ssping_join( void * shping ) {
196 0 : if( FD_UNLIKELY( !shping ) ) {
197 0 : FD_LOG_WARNING(( "NULL shping" ));
198 0 : return NULL;
199 0 : }
200 :
201 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shping, fd_ssping_align() ) ) ) {
202 0 : FD_LOG_WARNING(( "misaligned shping" ));
203 0 : return NULL;
204 0 : }
205 :
206 0 : fd_ssping_t * ssping = (fd_ssping_t *)shping;
207 :
208 0 : if( FD_UNLIKELY( ssping->magic!=FD_SSPING_MAGIC ) ) {
209 0 : FD_LOG_WARNING(( "bad magic" ));
210 0 : return NULL;
211 0 : }
212 :
213 0 : return ssping;
214 0 : }
215 :
216 : int
217 0 : fd_ssping_get_sockfd( fd_ssping_t const * ssping ) {
218 0 : return ssping->sockfd;
219 0 : }
220 :
221 : void
222 : fd_ssping_add( fd_ssping_t * ssping,
223 0 : fd_ip4_port_t addr ) {
224 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
225 0 : if( FD_LIKELY( !peer ) ) {
226 0 : if( FD_UNLIKELY( !peer_pool_free( ssping->pool ) ) ) return;
227 0 : peer = peer_pool_ele_acquire( ssping->pool );
228 0 : FD_TEST( peer );
229 0 : memset( peer, 0, sizeof(fd_ssping_peer_t) );
230 0 : peer->refcnt = 0UL;
231 0 : peer->state = PEER_STATE_UNPINGED;
232 0 : peer->addr = addr;
233 0 : peer->latency_nanos = ULONG_MAX;
234 0 : peer_map_ele_insert( ssping->map, peer, ssping->pool );
235 0 : deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
236 0 : }
237 0 : peer->refcnt++;
238 0 : }
239 :
240 : int
241 : fd_ssping_remove( fd_ssping_t * ssping,
242 0 : fd_ip4_port_t addr ) {
243 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
244 0 : FD_TEST( peer );
245 0 : FD_TEST( peer->refcnt );
246 0 : peer->refcnt--;
247 0 : if( FD_LIKELY( !peer->refcnt ) ) {
248 0 : switch( peer->state ) {
249 0 : case PEER_STATE_UNPINGED:
250 0 : deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
251 0 : break;
252 0 : case PEER_STATE_PINGED:
253 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
254 0 : break;
255 0 : case PEER_STATE_VALID:
256 0 : deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
257 0 : break;
258 0 : case PEER_STATE_REFRESHING:
259 0 : deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
260 0 : break;
261 0 : case PEER_STATE_INVALID:
262 0 : deadline_list_ele_remove( ssping->invalid, peer, ssping->pool );
263 0 : break;
264 0 : }
265 0 : peer_map_ele_remove_fast( ssping->map, peer, ssping->pool );
266 0 : peer_pool_ele_release( ssping->pool, peer );
267 0 : return 1;
268 0 : }
269 0 : return 0;
270 0 : }
271 :
272 : void
273 : fd_ssping_invalidate( fd_ssping_t * ssping,
274 : fd_ip4_port_t addr,
275 0 : long now ) {
276 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &addr, NULL, ssping->pool );
277 0 : if( FD_UNLIKELY( !peer ) ) return;
278 0 : switch( peer->state ) {
279 0 : case PEER_STATE_UNPINGED:
280 0 : deadline_list_ele_remove( ssping->unpinged, peer, ssping->pool );
281 0 : break;
282 0 : case PEER_STATE_PINGED:
283 0 : deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
284 0 : break;
285 0 : case PEER_STATE_VALID:
286 0 : deadline_list_ele_remove( ssping->valid, peer, ssping->pool );
287 0 : break;
288 0 : case PEER_STATE_REFRESHING:
289 0 : deadline_list_ele_remove( ssping->refreshing, peer, ssping->pool );
290 0 : break;
291 0 : case PEER_STATE_INVALID:
292 0 : return;
293 0 : }
294 0 : peer->state = PEER_STATE_INVALID;
295 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
296 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
297 0 : }
298 :
299 : static inline void
300 : recv_pings( fd_ssping_t * ssping,
301 0 : long now ) {
302 0 : for( ulong i=0UL; i<PING_BURST_MAX; i++ ) {
303 0 : struct ssping_pkt pkt;
304 0 : struct sockaddr_in addr;
305 0 : socklen_t alen = sizeof(addr);
306 0 : long result = recvfrom( ssping->sockfd, &pkt, sizeof(pkt), 0, fd_type_pun( &addr ), &alen );
307 0 : if( FD_UNLIKELY( result!=sizeof(pkt) || alen!=sizeof(addr) || pkt.icmp.type!=ICMP_ECHOREPLY ) ) break;
308 :
309 0 : fd_ip4_port_t key = {
310 0 : .addr = addr.sin_addr.s_addr,
311 0 : .port = pkt.port
312 0 : };
313 0 : fd_ssping_peer_t * peer = peer_map_ele_query( ssping->map, &key, NULL, ssping->pool );
314 0 : if( FD_UNLIKELY( peer==NULL || ( peer->state!=PEER_STATE_PINGED && peer->state!=PEER_STATE_REFRESHING ) ) ) continue;
315 :
316 0 : deadline_list_ele_remove( peer->state==PEER_STATE_PINGED ? ssping->pinged : ssping->refreshing, peer, ssping->pool );
317 0 : FD_TEST( peer->deadline_nanos-PEER_DEADLINE_NANOS_PING<now );
318 0 : peer->latency_nanos = (ulong)(now - (peer->deadline_nanos - PEER_DEADLINE_NANOS_PING));
319 0 : peer->state = PEER_STATE_VALID;
320 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_VALID;
321 0 : deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool );
322 :
323 0 : FD_LOG_INFO(( "pinged " FD_IP4_ADDR_FMT ":%hu in %lu nanos",
324 0 : FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), peer->addr.port, peer->latency_nanos ));
325 0 : ssping->on_ping_cb( ssping->cb_arg, peer->addr, peer->latency_nanos );
326 0 : }
327 0 : }
328 :
329 : static uint
330 : send_pings( fd_ssping_t * ssping,
331 : deadline_list_t * list,
332 0 : long until ) {
333 0 : uint msg_cnt = 0U;
334 0 : struct ssping_pkt pkts [ PING_BURST_MAX ];
335 0 : struct iovec iovs [ PING_BURST_MAX ];
336 0 : struct sockaddr_in addrs [ PING_BURST_MAX ];
337 0 : struct mmsghdr msgs [ PING_BURST_MAX ];
338 0 : for( deadline_list_iter_t iter = deadline_list_iter_fwd_init( list, ssping->pool );
339 0 : msg_cnt<PING_BURST_MAX && !deadline_list_iter_done( iter, list, ssping->pool );
340 0 : iter = deadline_list_iter_fwd_next( iter, list, ssping->pool ) ) {
341 0 : fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, deadline_list_iter_idx( iter, list, ssping->pool ) );
342 0 : if( peer->deadline_nanos>until ) break;
343 :
344 0 : pkts[ msg_cnt ] = (struct ssping_pkt){
345 0 : .icmp = { .type = ICMP_ECHO },
346 0 : .port = peer->addr.port
347 0 : };
348 0 : iovs[ msg_cnt ] = (struct iovec){
349 0 : .iov_base = pkts + msg_cnt,
350 0 : .iov_len = sizeof(struct ssping_pkt)
351 0 : };
352 0 : addrs[ msg_cnt ] = (struct sockaddr_in){
353 0 : .sin_family = AF_INET,
354 0 : .sin_addr = { .s_addr = peer->addr.addr }
355 0 : };
356 0 : msgs[ msg_cnt ].msg_hdr = (struct msghdr){
357 0 : .msg_name = addrs + msg_cnt,
358 0 : .msg_namelen = sizeof(struct sockaddr_in),
359 0 : .msg_iov = iovs + msg_cnt,
360 0 : .msg_iovlen = 1,
361 0 : };
362 0 : msgs[ msg_cnt ].msg_len = 0;
363 0 : msg_cnt++;
364 0 : }
365 :
366 0 : if( msg_cnt==0U ) return 0U;
367 0 : int result = sendmmsg( ssping->sockfd, msgs, msg_cnt, 0 );
368 0 : if( FD_UNLIKELY( -1==result ) ) {
369 0 : if( errno!=EAGAIN && errno!=EINTR ) FD_LOG_WARNING(( "sendmmsg(%u) failed (%i-%s)", msg_cnt, errno, fd_io_strerror( errno ) ));
370 0 : return 0U;
371 0 : }
372 0 : FD_TEST( result>=0 && result<=(int)PING_BURST_MAX );
373 0 : return (uint)result;
374 0 : }
375 :
376 : void
377 : fd_ssping_advance( fd_ssping_t * ssping,
378 : long now,
379 0 : fd_sspeer_selector_t * selector) {
380 0 : uint sent = send_pings( ssping, ssping->unpinged, LONG_MAX );
381 0 : for( uint i=0U; i<sent; i++ ) {
382 0 : fd_ssping_peer_t * peer = deadline_list_ele_pop_head( ssping->unpinged, ssping->pool );
383 0 : FD_TEST( peer );
384 0 : peer->state = PEER_STATE_PINGED;
385 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
386 0 : deadline_list_ele_push_tail( ssping->pinged, peer, ssping->pool );
387 0 : }
388 :
389 0 : while( !deadline_list_is_empty( ssping->pinged, ssping->pool ) ) {
390 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->pinged, ssping->pool );
391 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
392 :
393 0 : deadline_list_ele_pop_head( ssping->pinged, ssping->pool );
394 :
395 0 : peer->state = PEER_STATE_INVALID;
396 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
397 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
398 0 : fd_sspeer_selector_remove( selector, peer->addr );
399 0 : }
400 :
401 0 : sent = send_pings( ssping, ssping->valid, now );
402 0 : for( uint i=0U; i<sent; i++ ) {
403 0 : fd_ssping_peer_t * peer = deadline_list_ele_pop_head( ssping->valid, ssping->pool );
404 0 : FD_TEST( peer );
405 0 : peer->state = PEER_STATE_REFRESHING;
406 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_PING;
407 0 : deadline_list_ele_push_tail( ssping->refreshing, peer, ssping->pool );
408 0 : }
409 :
410 0 : while( !deadline_list_is_empty( ssping->refreshing, ssping->pool ) ) {
411 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->refreshing, ssping->pool );
412 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
413 :
414 0 : deadline_list_ele_pop_head( ssping->refreshing, ssping->pool );
415 :
416 0 : peer->state = PEER_STATE_INVALID;
417 0 : peer->deadline_nanos = now + PEER_DEADLINE_NANOS_INVALID;
418 0 : deadline_list_ele_push_tail( ssping->invalid, peer, ssping->pool );
419 0 : fd_sspeer_selector_remove( selector, peer->addr );
420 0 : }
421 :
422 0 : while( !deadline_list_is_empty( ssping->invalid, ssping->pool ) ) {
423 0 : fd_ssping_peer_t * peer = deadline_list_ele_peek_head( ssping->invalid, ssping->pool );
424 0 : if( FD_LIKELY( peer->deadline_nanos>now ) ) break;
425 :
426 0 : deadline_list_ele_pop_head( ssping->invalid, ssping->pool );
427 :
428 0 : peer->state = PEER_STATE_UNPINGED;
429 0 : peer->deadline_nanos = 0L;
430 0 : deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
431 0 : }
432 :
433 0 : recv_pings( ssping, now );
434 0 : }
|