Line data Source code
1 : #include "fd_genesis_client_private.h"
2 :
3 : #include "../../waltz/http/picohttpparser.h"
4 : #include "../../util/fd_util.h"
5 : #include "../../waltz/http/fd_http.h"
6 : #include "../../ballet/sha256/fd_sha256.h"
7 :
8 : #include <errno.h>
9 : #include <netinet/in.h>
10 : #include <string.h>
11 : #include <strings.h>
12 : #include <sys/socket.h>
13 : #include <unistd.h>
14 : #include <stdlib.h>
15 :
16 : FD_FN_CONST ulong
17 0 : fd_genesis_client_align( void ) {
18 0 : return alignof(fd_genesis_client_t);
19 0 : }
20 :
21 : FD_FN_CONST ulong
22 0 : fd_genesis_client_footprint( void ) {
23 0 : return sizeof(fd_genesis_client_t);
24 0 : }
25 :
26 : void *
27 0 : fd_genesis_client_new( void * shmem ) {
28 0 : fd_genesis_client_t * gen = (fd_genesis_client_t *)shmem;
29 :
30 0 : if( FD_UNLIKELY( !shmem ) ) {
31 0 : FD_LOG_WARNING(( "NULL shmem" ));
32 0 : return NULL;
33 0 : }
34 :
35 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_genesis_client_align() ) ) ) {
36 0 : FD_LOG_WARNING(( "misaligned shmem" ));
37 0 : return NULL;
38 0 : }
39 :
40 0 : FD_COMPILER_MFENCE();
41 0 : FD_VOLATILE( gen->magic ) = FD_GENESIS_CLIENT_MAGIC;
42 0 : FD_COMPILER_MFENCE();
43 :
44 0 : return (void *)gen;
45 0 : }
46 :
47 : fd_genesis_client_t *
48 0 : fd_genesis_client_join( void * shgen ) {
49 0 : if( FD_UNLIKELY( !shgen ) ) {
50 0 : FD_LOG_WARNING(( "NULL shgen" ));
51 0 : return NULL;
52 0 : }
53 :
54 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgen, fd_genesis_client_align() ) ) ) {
55 0 : FD_LOG_WARNING(( "misaligned shgen" ));
56 0 : return NULL;
57 0 : }
58 :
59 0 : fd_genesis_client_t * gen = (fd_genesis_client_t *)shgen;
60 :
61 0 : if( FD_UNLIKELY( gen->magic!=FD_GENESIS_CLIENT_MAGIC ) ) {
62 0 : FD_LOG_WARNING(( "bad magic" ));
63 0 : return NULL;
64 0 : }
65 :
66 0 : return gen;
67 0 : }
68 :
69 : void
70 : fd_genesis_client_init( fd_genesis_client_t * client,
71 : fd_ip4_port_t const * servers,
72 0 : ulong servers_len ) {
73 0 : FD_TEST( servers_len<=FD_TOPO_GOSSIP_ENTRYPOINTS_MAX );
74 0 : ulong peer_cnt = 0UL;
75 :
76 0 : for( ulong i=0UL; i<servers_len; i++ ) {
77 0 : fd_ip4_port_t server = servers[ i ];
78 0 : server.port = 8899; // TODO: SPECIFY IN CONFIG
79 :
80 0 : int sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
81 0 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
82 :
83 0 : struct sockaddr_in addr = {
84 0 : .sin_family = AF_INET,
85 0 : .sin_port = fd_ushort_bswap( server.port ),
86 0 : .sin_addr = { .s_addr = server.addr }
87 0 : };
88 :
89 0 : if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
90 0 : if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
91 0 : continue;
92 0 : }
93 :
94 0 : client->pollfds[ peer_cnt ] = (struct pollfd){
95 0 : .fd = sockfd,
96 0 : .events = POLLIN | POLLOUT,
97 0 : .revents = 0
98 0 : };
99 0 : client->peers[ peer_cnt ].addr = server;
100 0 : client->peers[ peer_cnt ].writing = 1;
101 0 : client->peers[ peer_cnt ].request_bytes_sent = 0UL;
102 0 : client->peers[ peer_cnt ].response_bytes_read = 0UL;
103 0 : peer_cnt++;
104 0 : }
105 :
106 0 : for( ulong i=peer_cnt; i<FD_TOPO_GOSSIP_ENTRYPOINTS_MAX; i++ ) client->pollfds[ i ].fd = -1;
107 :
108 0 : client->peer_cnt = peer_cnt;
109 0 : client->remaining_peer_cnt = peer_cnt;
110 0 : client->start_time_nanos = LONG_MAX;
111 0 : }
112 :
113 : static void
114 : close_one( fd_genesis_client_t * client,
115 0 : ulong idx ) {
116 0 : if( FD_UNLIKELY( -1==close( client->pollfds[ idx ].fd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
117 0 : client->pollfds[ idx ].fd = -1;
118 0 : client->remaining_peer_cnt--;
119 0 : }
120 :
121 : static void
122 0 : close_all( fd_genesis_client_t * client ) {
123 0 : for( ulong i=0UL; i<client->peer_cnt; i++ ) {
124 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
125 0 : close_one( client, i );
126 0 : }
127 0 : }
128 :
129 : static void
130 : write_conn( fd_genesis_client_t * client,
131 0 : ulong conn_idx ) {
132 0 : fd_genesis_client_peer_t * peer = &client->peers[ conn_idx ];
133 :
134 0 : if( FD_LIKELY( !peer->writing ) ) return;
135 :
136 0 : char request[ 1024UL ];
137 0 : ulong request_sz = 0UL;
138 0 : FD_TEST( fd_cstr_printf_check( request, sizeof(request), &request_sz, "GET /genesis.tar.bz2 HTTP/1.1\r\n"
139 0 : "Cache-Control: no-cache\r\n"
140 0 : "Connection: keep-alive\r\n"
141 0 : "Pragma: no-cache\r\n"
142 0 : "User-Agent: Firedancer\r\n"
143 0 : "Host: " FD_IP4_ADDR_FMT ":%hu\r\n\r\n",
144 0 : FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ) ) );
145 0 : FD_TEST( request_sz > peer->request_bytes_sent );
146 :
147 0 : long written = sendto( client->pollfds[ conn_idx ].fd,
148 0 : request+peer->request_bytes_sent,
149 0 : request_sz-peer->request_bytes_sent,
150 0 : MSG_NOSIGNAL,
151 0 : NULL,
152 0 : 0 );
153 0 : if( FD_UNLIKELY( -1==written && errno==EAGAIN ) ) return; /* No data was written, continue. */
154 0 : else if( FD_UNLIKELY( -1==written ) ) {
155 0 : close_one( client, conn_idx );
156 0 : return;
157 0 : }
158 :
159 0 : peer->request_bytes_sent += (ulong)written;
160 0 : if( FD_UNLIKELY( peer->request_bytes_sent==request_sz ) ) {
161 0 : peer->writing = 0;
162 0 : peer->response_bytes_read = 0UL;
163 0 : }
164 0 : }
165 :
166 : static ulong
167 : rpc_phr_content_length( struct phr_header * headers,
168 0 : ulong num_headers ) {
169 0 : for( ulong i=0UL; i<num_headers; i++ ) {
170 0 : if( FD_LIKELY( headers[i].name_len!=14UL ) ) continue;
171 0 : if( FD_LIKELY( strncasecmp( headers[i].name, "Content-Length", 14UL ) ) ) continue;
172 0 : ulong content_length = 0UL;
173 0 : if( FD_UNLIKELY( fd_http_parse_content_len( headers[i].value, (ulong)headers[i].value_len, &content_length ) ) ) return ULONG_MAX;
174 0 : if( FD_UNLIKELY( content_length>UINT_MAX ) ) return ULONG_MAX; /* prevent overflow */
175 0 : return content_length;
176 0 : }
177 0 : return ULONG_MAX;
178 0 : }
179 :
180 : static int
181 : read_conn( fd_genesis_client_t * client,
182 : ulong conn_idx,
183 : uchar ** buffer,
184 0 : ulong * buffer_sz ) {
185 0 : fd_genesis_client_peer_t * peer = &client->peers[ conn_idx ];
186 :
187 0 : if( FD_UNLIKELY( peer->writing ) ) return 1;
188 0 : long read = recvfrom( client->pollfds[ conn_idx ].fd,
189 0 : peer->response+peer->response_bytes_read,
190 0 : sizeof(peer->response)-peer->response_bytes_read,
191 0 : 0,
192 0 : NULL,
193 0 : NULL );
194 0 : if( FD_UNLIKELY( -1==read && (errno==EAGAIN || errno==EINTR) ) ) return 1;
195 0 : else if( FD_UNLIKELY( -1==read ) ) {
196 0 : close_one( client, conn_idx );
197 0 : return 1;
198 0 : }
199 :
200 0 : peer->response_bytes_read += (ulong)read;
201 :
202 0 : int minor_version;
203 0 : int status;
204 0 : const char * message;
205 0 : ulong message_len;
206 0 : struct phr_header headers[ 32 ];
207 0 : ulong num_headers = 32UL;
208 0 : int len = phr_parse_response( (char*)peer->response, peer->response_bytes_read,
209 0 : &minor_version, &status, &message, &message_len,
210 0 : headers, &num_headers, 0L );
211 0 : if( FD_UNLIKELY( -1==len ) ) {
212 0 : close_one( client, conn_idx );
213 0 : return 1;
214 0 : } else if( FD_UNLIKELY( -2==len ) ) {
215 0 : return 1;
216 0 : }
217 :
218 0 : if( FD_UNLIKELY( status!=200 ) ) {
219 0 : close_one( client, conn_idx );
220 0 : return 1;
221 0 : }
222 :
223 0 : ulong content_length = rpc_phr_content_length( headers, num_headers );
224 0 : if( FD_UNLIKELY( content_length==ULONG_MAX ) ) {
225 0 : close_one( client, conn_idx );
226 0 : return 1;
227 0 : }
228 0 : if( FD_UNLIKELY( content_length+(ulong)len>sizeof(peer->response) ) ) {
229 0 : close_one( client, conn_idx );
230 0 : return 1;
231 0 : }
232 0 : if( FD_LIKELY( content_length+(ulong)len>peer->response_bytes_read ) ) {
233 0 : return 1;
234 0 : }
235 :
236 0 : *buffer_sz = content_length;
237 0 : *buffer = peer->response + (ulong)len;
238 :
239 0 : uchar hash[ 32UL ] = {0};
240 0 : fd_sha256_hash( *buffer, *buffer_sz, hash );
241 :
242 0 : return 0;
243 0 : }
244 :
245 : int
246 : fd_genesis_client_poll( fd_genesis_client_t * client,
247 : fd_ip4_port_t * peer,
248 : uchar ** buffer,
249 : ulong * buffer_sz,
250 0 : int * charge_busy ) {
251 0 : if( FD_UNLIKELY( !client->remaining_peer_cnt ) ) return -1;
252 0 : long now = fd_log_wallclock();
253 0 : if( FD_UNLIKELY( LONG_MAX==client->start_time_nanos ) ) client->start_time_nanos = now;
254 0 : if( FD_UNLIKELY( now-client->start_time_nanos>20L*1000L*1000*1000L ) ) {
255 0 : close_all( client );
256 0 : return -1;
257 0 : }
258 :
259 0 : int nfds = fd_syscall_poll( client->pollfds, (uint)client->peer_cnt, 0 );
260 0 : if( FD_UNLIKELY( 0==nfds ) ) return 1;
261 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 1;
262 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll() failed (%i-%s)", errno, strerror( errno ) ));
263 :
264 0 : for( ulong i=0UL; i<FD_TOPO_GOSSIP_ENTRYPOINTS_MAX; i++ ) {
265 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
266 :
267 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLOUT ) ) {
268 0 : if( FD_UNLIKELY( client->peers[ i ].writing ) ) *charge_busy = 1;
269 0 : write_conn( client, i );
270 0 : }
271 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
272 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLIN ) ) {
273 0 : if( FD_UNLIKELY( !client->peers[ i ].writing ) ) *charge_busy = 1;
274 0 : if( FD_LIKELY( !read_conn( client, i, buffer, buffer_sz ) ) ) {
275 0 : close_all( client );
276 0 : *peer = client->peers[ i ].addr;
277 0 : return 0;
278 0 : }
279 0 : }
280 0 : }
281 :
282 0 : return 1;
283 0 : }
284 :
285 : struct pollfd const *
286 0 : fd_genesis_client_get_pollfds( fd_genesis_client_t * client ) {
287 0 : return client->pollfds;
288 0 : }
|