Line data Source code
1 : #include "fd_genesis_client_private.h"
2 :
3 : #include "../../waltz/http/picohttpparser.h"
4 : #include "../../util/fd_util.h"
5 : #include "../../ballet/sha256/fd_sha256.h"
6 :
7 : #include <errno.h>
8 : #include <netinet/in.h>
9 : #include <string.h>
10 : #include <strings.h>
11 : #include <sys/socket.h>
12 : #include <unistd.h>
13 : #include <stdlib.h>
14 :
15 : FD_FN_CONST ulong
16 0 : fd_genesis_client_align( void ) {
17 0 : return alignof(fd_genesis_client_t);
18 0 : }
19 :
20 : FD_FN_CONST ulong
21 0 : fd_genesis_client_footprint( void ) {
22 0 : return sizeof(fd_genesis_client_t);
23 0 : }
24 :
25 : void *
26 0 : fd_genesis_client_new( void * shmem ) {
27 0 : fd_genesis_client_t * gen = (fd_genesis_client_t *)shmem;
28 :
29 0 : if( FD_UNLIKELY( !shmem ) ) {
30 0 : FD_LOG_WARNING(( "NULL shmem" ));
31 0 : return NULL;
32 0 : }
33 :
34 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_genesis_client_align() ) ) ) {
35 0 : FD_LOG_WARNING(( "misaligned shmem" ));
36 0 : return NULL;
37 0 : }
38 :
39 0 : FD_COMPILER_MFENCE();
40 0 : FD_VOLATILE( gen->magic ) = FD_GENESIS_CLIENT_MAGIC;
41 0 : FD_COMPILER_MFENCE();
42 :
43 0 : return (void *)gen;
44 0 : }
45 :
46 : fd_genesis_client_t *
47 0 : fd_genesis_client_join( void * shgen ) {
48 0 : if( FD_UNLIKELY( !shgen ) ) {
49 0 : FD_LOG_WARNING(( "NULL shgen" ));
50 0 : return NULL;
51 0 : }
52 :
53 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgen, fd_genesis_client_align() ) ) ) {
54 0 : FD_LOG_WARNING(( "misaligned shgen" ));
55 0 : return NULL;
56 0 : }
57 :
58 0 : fd_genesis_client_t * gen = (fd_genesis_client_t *)shgen;
59 :
60 0 : if( FD_UNLIKELY( gen->magic!=FD_GENESIS_CLIENT_MAGIC ) ) {
61 0 : FD_LOG_WARNING(( "bad magic" ));
62 0 : return NULL;
63 0 : }
64 :
65 0 : return gen;
66 0 : }
67 :
68 : void
69 : fd_genesis_client_init( fd_genesis_client_t * client,
70 : fd_ip4_port_t const * servers,
71 0 : ulong servers_len ) {
72 0 : FD_TEST( servers_len<=FD_TOPO_GOSSIP_ENTRYPOINTS_MAX );
73 0 : ulong peer_cnt = 0UL;
74 :
75 0 : for( ulong i=0UL; i<servers_len; i++ ) {
76 0 : fd_ip4_port_t server = servers[ i ];
77 0 : server.port = 8899; // TODO: SPECIFY IN CONFIG
78 :
79 0 : int sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
80 0 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
81 :
82 0 : struct sockaddr_in addr = {
83 0 : .sin_family = AF_INET,
84 0 : .sin_port = fd_ushort_bswap( server.port ),
85 0 : .sin_addr = { .s_addr = server.addr }
86 0 : };
87 :
88 0 : if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
89 0 : if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
90 0 : continue;
91 0 : }
92 :
93 0 : client->pollfds[ peer_cnt ] = (struct pollfd){
94 0 : .fd = sockfd,
95 0 : .events = POLLIN | POLLOUT,
96 0 : .revents = 0
97 0 : };
98 0 : client->peers[ peer_cnt ].addr = server;
99 0 : client->peers[ peer_cnt ].writing = 1;
100 0 : client->peers[ peer_cnt ].request_bytes_sent = 0UL;
101 0 : client->peers[ peer_cnt ].response_bytes_read = 0UL;
102 0 : peer_cnt++;
103 0 : }
104 :
105 0 : for( ulong i=peer_cnt; i<FD_TOPO_GOSSIP_ENTRYPOINTS_MAX; i++ ) client->pollfds[ i ].fd = -1;
106 :
107 0 : client->peer_cnt = peer_cnt;
108 0 : client->remaining_peer_cnt = peer_cnt;
109 0 : client->start_time_nanos = fd_log_wallclock();
110 0 : }
111 :
112 : static void
113 : close_one( fd_genesis_client_t * client,
114 0 : ulong idx ) {
115 0 : if( FD_UNLIKELY( -1==close( client->pollfds[ idx ].fd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
116 0 : client->pollfds[ idx ].fd = -1;
117 0 : client->remaining_peer_cnt--;
118 0 : }
119 :
120 : static void
121 0 : close_all( fd_genesis_client_t * client ) {
122 0 : for( ulong i=0UL; i<client->peer_cnt; i++ ) {
123 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
124 0 : close_one( client, i );
125 0 : }
126 0 : }
127 :
128 : static void
129 : write_conn( fd_genesis_client_t * client,
130 0 : ulong conn_idx ) {
131 0 : fd_genesis_client_peer_t * peer = &client->peers[ conn_idx ];
132 :
133 0 : if( FD_LIKELY( !peer->writing ) ) return;
134 :
135 0 : char request[ 1024UL ];
136 0 : FD_TEST( fd_cstr_printf_check( request, sizeof(request), NULL, "GET /genesis.tar.bz2 HTTP/1.1\r\n"
137 0 : "Cache-Control: no-cache\r\n"
138 0 : "Connection: keep-alive\r\n"
139 0 : "Pragma: no-cache\r\n"
140 0 : "User-Agent: Firedancer\r\n"
141 0 : "Host: " FD_IP4_ADDR_FMT ":%hu\r\n\r\n",
142 0 : FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ) ) );
143 :
144 0 : long written = sendto( client->pollfds[ conn_idx ].fd,
145 0 : request+peer->request_bytes_sent,
146 0 : sizeof(request)-peer->request_bytes_sent,
147 0 : MSG_NOSIGNAL,
148 0 : NULL,
149 0 : 0 );
150 0 : if( FD_UNLIKELY( -1==written && errno==EAGAIN ) ) return; /* No data was written, continue. */
151 0 : else if( FD_UNLIKELY( -1==written ) ) {
152 0 : close_one( client, conn_idx );
153 0 : return;
154 0 : }
155 :
156 0 : peer->request_bytes_sent += (ulong)written;
157 0 : if( FD_UNLIKELY( peer->request_bytes_sent==sizeof(request) ) ) {
158 0 : peer->writing = 0;
159 0 : peer->response_bytes_read = 0UL;
160 0 : }
161 0 : }
162 :
163 : static ulong
164 : rpc_phr_content_length( struct phr_header * headers,
165 0 : ulong num_headers ) {
166 0 : for( ulong i=0UL; i<num_headers; i++ ) {
167 0 : if( FD_LIKELY( headers[i].name_len!=14UL ) ) continue;
168 0 : if( FD_LIKELY( strncasecmp( headers[i].name, "Content-Length", 14UL ) ) ) continue;
169 0 : char * end;
170 0 : ulong content_length = strtoul( headers[i].value, &end, 10 );
171 0 : if( FD_UNLIKELY( content_length>UINT_MAX ) ) return ULONG_MAX; /* prevent overflow */
172 0 : if( FD_UNLIKELY( end==headers[i].value ) ) return ULONG_MAX;
173 0 : return content_length;
174 0 : }
175 0 : return ULONG_MAX;
176 0 : }
177 :
178 : static int
179 : read_conn( fd_genesis_client_t * client,
180 : ulong conn_idx,
181 : uchar ** buffer,
182 0 : ulong * buffer_sz ) {
183 0 : fd_genesis_client_peer_t * peer = &client->peers[ conn_idx ];
184 :
185 0 : if( FD_UNLIKELY( peer->writing ) ) return 1;
186 0 : long read = recvfrom( client->pollfds[ conn_idx ].fd,
187 0 : peer->response+peer->response_bytes_read,
188 0 : sizeof(peer->response)-peer->response_bytes_read,
189 0 : 0,
190 0 : NULL,
191 0 : NULL );
192 0 : if( FD_UNLIKELY( -1==read && (errno==EAGAIN || errno==EINTR) ) ) return 1;
193 0 : else if( FD_UNLIKELY( -1==read ) ) {
194 0 : close_one( client, conn_idx );
195 0 : return 1;
196 0 : }
197 :
198 0 : peer->response_bytes_read += (ulong)read;
199 :
200 0 : int minor_version;
201 0 : int status;
202 0 : const char * message;
203 0 : ulong message_len;
204 0 : struct phr_header headers[ 32 ];
205 0 : ulong num_headers = 32UL;
206 0 : int len = phr_parse_response( (char*)peer->response, peer->response_bytes_read,
207 0 : &minor_version, &status, &message, &message_len,
208 0 : headers, &num_headers, 0L );
209 0 : if( FD_UNLIKELY( -1==len ) ) {
210 0 : close_one( client, conn_idx );
211 0 : return 1;
212 0 : } else if( FD_UNLIKELY( -2==len ) ) {
213 0 : return 1;
214 0 : }
215 :
216 0 : if( FD_UNLIKELY( status!=200 ) ) {
217 0 : close_one( client, conn_idx );
218 0 : return 1;
219 0 : }
220 :
221 0 : ulong content_length = rpc_phr_content_length( headers, num_headers );
222 0 : if( FD_UNLIKELY( content_length==ULONG_MAX ) ) {
223 0 : close_one( client, conn_idx );
224 0 : return 1;
225 0 : }
226 0 : if( FD_UNLIKELY( content_length+(ulong)len>sizeof(peer->response) ) ) {
227 0 : close_one( client, conn_idx );
228 0 : return 1;
229 0 : }
230 0 : if( FD_LIKELY( content_length+(ulong)len>peer->response_bytes_read ) ) {
231 0 : return 1;
232 0 : }
233 :
234 0 : *buffer_sz = content_length;
235 0 : *buffer = peer->response + (ulong)len;
236 :
237 0 : uchar hash[ 32UL ] = {0};
238 0 : fd_sha256_hash( *buffer, *buffer_sz, hash );
239 :
240 0 : return 0;
241 0 : }
242 :
243 : int
244 : fd_genesis_client_poll( fd_genesis_client_t * client,
245 : fd_ip4_port_t * peer,
246 : uchar ** buffer,
247 : ulong * buffer_sz,
248 0 : int * charge_busy ) {
249 0 : if( FD_UNLIKELY( !client->remaining_peer_cnt ) ) return -1;
250 0 : if( FD_UNLIKELY( fd_log_wallclock()-client->start_time_nanos>20L*1000L*1000*1000L ) ) {
251 0 : close_all( client );
252 0 : return -1;
253 0 : }
254 :
255 0 : int nfds = fd_syscall_poll( client->pollfds, (uint)client->peer_cnt, 0 );
256 0 : if( FD_UNLIKELY( 0==nfds ) ) return 1;
257 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 1;
258 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll() failed (%i-%s)", errno, strerror( errno ) ));
259 :
260 0 : *charge_busy = 1;
261 :
262 0 : for( ulong i=0UL; i<FD_TOPO_GOSSIP_ENTRYPOINTS_MAX; i++ ) {
263 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
264 :
265 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLOUT ) ) write_conn( client, i );
266 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
267 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLIN ) ) {
268 0 : if( FD_LIKELY( !read_conn( client, i, buffer, buffer_sz ) ) ) {
269 0 : close_all( client );
270 0 : *peer = client->peers[ i ].addr;
271 0 : return 0;
272 0 : }
273 0 : }
274 0 : }
275 :
276 0 : return 1;
277 0 : }
278 :
279 : struct pollfd const *
280 0 : fd_genesis_client_get_pollfds( fd_genesis_client_t * client ) {
281 0 : return client->pollfds;
282 0 : }
|