Line data    Source code 
       1             : #include "fd_genesis_client.h"
       2             : 
       3             : #include "../../waltz/http/picohttpparser.h"
       4             : #include "../../disco/topo/fd_topo.h"
       5             : #include "../../util/fd_util.h"
       6             : #include "../../ballet/sha256/fd_sha256.h"
       7             : 
       8             : #include <errno.h>
       9             : #include <netinet/in.h>
      10             : #include <string.h>
      11             : #include <strings.h>
      12             : #include <sys/socket.h>
      13             : #include <unistd.h>
      14             : #include <sys/poll.h>
      15             : #include <stdlib.h>
      16             : 
      17             : struct fd_genesis_client_peer {
      18             :   fd_ip4_port_t addr;
      19             : 
      20             :   int writing;
      21             :   ulong request_bytes_sent;
      22             :   ulong response_bytes_read;
      23             :   uchar response[ 10UL*1024UL*1024UL ]; /* 10 MiB max response */
      24             : };
      25             : 
      26             : typedef struct fd_genesis_client_peer fd_genesis_client_peer_t;
      27             : 
      28             : struct fd_genesis_client_private {
      29             :   long start_time_nanos;
      30             :   ulong peer_cnt;
      31             :   ulong remaining_peer_cnt;
      32             : 
      33             :   struct pollfd pollfds[ FD_TOPO_GOSSIP_ENTRYPOINTS_MAX ];
      34             :   fd_genesis_client_peer_t peers[ FD_TOPO_GOSSIP_ENTRYPOINTS_MAX ];
      35             : 
      36             :   ulong magic;
      37             : };
      38             : 
      39             : FD_FN_CONST ulong
      40           0 : fd_genesis_client_align( void ) {
      41           0 :   return alignof(fd_genesis_client_t);
      42           0 : }
      43             : 
      44             : FD_FN_CONST ulong
      45           0 : fd_genesis_client_footprint( void ) {
      46           0 :   return sizeof(fd_genesis_client_t);
      47           0 : }
      48             : 
      49             : void *
      50           0 : fd_genesis_client_new( void * shmem ) {
      51           0 :   fd_genesis_client_t * gen = (fd_genesis_client_t *)shmem;
      52             : 
      53           0 :   if( FD_UNLIKELY( !shmem ) ) {
      54           0 :     FD_LOG_WARNING(( "NULL shmem" ));
      55           0 :     return NULL;
      56           0 :   }
      57             : 
      58           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_genesis_client_align() ) ) ) {
      59           0 :     FD_LOG_WARNING(( "misaligned shmem" ));
      60           0 :     return NULL;
      61           0 :   }
      62             : 
      63           0 :   FD_COMPILER_MFENCE();
      64           0 :   FD_VOLATILE( gen->magic ) = FD_GENESIS_CLIENT_MAGIC;
      65           0 :   FD_COMPILER_MFENCE();
      66             : 
      67           0 :   return (void *)gen;
      68           0 : }
      69             : 
      70             : fd_genesis_client_t *
      71           0 : fd_genesis_client_join( void * shgen ) {
      72           0 :   if( FD_UNLIKELY( !shgen ) ) {
      73           0 :     FD_LOG_WARNING(( "NULL shgen" ));
      74           0 :     return NULL;
      75           0 :   }
      76             : 
      77           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgen, fd_genesis_client_align() ) ) ) {
      78           0 :     FD_LOG_WARNING(( "misaligned shgen" ));
      79           0 :     return NULL;
      80           0 :   }
      81             : 
      82           0 :   fd_genesis_client_t * gen = (fd_genesis_client_t *)shgen;
      83             : 
      84           0 :   if( FD_UNLIKELY( gen->magic!=FD_GENESIS_CLIENT_MAGIC ) ) {
      85           0 :     FD_LOG_WARNING(( "bad magic" ));
      86           0 :     return NULL;
      87           0 :   }
      88             : 
      89           0 :   return gen;
      90           0 : }
      91             : 
      92             : void
      93             : fd_genesis_client_init( fd_genesis_client_t * client,
      94             :                         fd_ip4_port_t const * servers,
      95           0 :                         ulong                 servers_len ) {
      96           0 :   FD_TEST( servers_len<=FD_TOPO_GOSSIP_ENTRYPOINTS_MAX );
      97           0 :   ulong peer_cnt = 0UL;
      98             : 
      99           0 :   for( ulong i=0UL; i<servers_len; i++ ) {
     100           0 :     fd_ip4_port_t server = servers[ i ];
     101           0 :     server.port = 8899;  // TODO: SPECIFY IN CONFIG
     102             : 
     103           0 :     int sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
     104           0 :     if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     105             : 
     106           0 :     struct sockaddr_in addr = {
     107           0 :       .sin_family = AF_INET,
     108           0 :       .sin_port   = fd_ushort_bswap( server.port ),
     109           0 :       .sin_addr   = { .s_addr = server.addr }
     110           0 :     };
     111             : 
     112           0 :     if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
     113           0 :       if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     114           0 :       continue;
     115           0 :     }
     116             : 
     117           0 :     client->pollfds[ peer_cnt ] = (struct pollfd){
     118           0 :       .fd = sockfd,
     119           0 :       .events = POLLIN | POLLOUT,
     120           0 :       .revents = 0
     121           0 :     };
     122           0 :     client->peers[ peer_cnt ].addr = server;
     123           0 :     client->peers[ peer_cnt ].writing = 1;
     124           0 :     client->peers[ peer_cnt ].request_bytes_sent = 0UL;
     125           0 :     client->peers[ peer_cnt ].response_bytes_read = 0UL;
     126           0 :     peer_cnt++;
     127           0 :   }
     128             : 
     129           0 :   for( ulong i=peer_cnt; i<FD_TOPO_GOSSIP_ENTRYPOINTS_MAX; i++ ) client->pollfds[ i ].fd = -1;
     130             : 
     131           0 :   client->peer_cnt = peer_cnt;
     132           0 :   client->remaining_peer_cnt = peer_cnt;
     133           0 :   client->start_time_nanos = fd_log_wallclock();
     134           0 : }
     135             : 
     136             : static void
     137             : close_one( fd_genesis_client_t * client,
     138           0 :            ulong                 idx ) {
     139           0 :   if( FD_UNLIKELY( -1==close( client->pollfds[ idx ].fd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     140           0 :   client->pollfds[ idx ].fd = -1;
     141           0 :   client->remaining_peer_cnt--;
     142           0 : }
     143             : 
     144             : static void
     145           0 : close_all( fd_genesis_client_t * client ) {
     146           0 :   for( ulong i=0UL; i<client->peer_cnt; i++ ) {
     147           0 :     if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
     148           0 :     close_one( client, i );
     149           0 :   }
     150           0 : }
     151             : 
     152             : static void
     153             : write_conn( fd_genesis_client_t * client,
     154           0 :             ulong                 conn_idx ) {
     155           0 :   fd_genesis_client_peer_t * peer = &client->peers[ conn_idx ];
     156             : 
     157           0 :   if( FD_LIKELY( !peer->writing ) ) return;
     158             : 
     159           0 :   char request[ 1024UL ];
     160           0 :   FD_TEST( fd_cstr_printf_check( request, sizeof(request), NULL, "GET /genesis.tar.bz2 HTTP/1.1\r\n"
     161           0 :                                                                  "Cache-Control: no-cache\r\n"
     162           0 :                                                                  "Connection: keep-alive\r\n"
     163           0 :                                                                  "Pragma: no-cache\r\n"
     164           0 :                                                                  "User-Agent: Firedancer\r\n"
     165           0 :                                                                  "Host: " FD_IP4_ADDR_FMT ":%hu\r\n\r\n",
     166           0 :                                                                  FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ) ) );
     167             : 
     168           0 :   long written = sendto( client->pollfds[ conn_idx ].fd,
     169           0 :                          request+peer->request_bytes_sent,
     170           0 :                          sizeof(request)-peer->request_bytes_sent,
     171           0 :                          MSG_NOSIGNAL,
     172           0 :                          NULL,
     173           0 :                          0 );
     174           0 :   if( FD_UNLIKELY( -1==written && errno==EAGAIN ) ) return; /* No data was written, continue. */
     175           0 :   else if( FD_UNLIKELY( -1==written ) ) {
     176           0 :     close_one( client, conn_idx );
     177           0 :     return;
     178           0 :   }
     179             : 
     180           0 :   peer->request_bytes_sent += (ulong)written;
     181           0 :   if( FD_UNLIKELY( peer->request_bytes_sent==sizeof(request) ) ) {
     182           0 :     peer->writing = 0;
     183           0 :     peer->response_bytes_read = 0UL;
     184           0 :   }
     185           0 : }
     186             : 
     187             : static ulong
     188             : rpc_phr_content_length( struct phr_header * headers,
     189           0 :                         ulong               num_headers ) {
     190           0 :   for( ulong i=0UL; i<num_headers; i++ ) {
     191           0 :     if( FD_LIKELY( headers[i].name_len!=14UL ) ) continue;
     192           0 :     if( FD_LIKELY( strncasecmp( headers[i].name, "Content-Length", 14UL ) ) ) continue;
     193           0 :     char * end;
     194           0 :     ulong content_length = strtoul( headers[i].value, &end, 10 );
     195           0 :     if( FD_UNLIKELY( end==headers[i].value ) ) return ULONG_MAX;
     196           0 :     return content_length;
     197           0 :   }
     198           0 :   return ULONG_MAX;
     199           0 : }
     200             : 
     201             : static int
     202             : read_conn( fd_genesis_client_t * client,
     203             :            ulong                 conn_idx,
     204             :            uchar **              buffer,
     205           0 :            ulong *               buffer_sz ) {
     206           0 :   fd_genesis_client_peer_t * peer = &client->peers[ conn_idx ];
     207             : 
     208           0 :   if( FD_UNLIKELY( peer->writing ) ) return 1;
     209           0 :   long read = recvfrom( client->pollfds[ conn_idx ].fd,
     210           0 :                         peer->response+peer->response_bytes_read,
     211           0 :                         sizeof(peer->response)-peer->response_bytes_read,
     212           0 :                         0,
     213           0 :                         NULL,
     214           0 :                         NULL );
     215           0 :   if( FD_UNLIKELY( -1==read && (errno==EAGAIN || errno==EINTR) ) ) return 1;
     216           0 :   else if( FD_UNLIKELY( -1==read ) ) {
     217           0 :     close_one( client, conn_idx );
     218           0 :     return 1;
     219           0 :   }
     220             : 
     221           0 :   peer->response_bytes_read += (ulong)read;
     222             : 
     223           0 :   int minor_version;
     224           0 :   int status;
     225           0 :   const char * message;
     226           0 :   ulong message_len;
     227           0 :   struct phr_header headers[ 32 ];
     228           0 :   ulong num_headers = 32UL;
     229           0 :   int len = phr_parse_response( (char*)peer->response, peer->response_bytes_read,
     230           0 :                                 &minor_version, &status, &message, &message_len,
     231           0 :                                 headers, &num_headers, 0L );
     232           0 :   if( FD_UNLIKELY( -1==len ) ) {
     233           0 :     close_one( client, conn_idx );
     234           0 :     return 1;
     235           0 :   } else if( FD_UNLIKELY( -2==len ) ) {
     236           0 :     return 1;
     237           0 :   }
     238             : 
     239           0 :   if( FD_UNLIKELY( status!=200 ) ) {
     240           0 :     close_one( client, conn_idx );
     241           0 :     return 1;
     242           0 :   }
     243             : 
     244           0 :   ulong content_length = rpc_phr_content_length( headers, num_headers );
     245           0 :   if( FD_UNLIKELY( content_length==ULONG_MAX ) ) {
     246           0 :     close_one( client, conn_idx );
     247           0 :     return 1;
     248           0 :   }
     249           0 :   if( FD_UNLIKELY( content_length+(ulong)len>sizeof(peer->response) ) ) {
     250           0 :     close_one( client, conn_idx );
     251           0 :     return 1;
     252           0 :   }
     253           0 :   if( FD_LIKELY( content_length+(ulong)len>peer->response_bytes_read ) ) {
     254           0 :     return 1;
     255           0 :   }
     256             : 
     257           0 :   *buffer_sz = content_length;
     258           0 :   *buffer    = peer->response + (ulong)len;
     259             : 
     260           0 :   uchar hash[ 32UL ] = {0};
     261           0 :   fd_sha256_hash( *buffer, *buffer_sz, hash );
     262             : 
     263           0 :   return 0;
     264           0 : }
     265             : 
     266             : int
     267             : fd_genesis_client_poll( fd_genesis_client_t * client,
     268             :                         fd_ip4_port_t *       peer,
     269             :                         uchar **              buffer,
     270             :                         ulong *               buffer_sz,
     271           0 :                         int *                 charge_busy ) {
     272           0 :   if( FD_UNLIKELY( !client->remaining_peer_cnt ) ) return -1;
     273           0 :   if( FD_UNLIKELY( fd_log_wallclock()-client->start_time_nanos>20L*1000L*1000*1000L ) ) {
     274           0 :     close_all( client );
     275           0 :     return -1;
     276           0 :   }
     277             : 
     278           0 :   int nfds = fd_syscall_poll( client->pollfds, (uint)client->peer_cnt, 0 );
     279           0 :   if( FD_UNLIKELY( 0==nfds ) ) return 1;
     280           0 :   else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 1;
     281           0 :   else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll() failed (%i-%s)", errno, strerror( errno ) ));
     282             : 
     283           0 :   *charge_busy = 1;
     284             : 
     285           0 :   for( ulong i=0UL; i<FD_TOPO_GOSSIP_ENTRYPOINTS_MAX; i++ ) {
     286           0 :     if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
     287             : 
     288           0 :     if( FD_LIKELY( client->pollfds[ i ].revents & POLLOUT ) ) write_conn( client, i );
     289           0 :     if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
     290           0 :     if( FD_LIKELY( client->pollfds[ i ].revents & POLLIN ) ) {
     291           0 :       if( FD_LIKELY( !read_conn( client, i, buffer, buffer_sz ) ) ) {
     292           0 :         close_all( client );
     293           0 :         *peer = client->peers[ i ].addr;
     294           0 :         return 0;
     295           0 :       }
     296           0 :     }
     297           0 :   }
     298             : 
     299           0 :   return 1;
     300           0 : }
     301             : 
     302             : struct pollfd const *
     303           0 : fd_genesis_client_get_pollfds( fd_genesis_client_t * client ) {
     304           0 :   return client->pollfds;
     305           0 : }
       |