Line data Source code
1 : #include "fd_ipecho_client.h"
2 : #include "fd_ipecho_client_private.h"
3 :
4 : #include "../../util/fd_util.h"
5 :
6 : #include <errno.h>
7 : #include <netinet/in.h>
8 : #include <string.h>
9 : #include <sys/socket.h>
10 : #include <unistd.h>
11 :
12 : FD_FN_CONST ulong
13 0 : fd_ipecho_client_align( void ) {
14 0 : return alignof(fd_ipecho_client_t);
15 0 : }
16 :
17 : FD_FN_CONST ulong
18 0 : fd_ipecho_client_footprint( void ) {
19 0 : return sizeof(fd_ipecho_client_t);
20 0 : }
21 :
22 : void *
23 0 : fd_ipecho_client_new( void * shmem ) {
24 0 : fd_ipecho_client_t * ipe = (fd_ipecho_client_t *)shmem;
25 :
26 0 : if( FD_UNLIKELY( !shmem ) ) {
27 0 : FD_LOG_WARNING(( "NULL shmem" ));
28 0 : return NULL;
29 0 : }
30 :
31 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ipecho_client_align() ) ) ) {
32 0 : FD_LOG_WARNING(( "misaligned shmem" ));
33 0 : return NULL;
34 0 : }
35 :
36 0 : FD_COMPILER_MFENCE();
37 0 : FD_VOLATILE( ipe->magic ) = FD_IPECHO_CLIENT_MAGIC;
38 0 : FD_COMPILER_MFENCE();
39 :
40 0 : return (void *)ipe;
41 0 : }
42 :
43 : fd_ipecho_client_t *
44 0 : fd_ipecho_client_join( void * shipe ) {
45 0 : if( FD_UNLIKELY( !shipe ) ) {
46 0 : FD_LOG_WARNING(( "NULL shipe" ));
47 0 : return NULL;
48 0 : }
49 :
50 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shipe, fd_ipecho_client_align() ) ) ) {
51 0 : FD_LOG_WARNING(( "misaligned shipe" ));
52 0 : return NULL;
53 0 : }
54 :
55 0 : fd_ipecho_client_t * ipe = (fd_ipecho_client_t *)shipe;
56 :
57 0 : if( FD_UNLIKELY( ipe->magic!=FD_IPECHO_CLIENT_MAGIC ) ) {
58 0 : FD_LOG_WARNING(( "bad magic" ));
59 0 : return NULL;
60 0 : }
61 :
62 0 : return ipe;
63 0 : }
64 :
65 : void
66 : fd_ipecho_client_init( fd_ipecho_client_t * client,
67 : fd_ip4_port_t const * servers,
68 0 : ulong servers_len ) {
69 0 : ulong peer_cnt = 0UL;
70 :
71 0 : for( ulong i=0UL; i<servers_len; i++ ) {
72 0 : fd_ip4_port_t const * server = &servers[ i ];
73 :
74 0 : int sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
75 0 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
76 :
77 0 : struct sockaddr_in addr = {
78 0 : .sin_family = AF_INET,
79 0 : .sin_port = server->port,
80 0 : .sin_addr = { .s_addr = server->addr }
81 0 : };
82 :
83 0 : if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
84 0 : if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
85 0 : continue;
86 0 : }
87 :
88 0 : client->pollfds[ peer_cnt ] = (struct pollfd){
89 0 : .fd = sockfd,
90 0 : .events = POLLIN | POLLOUT,
91 0 : .revents = 0
92 0 : };
93 0 : client->peers[ peer_cnt ].writing = 1;
94 0 : client->peers[ peer_cnt ].request_bytes_sent = 0UL;
95 0 : client->peers[ peer_cnt ].response_bytes_read = 0UL;
96 0 : peer_cnt++;
97 0 : }
98 :
99 0 : for( ulong i=peer_cnt; i<16UL; i++ ) client->pollfds[ i ].fd = -1;
100 :
101 0 : client->peer_cnt = peer_cnt;
102 0 : client->start_time_nanos = fd_log_wallclock();
103 0 : }
104 :
105 : static void
106 : close_one( fd_ipecho_client_t * client,
107 0 : ulong idx ) {
108 0 : if( FD_UNLIKELY( -1==close( client->pollfds[ idx ].fd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
109 0 : client->pollfds[ idx ].fd = -1;
110 0 : client->peer_cnt--;
111 0 : }
112 :
113 : static void
114 0 : close_all( fd_ipecho_client_t * client ) {
115 0 : for( ulong i=0UL; i<client->peer_cnt; i++ ) {
116 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
117 0 : close_one( client, i );
118 0 : }
119 0 : }
120 :
121 : int
122 : fd_ipecho_client_parse_response( uchar const * response,
123 : ulong response_len,
124 0 : ushort * _shred_version ) {
125 0 : if( FD_UNLIKELY( response_len<8UL ) ) return FD_IPECHO_PARSE_ERR;
126 0 : if( FD_UNLIKELY( memcmp( response, "\0\0\0\0", 4UL ) ) ) return FD_IPECHO_PARSE_ERR;
127 :
128 0 : uint ip_variant = fd_uint_load_4_fast( response+4UL );
129 0 : if( FD_UNLIKELY( ip_variant>1U ) ) return FD_IPECHO_PARSE_ERR;
130 :
131 0 : ulong offset = 8UL;
132 0 : if( FD_LIKELY( !ip_variant ) ) offset+=4UL;
133 0 : else offset+=16UL;
134 :
135 0 : if( FD_UNLIKELY( response_len<offset+3UL ) ) return FD_IPECHO_PARSE_ERR;
136 :
137 0 : if( FD_UNLIKELY( response[ offset ]!=1 ) ) return FD_IPECHO_PARSE_ERR;
138 :
139 0 : ushort shred_version = fd_ushort_load_2_fast( response+offset+1UL );
140 0 : if( FD_UNLIKELY( !shred_version ) ) return FD_IPECHO_PARSE_ERR;
141 :
142 0 : *_shred_version = shred_version;
143 0 : return FD_IPECHO_PARSE_OK;
144 0 : }
145 :
146 : static void
147 : write_conn( fd_ipecho_client_t * client,
148 0 : ulong conn_idx ) {
149 0 : fd_ipecho_client_peer_t * peer = &client->peers[ conn_idx ];
150 :
151 0 : if( FD_LIKELY( !peer->writing ) ) return;
152 :
153 0 : uchar request[ 21UL ] = {
154 0 : 0, 0, 0, 0, /* Magic */
155 0 : 0, 0, 0, 0, 0, 0, 0, 0, /* TCP ports */
156 0 : 0, 0, 0, 0, 0, 0, 0, 0, /* UDP ports */
157 0 : '\n', /* End of request */
158 0 : };
159 :
160 0 : long written = send( client->pollfds[ conn_idx ].fd,
161 0 : request+peer->request_bytes_sent,
162 0 : sizeof(request)-peer->request_bytes_sent,
163 0 : MSG_NOSIGNAL );
164 0 : if( FD_UNLIKELY( -1==written && errno==EAGAIN ) ) return; /* No data was written, continue. */
165 0 : else if( FD_UNLIKELY( -1==written ) ) {
166 0 : close_one( client, conn_idx );
167 0 : return;
168 0 : }
169 :
170 0 : peer->request_bytes_sent += (ulong)written;
171 0 : if( FD_UNLIKELY( peer->request_bytes_sent==sizeof(request) ) ) {
172 0 : peer->writing = 0;
173 0 : peer->response_bytes_read = 0UL;
174 0 : }
175 0 : }
176 :
177 : static int
178 : read_conn( fd_ipecho_client_t * client,
179 : ulong conn_idx,
180 0 : ushort * shred_version ) {
181 0 : fd_ipecho_client_peer_t * peer = &client->peers[ conn_idx ];
182 :
183 0 : if( FD_UNLIKELY( peer->writing ) ) return 1;
184 :
185 0 : long read = recv( client->pollfds[ conn_idx ].fd,
186 0 : peer->response+peer->response_bytes_read,
187 0 : sizeof(peer->response)-peer->response_bytes_read,
188 0 : 0 );
189 :
190 0 : if( FD_UNLIKELY( -1==read && (errno==EAGAIN || errno==EINTR) ) ) return 1;
191 0 : else if( FD_UNLIKELY( -1==read ) ) {
192 0 : close_one( client, conn_idx );
193 0 : return 1;
194 0 : }
195 :
196 0 : peer->response_bytes_read += (ulong)read;
197 0 : if( FD_UNLIKELY( read ) ) return 1;
198 :
199 0 : int response = fd_ipecho_client_parse_response( peer->response,
200 0 : peer->response_bytes_read,
201 0 : shred_version );
202 0 : if( FD_LIKELY( response==FD_IPECHO_PARSE_OK ) ) return 0;
203 0 : else {
204 0 : close_one( client, conn_idx );
205 0 : return 1;
206 0 : }
207 0 : }
208 :
209 : int
210 : fd_ipecho_client_poll( fd_ipecho_client_t * client,
211 : ushort * shred_version,
212 0 : int * charge_busy ) {
213 0 : if( FD_UNLIKELY( !client->peer_cnt ) ) return -1;
214 0 : if( FD_UNLIKELY( fd_log_wallclock()-client->start_time_nanos>2L*1000L*1000*1000L ) ) {
215 0 : close_all( client );
216 0 : return -1;
217 0 : }
218 :
219 0 : int nfds = fd_syscall_poll( client->pollfds, 16U, 0 );
220 0 : if( FD_UNLIKELY( 0==nfds ) ) return 1;
221 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 1;
222 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll() failed (%i-%s)", errno, strerror( errno ) ));
223 :
224 0 : *charge_busy = 1;
225 :
226 0 : for( ulong i=0UL; i<16UL; i++ ) {
227 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
228 :
229 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLOUT ) ) write_conn( client, i );
230 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
231 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLIN ) ) {
232 0 : if( FD_LIKELY( !read_conn( client, i, shred_version ) ) ) {
233 0 : close_all( client );
234 0 : return 0;
235 0 : }
236 0 : }
237 0 : }
238 :
239 0 : return 1;
240 0 : }
|