Line data Source code
1 : #include "fd_snapshot_http.h"
2 : #include "../../waltz/http/picohttpparser.h"
3 : #include "fd_snapshot.h"
4 :
5 : #include <assert.h>
6 : #include <errno.h>
7 : #include <stdlib.h>
8 : #include <strings.h>
9 : #include <unistd.h>
10 : #include <netinet/in.h>
11 : #include <netinet/ip.h>
12 : #include <sys/socket.h>
13 : #include <sys/types.h>
14 : #include <sys/stat.h>
15 : #include <fcntl.h>
16 :
17 : /* fd_snapshot_http_set_path renders the 'GET /path' chunk of the HTTP
18 : request. The chunk is right aligned and is followed immediately by
19 : 'HTTP/1.1\r\n...' to form a contiguous message. */
20 :
21 : void
22 : fd_snapshot_http_set_path( fd_snapshot_http_t * this,
23 : char const * path,
24 : ulong path_len,
25 6 : ulong base_slot ) {
26 :
27 6 : if( FD_UNLIKELY( !path_len ) ) {
28 0 : path = "/";
29 0 : path_len = 1UL;
30 0 : }
31 :
32 6 : if( FD_UNLIKELY( path_len > FD_SNAPSHOT_HTTP_REQ_PATH_MAX ) ) {
33 0 : FD_LOG_CRIT(( "http: path too long (%lu chars)", path_len ));
34 0 : }
35 :
36 6 : if( FD_UNLIKELY( this->save_snapshot && this->snapshot_filename_max<path_len ) ) {
37 0 : FD_LOG_CRIT(( "http: path too long (%lu chars)", path_len ));
38 0 : }
39 :
40 6 : ulong off = sizeof(this->path) - path_len - 4;
41 6 : char * p = this->path + off;
42 :
43 6 : fd_memcpy( p, "GET ", 4UL );
44 6 : fd_memcpy( p+4, path, path_len );
45 :
46 6 : this->req_tail = (ushort)off;
47 6 : this->path_off = (ushort)off;
48 :
49 6 : this->base_slot = base_slot;
50 :
51 6 : if( FD_LIKELY( this->save_snapshot ) ) {
52 0 : fd_memcpy( this->snapshot_path + this->snapshot_filename_off, path, path_len );
53 0 : this->snapshot_path[ this->snapshot_filename_off + path_len ] = '\0';
54 0 : }
55 6 : }
56 :
57 : fd_snapshot_http_t *
58 : fd_snapshot_http_new( void * mem,
59 : const char * dst_str,
60 : uint dst_ipv4,
61 : ushort dst_port,
62 : const char * snapshot_dir,
63 6 : fd_snapshot_name_t * name_out ) {
64 :
65 6 : fd_snapshot_http_t * this = (fd_snapshot_http_t *)mem;
66 6 : if( FD_UNLIKELY( !this ) ) {
67 0 : FD_LOG_WARNING(( "NULL mem" ));
68 0 : return NULL;
69 0 : }
70 :
71 6 : fd_memset( this, 0, sizeof(fd_snapshot_http_t) );
72 6 : this->next_ipv4 = dst_ipv4;
73 6 : this->next_port = dst_port;
74 6 : this->socket_fd = -1;
75 6 : this->state = FD_SNAPSHOT_HTTP_STATE_INIT;
76 6 : this->req_timeout = 10e9; /* 10s */
77 6 : this->hops = FD_SNAPSHOT_HTTP_DEFAULT_HOPS;
78 6 : this->name_out = name_out;
79 6 : if( !this->name_out ) this->name_out = this->name_dummy;
80 6 : fd_memset( this->name_out, 0, sizeof(fd_snapshot_name_t) );
81 :
82 6 : ulong snapshot_dir_len = snapshot_dir!=NULL ? strlen( snapshot_dir ) : 0UL;
83 6 : if( FD_LIKELY( snapshot_dir_len && snapshot_dir_len<sizeof(this->snapshot_path) ) ) {
84 0 : strcpy( this->snapshot_path, snapshot_dir );
85 0 : this->snapshot_path[ snapshot_dir_len ] = '/';
86 0 : this->snapshot_path[ snapshot_dir_len + 1UL ] = '\0';
87 :
88 0 : this->save_snapshot = 1UL;
89 0 : this->snapshot_filename_max = sizeof(this->snapshot_path) - snapshot_dir_len - 2UL;
90 0 : this->snapshot_filename_off = snapshot_dir_len + 1UL;
91 6 : } else {
92 6 : this->save_snapshot = 0UL;
93 6 : this->snapshot_filename_max = 0UL;
94 6 : }
95 6 : this->snapshot_fd = -1;
96 :
97 : /* Right-aligned render the request path */
98 :
99 6 : static char const default_path[] = "/snapshot.tar.bz2";
100 6 : fd_snapshot_http_set_path( this, default_path, sizeof(default_path)-1, 0UL );
101 :
102 : /* Left-aligned render the headers, completing the message */
103 :
104 6 : char * p = fd_cstr_init( this->req_hdrs );
105 6 : static char const hdr_part1[] =
106 6 : " HTTP/1.1\r\n"
107 6 : "user-agent: Firedancer\r\n"
108 6 : "accept: */*\r\n"
109 6 : "accept-encoding: identity\r\n"
110 6 : "host: ";
111 6 : p = fd_cstr_append_text( p, hdr_part1, sizeof(hdr_part1)-1 );
112 :
113 6 : p = fd_cstr_append_text( p, dst_str, strlen(dst_str) );
114 :
115 6 : static char const hdr_part2[] =
116 6 : "\r\n"
117 6 : "\r\n";
118 6 : p = fd_cstr_append_text( p, hdr_part2, sizeof(hdr_part2)-1 );
119 :
120 6 : this->req_head = (ushort)( p - this->req_buf );
121 :
122 6 : return this;
123 6 : }
124 :
125 : static void
126 6 : fd_snapshot_http_cleanup_fds( fd_snapshot_http_t * this ) {
127 6 : if( this->snapshot_fd!=-1 ) {
128 0 : close( this->snapshot_fd );
129 0 : this->snapshot_fd = -1;
130 0 : }
131 6 : if( this->socket_fd!=-1 ) {
132 3 : close( this->socket_fd );
133 3 : this->socket_fd = -1;
134 3 : }
135 6 : }
136 :
137 : void *
138 6 : fd_snapshot_http_delete( fd_snapshot_http_t * this ) {
139 6 : if( FD_UNLIKELY( !this ) ) return NULL;
140 6 : fd_snapshot_http_cleanup_fds( this );
141 6 : return (void *)this;
142 6 : }
143 :
144 : /* fd_snapshot_http_init gets called the first time an object is polled
145 : for snapshot data. Creates a new outgoing TCP connection. */
146 :
147 : static int
148 0 : fd_snapshot_http_init( fd_snapshot_http_t * this ) {
149 :
150 0 : FD_LOG_NOTICE(( "Connecting to " FD_IP4_ADDR_FMT ":%u ...",
151 0 : FD_IP4_ADDR_FMT_ARGS( this->next_ipv4 ), this->next_port ));
152 :
153 0 : this->req_deadline = fd_log_wallclock() + this->req_timeout;
154 :
155 0 : this->socket_fd = socket( AF_INET, SOCK_STREAM, 0 );
156 0 : if( FD_UNLIKELY( this->socket_fd < 0 ) ) {
157 0 : FD_LOG_WARNING(( "socket(AF_INET, SOCK_STREAM, 0) failed (%d-%s)",
158 0 : errno, fd_io_strerror( errno ) ));
159 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
160 0 : return errno;
161 0 : }
162 :
163 0 : int optval = 4*FD_SNAPSHOT_HTTP_RESP_BUF_MAX;
164 0 : if( setsockopt( this->socket_fd, SOL_SOCKET, SO_RCVBUF, (char *)&optval, sizeof(int) ) < 0 ) {
165 0 : FD_LOG_WARNING(( "setsockopt failed (%d-%s)",
166 0 : errno, fd_io_strerror( errno ) ));
167 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
168 0 : return errno;
169 0 : }
170 :
171 0 : struct sockaddr_in addr = {
172 0 : .sin_family = AF_INET,
173 0 : .sin_addr = { .s_addr = this->next_ipv4 },
174 0 : .sin_port = fd_ushort_bswap( this->next_port ),
175 0 : };
176 :
177 : /* TODO consider using O_NONBLOCK socket so we can control the
178 : connect timeout interval*/
179 :
180 0 : if( 0!=connect( this->socket_fd, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) ) ) {
181 0 : FD_LOG_WARNING(( "connect(%d," FD_IP4_ADDR_FMT ":%u) failed (%d-%s)",
182 0 : this->socket_fd,
183 0 : FD_IP4_ADDR_FMT_ARGS( this->next_ipv4 ), this->next_port,
184 0 : errno, fd_io_strerror( errno ) ));
185 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
186 0 : return errno;
187 0 : }
188 :
189 0 : FD_LOG_INFO(( "Sending request" ));
190 :
191 0 : this->state = FD_SNAPSHOT_HTTP_STATE_REQ;
192 0 : return 0;
193 0 : }
194 :
195 : /* fd_snapshot_http_req writes out the request. */
196 :
197 : static int
198 3 : fd_snapshot_http_req( fd_snapshot_http_t * this ) {
199 :
200 3 : long now = fd_log_wallclock();
201 3 : long deadline = this->req_deadline;
202 :
203 3 : if( FD_UNLIKELY( now > deadline ) ) {
204 0 : FD_LOG_WARNING(( "Timed out while sending request." ));
205 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
206 0 : return ETIMEDOUT;
207 0 : }
208 :
209 3 : int socket_fd = this->socket_fd;
210 :
211 3 : uint avail_sz = (uint)this->req_head - (uint)this->req_tail;
212 3 : assert( avail_sz < sizeof(this->req_buf) );
213 0 : long sent_sz = send( socket_fd, this->req_buf + this->req_tail, avail_sz, MSG_DONTWAIT|MSG_NOSIGNAL );
214 3 : if( sent_sz<0L ) {
215 0 : if( FD_UNLIKELY( errno!=EWOULDBLOCK ) ) {
216 0 : FD_LOG_WARNING(( "send(%d,%p,%u) failed (%d-%s)",
217 0 : socket_fd, (void *)(this->req_buf + this->req_tail), avail_sz,
218 0 : errno, fd_io_strerror( errno ) ));
219 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
220 0 : return errno;
221 0 : } else {
222 0 : return 0;
223 0 : }
224 0 : }
225 :
226 3 : this->req_tail = (ushort)( this->req_tail + (uint)sent_sz );
227 3 : if( this->req_tail == this->req_head )
228 3 : this->state = FD_SNAPSHOT_HTTP_STATE_RESP;
229 :
230 3 : return 0;
231 3 : }
232 :
233 : /* fd_snapshot_http_follow_redirect winds up the state machine for a
234 : redirect. */
235 :
236 : static int
237 : fd_snapshot_http_follow_redirect( fd_snapshot_http_t * this,
238 : struct phr_header const * headers,
239 0 : ulong header_cnt ) {
240 :
241 0 : assert( this->hops > 0 );
242 0 : this->hops--;
243 :
244 : /* Look for location header */
245 :
246 0 : char const * loc = NULL;
247 0 : ulong loc_len;
248 0 : for( ulong i = 0; i<header_cnt; i++ ) {
249 0 : if( 0==strncasecmp( headers[i].name, "location", headers[i].name_len ) ) {
250 0 : loc = headers[i].value;
251 0 : loc_len = headers[i].value_len;
252 0 : break;
253 0 : }
254 0 : }
255 0 : if( FD_UNLIKELY( !loc ) ) {
256 0 : FD_LOG_WARNING(( "Invalid redirect (no location header)" ));
257 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
258 0 : return EINVAL;
259 0 : }
260 :
261 : /* Validate character set (TODO too restrictive?) */
262 :
263 0 : if( FD_UNLIKELY( loc_len > FD_SNAPSHOT_HTTP_REQ_PATH_MAX ) ) {
264 0 : FD_LOG_WARNING(( "Redirect location too long" ));
265 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
266 0 : return EINVAL;
267 0 : }
268 0 : if( FD_UNLIKELY( loc_len==0 || loc[0] != '/' ) ) {
269 0 : FD_LOG_WARNING(( "Redirect is not an absolute path on the current host. Refusing to follow." ));
270 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
271 0 : return EPROTO;
272 0 : }
273 0 : for( ulong j=0UL; j<loc_len; j++ ) {
274 0 : int c = loc[j];
275 0 : int c_ok = ( (c>='a') & (c<='z') ) |
276 0 : ( (c>='A') & (c<='Z') ) |
277 0 : ( (c>='0') & (c<='9') ) |
278 0 : (c=='.') | (c=='/') | (c=='-') | (c=='_') |
279 0 : (c=='+') | (c=='=') | (c=='&') | (c=='~') |
280 0 : (c=='%') | (c=='#');
281 0 : if( FD_UNLIKELY( !c_ok ) ) {
282 0 : FD_LOG_WARNING(( "Invalid char '0x%02x' in redirect location", (uint)c ));
283 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
284 0 : return EPROTO;
285 0 : }
286 0 : }
287 :
288 : /* Re-initialize */
289 :
290 0 : FD_LOG_NOTICE(( "Following redirect to %.*s", (int)loc_len, loc ));
291 :
292 0 : if( FD_UNLIKELY( !fd_snapshot_name_from_buf( this->name_out, loc, loc_len ) ) ) {
293 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
294 0 : return EPROTO;
295 0 : }
296 0 : if( FD_UNLIKELY( this->base_slot!=ULONG_MAX && fd_snapshot_name_slot_validate( this->name_out, this->base_slot ) ) ) {
297 0 : FD_LOG_WARNING(( "Cannot validate snapshot based on name. This likely indicates that the full snapsnot is stale and that the incremental snapshot is based on a newer slot." ));
298 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
299 0 : return EINVAL;
300 0 : }
301 :
302 0 : fd_snapshot_http_set_path( this, loc, loc_len, this->base_slot );
303 :
304 0 : this->req_deadline = fd_log_wallclock() + this->req_timeout;
305 0 : this->state = FD_SNAPSHOT_HTTP_STATE_REQ;
306 0 : this->resp_tail = 0U;
307 0 : this->resp_head = 0U;
308 :
309 0 : return 0;
310 0 : }
311 :
312 : /* fd_snapshot_http_resp waits for response headers. */
313 :
314 : static int
315 3 : fd_snapshot_http_resp( fd_snapshot_http_t * this ) {
316 3 : long now = fd_log_wallclock();
317 3 : long deadline = this->req_deadline;
318 :
319 3 : if( FD_UNLIKELY( now > deadline ) ) {
320 0 : FD_LOG_WARNING(( "Timed out while receiving response headers." ));
321 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
322 0 : return ETIMEDOUT;
323 0 : }
324 :
325 : /* Do blocking read of TCP data until timeout */
326 :
327 3 : int socket_fd = this->socket_fd;
328 :
329 3 : uchar * next = this->resp_buf + this->resp_head;
330 3 : ulong bufsz = FD_SNAPSHOT_HTTP_RESP_BUF_MAX - this->resp_head;
331 3 : assert( this->resp_head <= FD_SNAPSHOT_HTTP_RESP_BUF_MAX );
332 :
333 0 : long recv_sz = recv( socket_fd, next, bufsz, MSG_DONTWAIT );
334 3 : if( recv_sz<0L ) {
335 0 : if( FD_UNLIKELY( errno!=EWOULDBLOCK ) ) {
336 0 : FD_LOG_WARNING(( "recv(%d,%p,%lu) failed (%d-%s)",
337 0 : socket_fd, (void *)next, bufsz,
338 0 : errno, fd_io_strerror( errno ) ));
339 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
340 0 : return errno;
341 0 : } else {
342 0 : return 0;
343 0 : }
344 3 : } else if( recv_sz==0L ) {
345 0 : return 0;
346 0 : }
347 :
348 : /* Attempt to parse response. (Might fail due to incomplete response) */
349 :
350 3 : ulong last_len = this->resp_head;
351 3 : this->resp_head += (uint)recv_sz;
352 3 : assert( this->resp_head <= FD_SNAPSHOT_HTTP_RESP_BUF_MAX );
353 :
354 0 : int minor_version;
355 3 : int status;
356 3 : char const * msg_start;
357 3 : ulong msg_len;
358 3 : struct phr_header headers[ FD_SNAPSHOT_HTTP_RESP_HDR_CNT ];
359 3 : ulong header_cnt = FD_SNAPSHOT_HTTP_RESP_HDR_CNT;
360 3 : int parse_res =
361 3 : phr_parse_response( (const char *)this->resp_buf,
362 3 : this->resp_head,
363 3 : &minor_version,
364 3 : &status,
365 3 : &msg_start,
366 3 : &msg_len,
367 3 : headers,
368 3 : &header_cnt,
369 3 : last_len );
370 :
371 3 : if( FD_UNLIKELY( parse_res==-1 ) ) {
372 0 : FD_LOG_HEXDUMP_NOTICE(( "Failed HTTP response", this->resp_buf, this->resp_head ));
373 0 : FD_LOG_WARNING(( "Failed to parse HTTP response." ));
374 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
375 0 : return EPROTO;
376 0 : }
377 :
378 3 : if( parse_res==-2 ) return 0; /* response headers incomplete */
379 3 : assert( parse_res>=0 );
380 :
381 : /* OK, we parsed the response headers.
382 : Remember where the leftover tail started so we can later reuse it
383 : during response reading. */
384 :
385 0 : this->resp_tail = (uint)parse_res;
386 :
387 : /* Is it a redirect? If so, start over. */
388 :
389 3 : int is_redirect = (int)( (status==301) | (status==303) |
390 3 : (status==304) | (status==307) |
391 3 : (status==308) );
392 3 : if( FD_UNLIKELY( (!this->hops) & (is_redirect) ) ) {
393 0 : FD_LOG_WARNING(( "Too many redirects. Aborting." ));
394 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
395 0 : return ELOOP;
396 0 : }
397 :
398 3 : if( is_redirect ) {
399 0 : FD_LOG_NOTICE(( "Redirecting due to code %d", status ));
400 0 : return fd_snapshot_http_follow_redirect( this, headers, header_cnt );
401 0 : }
402 :
403 : /* Validate response header */
404 :
405 3 : if( FD_UNLIKELY( status!=200 ) ) {
406 0 : FD_LOG_WARNING(( "Unexpected HTTP status %d", status ));
407 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
408 0 : return EPROTO;
409 0 : }
410 :
411 : /* Find content-length */
412 :
413 3 : this->content_len = ULONG_MAX;
414 3 : const ulong target_len = sizeof("content-length")-1;
415 9 : for( ulong i = 0; i < header_cnt; ++i ) {
416 9 : if( headers[i].name_len==target_len && strncasecmp( headers[i].name, "content-length", target_len ) == 0 ) {
417 3 : this->content_len = strtoul( headers[i].value, NULL, 10 );
418 3 : break;
419 3 : }
420 9 : }
421 3 : if( this->content_len == ULONG_MAX ) {
422 0 : FD_LOG_WARNING(( "Missing content-length" ));
423 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
424 0 : return EPROTO;
425 0 : }
426 :
427 : /* Start downloading */
428 :
429 3 : if( FD_UNLIKELY( this->name_out->type == FD_SNAPSHOT_TYPE_UNSPECIFIED ) ) {
430 : /* We must not have followed a redirect. Try to parse here. */
431 3 : ulong off = (ulong)this->path_off + 4;
432 3 : if( FD_UNLIKELY( !fd_snapshot_name_from_buf( this->name_out, this->path + off, sizeof(this->path) - off ) ) ) {
433 3 : FD_LOG_WARNING(( "Cannot download, snapshot hash is unknown" ));
434 3 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
435 3 : return EINVAL;
436 3 : }
437 0 : if( FD_UNLIKELY( this->base_slot!=ULONG_MAX && fd_snapshot_name_slot_validate( this->name_out, this->base_slot ) ) ) {
438 0 : FD_LOG_WARNING(( "Cannot validate snapshot based on name. This likely indicates that the full snapsnot is stale and that the incremental snapshot is based on a newer slot." ));
439 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
440 0 : return EINVAL;
441 0 : }
442 0 : }
443 :
444 0 : if( FD_UNLIKELY( !this->save_snapshot ) ) {
445 0 : FD_LOG_NOTICE(( "snapshot will be downloaded into in-memory buffer rather than file" ));
446 0 : this->state = FD_SNAPSHOT_HTTP_STATE_DL;
447 0 : return 0;
448 0 : }
449 :
450 0 : struct stat sb;
451 0 : if ( FD_LIKELY( stat( this->snapshot_path, &sb )==0 ) ) {
452 0 : ulong file_len = (ulong)sb.st_size;
453 0 : if( FD_LIKELY( file_len==this->content_len ) ) {
454 0 : FD_LOG_NOTICE(( "snapshot file %s size %lu is equal to response content-length %lu so skipping download; manually remove the snapshot file if it's corrupted", this->snapshot_path, file_len, this->content_len ));
455 0 : this->snapshot_fd = open( this->snapshot_path, O_RDONLY );
456 0 : if( FD_UNLIKELY( this->snapshot_fd<0 ) ) {
457 0 : FD_LOG_WARNING(( "open(%s) failed (%d-%s)", this->snapshot_path, errno, fd_io_strerror( errno ) ));
458 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
459 0 : return EACCES;
460 0 : }
461 0 : this->state = FD_SNAPSHOT_HTTP_STATE_READ;
462 0 : return 0;
463 0 : }
464 0 : if( FD_UNLIKELY( file_len>this->content_len ) ) {
465 0 : FD_LOG_WARNING(( "snapshot file %s size %lu is larger than response content-length %lu !? Manually remove the snapshot file if it's corrupted", this->snapshot_path, file_len, this->content_len ));
466 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
467 0 : return EINVAL;
468 0 : }
469 0 : if( FD_UNLIKELY( file_len<this->content_len ) ) {
470 0 : FD_LOG_NOTICE(( "snapshot file %s size %lu is smaller than response content-length %lu likely due to partial download; attempting re-download", this->snapshot_path, file_len, this->content_len ));
471 0 : this->snapshot_fd = open( this->snapshot_path, O_WRONLY|O_TRUNC );
472 0 : if( FD_UNLIKELY( this->snapshot_fd<0 ) ) {
473 0 : FD_LOG_WARNING(( "open(%s) failed (%d-%s)", this->snapshot_path, errno, fd_io_strerror( errno ) ));
474 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
475 0 : return EACCES;
476 0 : }
477 0 : this->state = FD_SNAPSHOT_HTTP_STATE_DL;
478 0 : return 0;
479 0 : }
480 0 : __builtin_unreachable();
481 0 : } else if( FD_LIKELY( errno==ENOENT ) ) {
482 0 : FD_LOG_NOTICE(( "snapshot will be downloaded into file %s", this->snapshot_path ));
483 0 : this->snapshot_fd = open( this->snapshot_path, O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR );
484 0 : if( FD_UNLIKELY( this->snapshot_fd<0 ) ) {
485 0 : FD_LOG_WARNING(( "open(%s) failed (%d-%s)", this->snapshot_path, errno, fd_io_strerror( errno ) ));
486 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
487 0 : return EACCES;
488 0 : }
489 0 : this->state = FD_SNAPSHOT_HTTP_STATE_DL;
490 0 : return 0;
491 0 : } else {
492 0 : FD_LOG_WARNING(( "cannot stat snapshot file %s: (%d-%s)", this->snapshot_path, errno, fd_io_strerror( errno ) ));
493 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
494 0 : return EACCES;
495 0 : }
496 0 : }
497 :
498 : /* fd_snapshot_http_dl downloads bytes and returns them to the caller.
499 : No timeout set here. */
500 :
501 : static int
502 : fd_snapshot_http_dl( fd_snapshot_http_t * this,
503 : void * dst,
504 : ulong dst_max,
505 0 : ulong * dst_sz ) {
506 :
507 0 : if( FD_UNLIKELY( this->state!=FD_SNAPSHOT_HTTP_STATE_DL ) ) {
508 0 : FD_LOG_CRIT(( "invalid state %d", this->state ));
509 0 : }
510 :
511 0 : if( this->resp_head == this->resp_tail ) {
512 0 : if( this->content_len == this->dl_total ) {
513 0 : FD_LOG_NOTICE(( "download already complete at %lu MB", this->dl_total>>20 ));
514 0 : return -1;
515 0 : }
516 0 : this->resp_tail = this->resp_head = 0U;
517 0 : long recv_sz = recv( this->socket_fd, this->resp_buf,
518 0 : fd_ulong_min( this->content_len - this->dl_total, FD_SNAPSHOT_HTTP_RESP_BUF_MAX ),
519 0 : MSG_DONTWAIT );
520 0 : if( recv_sz<0L ) {
521 0 : if( FD_UNLIKELY( errno!=EWOULDBLOCK && errno!=EAGAIN ) ) {
522 0 : FD_LOG_WARNING(( "recv(%d,%p,%lu) failed while downloading response body (%d-%s)",
523 0 : this->socket_fd, (void *)this->resp_buf, FD_SNAPSHOT_HTTP_RESP_BUF_MAX,
524 0 : errno, fd_io_strerror( errno ) ));
525 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
526 0 : fd_snapshot_http_cleanup_fds( this );
527 0 : return errno;
528 0 : } else {
529 0 : return 0;
530 0 : }
531 0 : }
532 0 : if( !recv_sz ) { /* Connection closed */
533 0 : FD_LOG_WARNING(( "connection closed at %lu MB", this->dl_total>>20 ));
534 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
535 0 : fd_snapshot_http_cleanup_fds( this );
536 0 : return -1;
537 0 : }
538 0 : this->resp_head = (uint)recv_sz;
539 0 : #define DL_PERIOD (100UL<<20)
540 0 : static ulong x = 0;
541 0 : static ulong last_dl_total;
542 0 : static long last_nanos;
543 0 : this->dl_total += (ulong)recv_sz;
544 0 : if( x != this->dl_total/DL_PERIOD ) {
545 :
546 0 : FD_LOG_NOTICE(( "downloaded %lu MB (%lu%%) ...",
547 0 : this->dl_total>>20U, 100UL*this->dl_total/this->content_len ));
548 0 : x = this->dl_total/DL_PERIOD;
549 0 : if( FD_LIKELY( x >= 2UL ) ) {
550 0 : ulong dl_delta = this->dl_total - last_dl_total;
551 0 : ulong nanos_delta = (ulong)(fd_log_wallclock() - last_nanos);
552 0 : FD_LOG_NOTICE(( "estimate %lu MB/s", dl_delta*1000UL/nanos_delta ));
553 0 : }
554 0 : last_dl_total = this->dl_total;
555 0 : last_nanos = fd_log_wallclock();
556 0 : }
557 0 : if( this->content_len <= this->dl_total ) {
558 0 : FD_LOG_NOTICE(( "download complete at %lu MB", this->dl_total>>20 ));
559 0 : close( this->socket_fd );
560 0 : this->socket_fd = -1;
561 0 : if( FD_UNLIKELY( this->content_len < this->dl_total ) ) {
562 0 : FD_LOG_WARNING(( "server transmitted more than Content-Length %lu bytes vs %lu bytes", this->content_len, this->dl_total ));
563 0 : }
564 0 : }
565 0 : }
566 :
567 0 : uint avail_sz = this->resp_head - this->resp_tail;
568 0 : if( FD_UNLIKELY( this->dl_total==0UL ) ) {
569 0 : this->dl_total = avail_sz;
570 0 : }
571 0 : ulong write_sz = fd_ulong_min( avail_sz, dst_max );
572 0 : fd_memcpy( dst, this->resp_buf + this->resp_tail, write_sz );
573 0 : *dst_sz = write_sz;
574 0 : if( this->snapshot_fd!=-1 ) {
575 0 : ulong src_sz;
576 0 : int err = fd_io_write( this->snapshot_fd, this->resp_buf + this->resp_tail, write_sz, write_sz, &src_sz );
577 0 : if( FD_UNLIKELY( err!=0 ) ) {
578 0 : FD_LOG_WARNING(( "fd_io_write() failed (%d-%s) requested %lu bytes and wrote %lu bytes", err, fd_io_strerror( err ), write_sz, src_sz ));
579 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
580 0 : fd_snapshot_http_cleanup_fds( this );
581 0 : return err;
582 0 : }
583 0 : }
584 0 : this->resp_tail += (uint)write_sz;
585 0 : this->write_total += write_sz;
586 0 : if( this->content_len == this->write_total ) {
587 0 : FD_LOG_NOTICE(( "wrote out all %lu MB", this->write_total>>20 ));
588 0 : this->state = FD_SNAPSHOT_HTTP_STATE_DONE;
589 0 : fd_snapshot_http_cleanup_fds( this );
590 0 : }
591 :
592 0 : return 0;
593 0 : }
594 :
595 : /* fd_snapshot_http_read reads bytes from a pre-existing snapshot file
596 : and returns them to the caller. */
597 :
598 : static int
599 : fd_snapshot_http_read( fd_snapshot_http_t * this,
600 : void * dst,
601 : ulong dst_max,
602 0 : ulong * dst_sz ) {
603 :
604 0 : if( FD_UNLIKELY( this->state!=FD_SNAPSHOT_HTTP_STATE_READ ) ) {
605 0 : FD_LOG_CRIT(( "invalid state %d", this->state ));
606 0 : }
607 :
608 0 : ulong src_sz;
609 0 : ulong write_sz = fd_ulong_min( this->content_len-this->write_total, dst_max );
610 0 : int err = fd_io_read( this->snapshot_fd, dst, write_sz, write_sz, &src_sz );
611 0 : if( FD_UNLIKELY( err!=0 ) ) {
612 0 : FD_LOG_WARNING(( "fd_io_read() failed (%d-%s) requested %lu bytes and read %lu bytes", err, fd_io_strerror( err ), write_sz, src_sz ));
613 0 : this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
614 0 : fd_snapshot_http_cleanup_fds( this );
615 0 : return err;
616 0 : }
617 0 : *dst_sz = write_sz;
618 :
619 0 : this->write_total += write_sz;
620 0 : if( this->content_len == this->write_total ) {
621 0 : FD_LOG_NOTICE(( "wrote out all %lu MB", this->write_total>>20 ));
622 0 : this->state = FD_SNAPSHOT_HTTP_STATE_DONE;
623 0 : fd_snapshot_http_cleanup_fds( this );
624 0 : }
625 :
626 0 : return 0;
627 0 : }
628 :
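/* fd_io_istream_snapshot_http_read is the istream read callback.  Each
   call advances the HTTP state machine by one step according to the
   current state (connect, send request, receive response headers) or,
   once a body is available, copies up to dst_max bytes into dst. */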
631 :
632 : int
633 : fd_io_istream_snapshot_http_read( void * _this,
634 : void * dst,
635 : ulong dst_max,
636 6 : ulong * dst_sz ) {
637 :
638 6 : fd_snapshot_http_t * this = (fd_snapshot_http_t *)_this;
639 :
640 6 : int err = 0;
641 6 : switch( this->state ) {
642 0 : case FD_SNAPSHOT_HTTP_STATE_INIT:
643 0 : err = fd_snapshot_http_init( this );
644 0 : break;
645 3 : case FD_SNAPSHOT_HTTP_STATE_REQ:
646 3 : err = fd_snapshot_http_req( this );
647 3 : break;
648 3 : case FD_SNAPSHOT_HTTP_STATE_RESP:
649 3 : err = fd_snapshot_http_resp( this );
650 3 : break;
651 0 : case FD_SNAPSHOT_HTTP_STATE_DL:
652 0 : return fd_snapshot_http_dl( this, dst, dst_max, dst_sz );
653 0 : case FD_SNAPSHOT_HTTP_STATE_READ:
654 0 : return fd_snapshot_http_read( this, dst, dst_max, dst_sz );
655 6 : }
656 :
657 : /* Not yet ready to read at this point. */
658 :
659 6 : *dst_sz = 0UL;
660 6 : return err;
661 6 : }
662 :
663 : fd_io_istream_vt_t const fd_io_istream_snapshot_http_vt = {
664 : .read = fd_io_istream_snapshot_http_read,
665 : };
|