Line data Source code
1 : #include "fd_ipecho_client.h"
2 : #include "fd_ipecho_client_private.h"
3 :
4 : #include "../../util/fd_util.h"
5 :
6 : #include <errno.h>
7 : #include <netinet/in.h>
8 : #include <string.h>
9 : #include <sys/socket.h>
10 : #include <unistd.h>
11 :
12 : FD_FN_CONST ulong
13 0 : fd_ipecho_client_align( void ) {
14 0 : return alignof(fd_ipecho_client_t);
15 0 : }
16 :
17 : FD_FN_CONST ulong
18 0 : fd_ipecho_client_footprint( void ) {
19 0 : return sizeof(fd_ipecho_client_t);
20 0 : }
21 :
22 : void *
23 0 : fd_ipecho_client_new( void * shmem ) {
24 0 : fd_ipecho_client_t * ipe = (fd_ipecho_client_t *)shmem;
25 :
26 0 : if( FD_UNLIKELY( !shmem ) ) {
27 0 : FD_LOG_WARNING(( "NULL shmem" ));
28 0 : return NULL;
29 0 : }
30 :
31 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ipecho_client_align() ) ) ) {
32 0 : FD_LOG_WARNING(( "misaligned shmem" ));
33 0 : return NULL;
34 0 : }
35 :
36 0 : FD_COMPILER_MFENCE();
37 0 : FD_VOLATILE( ipe->magic ) = FD_IPECHO_CLIENT_MAGIC;
38 0 : FD_COMPILER_MFENCE();
39 :
40 0 : return (void *)ipe;
41 0 : }
42 :
43 : fd_ipecho_client_t *
44 0 : fd_ipecho_client_join( void * shipe ) {
45 0 : if( FD_UNLIKELY( !shipe ) ) {
46 0 : FD_LOG_WARNING(( "NULL shipe" ));
47 0 : return NULL;
48 0 : }
49 :
50 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shipe, fd_ipecho_client_align() ) ) ) {
51 0 : FD_LOG_WARNING(( "misaligned shipe" ));
52 0 : return NULL;
53 0 : }
54 :
55 0 : fd_ipecho_client_t * ipe = (fd_ipecho_client_t *)shipe;
56 :
57 0 : if( FD_UNLIKELY( ipe->magic!=FD_IPECHO_CLIENT_MAGIC ) ) {
58 0 : FD_LOG_WARNING(( "bad magic" ));
59 0 : return NULL;
60 0 : }
61 :
62 0 : return ipe;
63 0 : }
64 :
65 : void
66 : fd_ipecho_client_init( fd_ipecho_client_t * client,
67 : fd_ip4_port_t const * servers,
68 0 : ulong servers_len ) {
69 0 : ulong peer_cnt = 0UL;
70 :
71 0 : for( ulong i=0UL; i<servers_len; i++ ) {
72 0 : fd_ip4_port_t const * server = &servers[ i ];
73 :
74 0 : int sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
75 0 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
76 :
77 0 : struct sockaddr_in addr = {
78 0 : .sin_family = AF_INET,
79 0 : .sin_port = server->port,
80 0 : .sin_addr = { .s_addr = server->addr }
81 0 : };
82 :
83 0 : if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
84 0 : if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
85 0 : continue;
86 0 : }
87 :
88 0 : client->pollfds[ peer_cnt ] = (struct pollfd){
89 0 : .fd = sockfd,
90 0 : .events = POLLIN | POLLOUT,
91 0 : .revents = 0
92 0 : };
93 0 : client->peers[ peer_cnt ].writing = 1;
94 0 : client->peers[ peer_cnt ].request_bytes_sent = 0UL;
95 0 : client->peers[ peer_cnt ].response_bytes_read = 0UL;
96 0 : peer_cnt++;
97 0 : }
98 :
99 0 : for( ulong i=peer_cnt; i<16UL; i++ ) client->pollfds[ i ].fd = -1;
100 :
101 0 : client->peer_cnt = peer_cnt;
102 0 : client->remaining_peer_cnt = peer_cnt;
103 0 : client->start_time_nanos = fd_log_wallclock();
104 0 : }
105 :
106 : static void
107 : close_one( fd_ipecho_client_t * client,
108 0 : ulong idx ) {
109 0 : if( FD_UNLIKELY( -1==close( client->pollfds[ idx ].fd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
110 0 : client->pollfds[ idx ].fd = -1;
111 0 : client->remaining_peer_cnt--;
112 0 : }
113 :
114 : static void
115 0 : close_all( fd_ipecho_client_t * client ) {
116 0 : for( ulong i=0UL; i<client->peer_cnt; i++ ) {
117 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
118 0 : close_one( client, i );
119 0 : }
120 0 : }
121 :
122 : int
123 : fd_ipecho_client_parse_response( uchar const * response,
124 : ulong response_len,
125 0 : ushort * _shred_version ) {
126 0 : if( FD_UNLIKELY( response_len<8UL ) ) return FD_IPECHO_PARSE_ERR;
127 0 : if( FD_UNLIKELY( memcmp( response, "\0\0\0\0", 4UL ) ) ) return FD_IPECHO_PARSE_ERR;
128 :
129 0 : uint ip_variant = fd_uint_load_4_fast( response+4UL );
130 0 : if( FD_UNLIKELY( ip_variant>1U ) ) return FD_IPECHO_PARSE_ERR;
131 :
132 0 : ulong offset = 8UL;
133 0 : if( FD_LIKELY( !ip_variant ) ) offset+=4UL;
134 0 : else offset+=16UL;
135 :
136 0 : if( FD_UNLIKELY( response_len<offset+3UL ) ) return FD_IPECHO_PARSE_ERR;
137 :
138 0 : if( FD_UNLIKELY( response[ offset ]!=1 ) ) return FD_IPECHO_PARSE_ERR;
139 :
140 0 : ushort shred_version = fd_ushort_load_2_fast( response+offset+1UL );
141 0 : if( FD_UNLIKELY( !shred_version ) ) return FD_IPECHO_PARSE_ERR;
142 :
143 0 : *_shred_version = shred_version;
144 0 : return FD_IPECHO_PARSE_OK;
145 0 : }
146 :
147 : static void
148 : write_conn( fd_ipecho_client_t * client,
149 0 : ulong conn_idx ) {
150 0 : fd_ipecho_client_peer_t * peer = &client->peers[ conn_idx ];
151 :
152 0 : if( FD_LIKELY( !peer->writing ) ) return;
153 :
154 0 : uchar request[ 21UL ] = {
155 0 : 0, 0, 0, 0, /* Magic */
156 0 : 0, 0, 0, 0, 0, 0, 0, 0, /* TCP ports */
157 0 : 0, 0, 0, 0, 0, 0, 0, 0, /* UDP ports */
158 0 : '\n', /* End of request */
159 0 : };
160 :
161 0 : long written = sendto( client->pollfds[ conn_idx ].fd,
162 0 : request+peer->request_bytes_sent,
163 0 : sizeof(request)-peer->request_bytes_sent,
164 0 : MSG_NOSIGNAL,
165 0 : NULL,
166 0 : 0 );
167 0 : if( FD_UNLIKELY( -1==written && errno==EAGAIN ) ) return; /* No data was written, continue. */
168 0 : else if( FD_UNLIKELY( -1==written ) ) {
169 0 : close_one( client, conn_idx );
170 0 : return;
171 0 : }
172 :
173 0 : peer->request_bytes_sent += (ulong)written;
174 0 : if( FD_UNLIKELY( peer->request_bytes_sent==sizeof(request) ) ) {
175 0 : peer->writing = 0;
176 0 : peer->response_bytes_read = 0UL;
177 0 : }
178 0 : }
179 :
180 : static int
181 : read_conn( fd_ipecho_client_t * client,
182 : ulong conn_idx,
183 0 : ushort * shred_version ) {
184 0 : fd_ipecho_client_peer_t * peer = &client->peers[ conn_idx ];
185 :
186 0 : if( FD_UNLIKELY( peer->writing ) ) return 1;
187 0 : long read = recvfrom( client->pollfds[ conn_idx ].fd,
188 0 : peer->response+peer->response_bytes_read,
189 0 : sizeof(peer->response)-peer->response_bytes_read,
190 0 : 0,
191 0 : NULL,
192 0 : NULL );
193 0 : if( FD_UNLIKELY( -1==read && (errno==EAGAIN || errno==EINTR) ) ) return 1;
194 0 : else if( FD_UNLIKELY( -1==read ) ) {
195 0 : close_one( client, conn_idx );
196 0 : return 1;
197 0 : }
198 :
199 0 : peer->response_bytes_read += (ulong)read;
200 0 : if( FD_UNLIKELY( read ) ) return 1;
201 :
202 0 : int response = fd_ipecho_client_parse_response( peer->response,
203 0 : peer->response_bytes_read,
204 0 : shred_version );
205 0 : if( FD_LIKELY( response==FD_IPECHO_PARSE_OK ) ) return 0;
206 0 : else {
207 0 : close_one( client, conn_idx );
208 0 : return 1;
209 0 : }
210 0 : }
211 :
212 : int
213 : fd_ipecho_client_poll( fd_ipecho_client_t * client,
214 : ushort * shred_version,
215 0 : int * charge_busy ) {
216 0 : if( FD_UNLIKELY( !client->remaining_peer_cnt ) ) return -1;
217 0 : if( FD_UNLIKELY( fd_log_wallclock()-client->start_time_nanos>2L*1000L*1000*1000L ) ) {
218 0 : close_all( client );
219 0 : return -1;
220 0 : }
221 :
222 0 : int nfds = fd_syscall_poll( client->pollfds, (uint)client->peer_cnt, 0 );
223 0 : if( FD_UNLIKELY( 0==nfds ) ) return 1;
224 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 1;
225 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll() failed (%i-%s)", errno, strerror( errno ) ));
226 :
227 0 : *charge_busy = 1;
228 :
229 0 : for( ulong i=0UL; i<16UL; i++ ) {
230 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
231 :
232 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLOUT ) ) write_conn( client, i );
233 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
234 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLIN ) ) {
235 0 : if( FD_LIKELY( !read_conn( client, i, shred_version ) ) ) {
236 0 : close_all( client );
237 0 : return 0;
238 0 : }
239 0 : }
240 0 : }
241 :
242 0 : return 1;
243 0 : }
244 :
245 : struct pollfd const *
246 0 : fd_ipecho_client_get_pollfds( fd_ipecho_client_t * client ) {
247 0 : return client->pollfds;
248 0 : }
|