LCOV - code coverage report
Current view: top level - app/fddev/rpc_client - fd_rpc_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 182 272 66.9 %
Date: 2025-01-08 12:08:44 Functions: 11 13 84.6 %

          Line data    Source code
       1             : #include "fd_rpc_client.h"
       2             : #include "fd_rpc_client_private.h"
       3             : 
       4             : #include "../../../ballet/http/picohttpparser.h"
       5             : #include "../../../ballet/json/cJSON.h"
       6             : #include "../../../ballet/base58/fd_base58.h"
       7             : 
       8             : #include <errno.h>
       9             : #include <stdio.h>
      10             : #include <stdlib.h>
      11             : #include <unistd.h>
      12             : #include <strings.h>
      13             : #include <sys/socket.h>
      14             : #include <sys/types.h>
      15             : #include <netinet/ip.h>
      16             : 
      17             : #define MAX_REQUEST_LEN (1024UL)
      18             : 
      19             : void *
      20             : fd_rpc_client_new( void * mem,
      21             :                    uint   rpc_addr,
      22           3 :                    ushort rpc_port ) {
      23           3 :   fd_rpc_client_t * rpc = (fd_rpc_client_t *)mem;
      24           3 :   rpc->request_id = 0UL;
      25           3 :   rpc->rpc_addr = rpc_addr;
      26           3 :   rpc->rpc_port = rpc_port;
      27         387 :   for( ulong i=0; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
      28         384 :     rpc->requests[ i ].state = FD_RPC_CLIENT_STATE_NONE;
      29         384 :     rpc->fds[ i ].fd = -1;
      30         384 :     rpc->fds[ i ].events = POLLIN | POLLOUT;
      31         384 :   }
      32           3 :   return (void *)rpc;
      33           3 : }
      34             : 
      35             : long
      36             : fd_rpc_client_wait_ready( fd_rpc_client_t * rpc,
      37           0 :                           long              timeout_ns ) {
      38             : 
      39             : 
      40           0 :   struct sockaddr_in addr = {
      41           0 :     .sin_family = AF_INET,
      42           0 :     .sin_port   = fd_ushort_bswap( rpc->rpc_port ),
      43           0 :     .sin_addr   = { .s_addr = rpc->rpc_addr }
      44           0 :   };
      45             : 
      46           0 :   struct pollfd pfd = {
      47           0 :     .fd = 0,
      48           0 :     .events = POLLOUT,
      49           0 :     .revents = 0
      50           0 :   };
      51             : 
      52           0 :   long start = fd_log_wallclock();
      53           0 :   for(;;) {
      54           0 :     pfd.fd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
      55           0 :     if( FD_UNLIKELY( pfd.fd<0 ) ) return FD_RPC_CLIENT_ERR_NETWORK;
      56             : 
      57           0 :     if( FD_UNLIKELY( -1==connect( pfd.fd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
      58           0 :       if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      59           0 :       return FD_RPC_CLIENT_ERR_NETWORK;
      60           0 :     }
      61             : 
      62           0 :     for(;;) {
      63           0 :       long now = fd_log_wallclock();
      64           0 :       if( FD_UNLIKELY( now-start>=timeout_ns ) ) {
      65           0 :         if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      66           0 :         return FD_RPC_CLIENT_ERR_NETWORK;
      67           0 :       }
      68             : 
      69           0 :       int nfds = poll( &pfd, 1, (int)((now-start) / 1000000) );
      70           0 :       if( FD_UNLIKELY( 0==nfds ) ) continue;
      71           0 :       else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) continue;
      72           0 :       else if( FD_UNLIKELY( -1==nfds ) ) {
      73           0 :         if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      74           0 :         return FD_RPC_CLIENT_ERR_NETWORK;
      75           0 :       } else if( FD_LIKELY( pfd.revents & (POLLERR | POLLHUP) ) ) {
      76           0 :         if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      77           0 :         break;
      78           0 :       } else if( FD_LIKELY( pfd.revents & POLLOUT ) ) {
      79           0 :         if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      80           0 :         return FD_RPC_CLIENT_SUCCESS;
      81           0 :       }
      82           0 :     }
      83           0 :   }
      84           0 : }
      85             : 
      86             : static ulong
      87           6 : fd_rpc_available_slot( fd_rpc_client_t * rpc ) {
      88           6 :   for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
      89           6 :     if( FD_LIKELY( rpc->requests[i].state==FD_RPC_CLIENT_STATE_NONE ) ) return i;
      90           6 :   }
      91           0 :   return ULONG_MAX;
      92           6 : }
      93             : 
      94             : static ulong
      95             : fd_rpc_find_request( fd_rpc_client_t * rpc,
      96          12 :                      long              request_id ) {
      97          12 :   for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
      98          12 :     if( FD_LIKELY( rpc->requests[i].state==FD_RPC_CLIENT_STATE_NONE ) ) continue;
      99          12 :     if( FD_LIKELY( rpc->requests[i].response.request_id!=request_id ) ) continue;
     100          12 :     return i;
     101          12 :   }
     102           0 :   return ULONG_MAX;
     103          12 : }
     104             : 
     105             : static long
     106             : fd_rpc_client_request( fd_rpc_client_t * rpc,
     107             :                        ulong             method,
     108             :                        long              request_id,
     109             :                        char *            contents,
     110           6 :                        int               contents_len ) {
     111           6 :   ulong idx = fd_rpc_available_slot( rpc );
     112           6 :   if( FD_UNLIKELY( idx==ULONG_MAX) ) return FD_RPC_CLIENT_ERR_TOO_MANY;
     113             : 
     114           6 :   struct fd_rpc_client_request * request = &rpc->requests[ idx ];
     115             : 
     116           6 :   if( FD_UNLIKELY( contents_len<0 ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     117           6 :   if( FD_UNLIKELY( (ulong)contents_len>=MAX_REQUEST_LEN ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     118             : 
     119           6 :   int printed = snprintf( request->connected.request_bytes, sizeof(request->connected.request_bytes),
     120           6 :                           "POST / HTTP/1.1\r\n"
     121           6 :                           "Host: localhost:12001\r\n"
     122           6 :                           "Content-Length: %d\r\n"
     123           6 :                           "Content-Type: application/json\r\n\r\n"
     124           6 :                           "%s", contents_len, contents );
     125           6 :   if( FD_UNLIKELY( printed<0 ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     126           6 :   if( FD_UNLIKELY( (ulong)printed>=sizeof(request->connected.request_bytes) ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     127           6 :   request->connected.request_bytes_cnt = (ulong)printed;
     128             : 
     129           6 :   int fd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
     130           6 :   if( FD_UNLIKELY( fd<0 ) ) return FD_RPC_CLIENT_ERR_NETWORK;
     131             : 
     132           6 :   struct sockaddr_in addr = {
     133           6 :     .sin_family = AF_INET,
     134           6 :     .sin_port   = fd_ushort_bswap( rpc->rpc_port ),
     135           6 :     .sin_addr   = { .s_addr = rpc->rpc_addr }
     136           6 :   };
     137             : 
     138           6 :   if( FD_UNLIKELY( -1==connect( fd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
     139           0 :     if( FD_UNLIKELY( close( fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     140           0 :     return FD_RPC_CLIENT_ERR_NETWORK;
     141           0 :   }
     142             : 
     143           6 :   rpc->request_id = request_id;
     144           6 :   rpc->fds[ idx ].fd = fd;
     145           6 :   request->response.method = method;
     146           6 :   request->response.status = FD_RPC_CLIENT_PENDING;
     147           6 :   request->response.request_id = rpc->request_id;
     148           6 :   request->connected.request_bytes_sent = 0UL;
     149           6 :   request->state = FD_RPC_CLIENT_STATE_CONNECTED;
     150           6 :   return request->response.request_id;
     151           6 : }
     152             : 
     153             : long
     154           3 : fd_rpc_client_request_latest_block_hash( fd_rpc_client_t * rpc ) {
     155           3 :   char contents[ MAX_REQUEST_LEN ];
     156           3 :   long request_id = fd_long_if( rpc->request_id==LONG_MAX, 0L, rpc->request_id+1L );
     157             : 
     158           3 :   int contents_len = snprintf( contents, sizeof(contents),
     159           3 :                                "{\"jsonrpc\":\"2.0\",\"id\":\"%ld\",\"method\":\"getLatestBlockhash\",\"params\":[]}",
     160           3 :                                request_id );
     161             : 
     162           3 :   return fd_rpc_client_request( rpc, FD_RPC_CLIENT_METHOD_LATEST_BLOCK_HASH, request_id, contents, contents_len );
     163           3 : }
     164             : 
     165             : long
     166           3 : fd_rpc_client_request_transaction_count( fd_rpc_client_t * rpc ) {
     167           3 :   char contents[ MAX_REQUEST_LEN ];
     168           3 :   long request_id = fd_long_if( rpc->request_id==LONG_MAX, 0L, rpc->request_id+1L );
     169             : 
     170           3 :   int contents_len = snprintf( contents, sizeof(contents),
     171           3 :                                "{\"jsonrpc\":\"2.0\",\"id\":\"%ld\",\"method\":\"getTransactionCount\",\"params\":[ { \"commitment\": \"processed\" } ]}",
     172           3 :                                request_id );
     173             : 
     174           3 :   return fd_rpc_client_request( rpc, FD_RPC_CLIENT_METHOD_TRANSACTION_COUNT, request_id, contents, contents_len );
     175           3 : }
     176             : 
     177             : static void
     178             : fd_rpc_mark_error( fd_rpc_client_t * rpc,
     179             :                    ulong             idx,
     180           0 :                    long              error ) {
     181           0 :   if( FD_LIKELY( rpc->fds[ idx ].fd>=0 ) ) {
     182           0 :     if( FD_UNLIKELY( close( rpc->fds[ idx ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     183           0 :     rpc->fds[ idx ].fd = -1;
     184           0 :   }
     185           0 :   rpc->requests[ idx ].state = FD_RPC_CLIENT_STATE_FINISHED;
     186           0 :   rpc->requests[ idx ].response.status = error;
     187           0 : }
     188             : 
     189             : static ulong
     190             : fd_rpc_phr_content_length( struct phr_header * headers,
     191           6 :                            ulong               num_headers ) {
     192          12 :   for( ulong i=0UL; i<num_headers; i++ ) {
     193          12 :     if( FD_LIKELY( headers[i].name_len!=14UL ) ) continue;
     194           6 :     if( FD_LIKELY( strncasecmp( headers[i].name, "Content-Length", 14UL ) ) ) continue;
     195           6 :     char * end;
     196           6 :     ulong content_length = strtoul( headers[i].value, &end, 10 );
     197           6 :     if( FD_UNLIKELY( end==headers[i].value ) ) return ULONG_MAX;
     198           6 :     return content_length;
     199           6 :   }
     200           0 :   return ULONG_MAX;
     201           6 : }
     202             : 
     203             : static long
     204             : parse_response( char *                     response,
     205             :                 ulong                      response_len,
     206             :                 ulong                      last_response_len,
     207           6 :                 fd_rpc_client_response_t * result ) {
     208           6 :   int minor_version;
     209           6 :   int status;
     210           6 :   const char * message;
     211           6 :   ulong message_len;
     212           6 :   struct phr_header headers[ 32 ];
     213           6 :   ulong num_headers = 32UL;
     214           6 :   int http_len = phr_parse_response( response, response_len,
     215           6 :                                     &minor_version, &status, &message, &message_len,
     216           6 :                                     headers, &num_headers, last_response_len );
     217           6 :   if( FD_UNLIKELY( -2==http_len ) ) return FD_RPC_CLIENT_PENDING;
     218           6 :   else if( FD_UNLIKELY( -1==http_len ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
     219             : 
     220           6 :   ulong content_length = fd_rpc_phr_content_length( headers, num_headers );
     221           6 :   if( FD_UNLIKELY( content_length==ULONG_MAX ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
     222           6 :   if( FD_UNLIKELY( content_length+(ulong)http_len > MAX_REQUEST_LEN ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     223           6 :   if( FD_LIKELY( content_length+(ulong)http_len>response_len ) ) return FD_RPC_CLIENT_PENDING;
     224             : 
     225           6 :   if( FD_UNLIKELY( status!=200 ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
     226             : 
     227           6 :   const char * parse_end;
     228           6 :   cJSON * json = cJSON_ParseWithLengthOpts( response + http_len, content_length, &parse_end, 0 );
     229           6 :   if( FD_UNLIKELY( !json ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
     230             : 
     231           6 :   switch( result->method ) {
     232           3 :     case FD_RPC_CLIENT_METHOD_TRANSACTION_COUNT: {
     233           3 :       const cJSON * node = cJSON_GetObjectItemCaseSensitive( json, "result" );
     234           3 :       if( FD_UNLIKELY( !cJSON_IsNumber( node ) || node->valueulong==ULONG_MAX ) ) {
     235           0 :         cJSON_Delete( json );
     236           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     237           0 :       }
     238             : 
     239           3 :       result->result.transaction_count.transaction_count = node->valueulong;
     240           3 :       cJSON_Delete( json );
     241           3 :       return FD_RPC_CLIENT_SUCCESS;
     242           3 :     }
     243           3 :     case FD_RPC_CLIENT_METHOD_LATEST_BLOCK_HASH: {
     244           3 :       const cJSON * node = cJSON_GetObjectItemCaseSensitive( json, "result" );
     245           3 :       if( FD_UNLIKELY( !cJSON_IsObject( node ) ) ) {
     246           0 :         cJSON_Delete( json );
     247           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     248           0 :       }
     249             : 
     250           3 :       node = cJSON_GetObjectItemCaseSensitive( node, "value" );
     251           3 :       if( FD_UNLIKELY( !cJSON_IsObject( node ) ) ) {
     252           0 :         cJSON_Delete( json );
     253           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     254           0 :       }
     255             : 
     256           3 :       node = cJSON_GetObjectItemCaseSensitive( node, "blockhash" );
     257           3 :       if( FD_UNLIKELY( !cJSON_IsString( node ) ) ) {
     258           0 :         cJSON_Delete( json );
     259           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     260           0 :       }
     261             : 
     262           3 :       if( FD_UNLIKELY( strnlen( node->valuestring, 45UL )>44UL ) ) {
     263           0 :         cJSON_Delete( json );
     264           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     265           0 :       }
     266             : 
     267           3 :       if( FD_UNLIKELY( !fd_base58_decode_32( node->valuestring, result->result.latest_block_hash.block_hash ) ) ) {
     268           0 :         cJSON_Delete( json );
     269           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     270           0 :       }
     271             : 
     272           3 :       cJSON_Delete( json );
     273           3 :       return FD_RPC_CLIENT_SUCCESS;
     274           3 :     }
     275           0 :     default:
     276           0 :       FD_TEST( 0 );
     277           6 :   }
     278           6 : }
     279             : 
     280             : int
     281             : fd_rpc_client_service( fd_rpc_client_t * rpc,
     282       45621 :                        int               wait ) {
     283       45621 :   int timeout = wait ? -1 : 0;
     284       45621 :   int nfds = poll( rpc->fds, FD_RPC_CLIENT_REQUEST_CNT, timeout );
     285       45621 :   if( FD_UNLIKELY( 0==nfds ) ) return 0;
     286       45621 :   else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 0;
     287       45621 :   else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
     288             : 
     289     5885109 :   for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
     290     5839488 :     struct fd_rpc_client_request * request = &rpc->requests[i];
     291             : 
     292     5839488 :     if( FD_LIKELY( request->state==FD_RPC_CLIENT_STATE_CONNECTED && ( rpc->fds[ i ].revents & POLLOUT ) ) ) {
     293           6 :       long sent = send( rpc->fds[ i ].fd, request->connected.request_bytes+request->connected.request_bytes_sent,
     294           6 :                         request->connected.request_bytes_cnt-request->connected.request_bytes_sent, 0 );
     295           6 :       if( FD_UNLIKELY( -1==sent && errno==EAGAIN ) ) continue;
     296           6 :       if( FD_UNLIKELY( -1==sent ) ) {
     297           0 :         fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_NETWORK );
     298           0 :         continue;
     299           0 :       }
     300             : 
     301           6 :       request->connected.request_bytes_sent += (ulong)sent;
     302           6 :       if( FD_UNLIKELY( request->connected.request_bytes_sent==request->connected.request_bytes_cnt ) ) {
     303           6 :         request->sent.response_bytes_read = 0UL;
     304           6 :         request->state = FD_RPC_CLIENT_STATE_SENT;
     305           6 :       }
     306           6 :     }
     307             : 
     308     5839488 :     if( FD_LIKELY( request->state==FD_RPC_CLIENT_STATE_SENT && ( rpc->fds[ i ].revents & POLLIN ) ) ) {
     309           6 :       long read = recv( rpc->fds[ i ].fd, request->response_bytes+request->sent.response_bytes_read,
     310           6 :                         sizeof(request->response_bytes)-request->sent.response_bytes_read, 0 );
     311           6 :       if( FD_UNLIKELY( -1==read && errno==EAGAIN ) ) continue;
     312           6 :       else if( FD_UNLIKELY( -1==read ) ) {
     313           0 :         fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_NETWORK );
     314           0 :         continue;
     315           0 :       }
     316             : 
     317           6 :       request->sent.response_bytes_read += (ulong)read;
     318           6 :       if( FD_UNLIKELY( request->sent.response_bytes_read==sizeof(request->response_bytes) ) ) {
     319           0 :         fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_TOO_LARGE );
     320           0 :         continue;
     321           0 :       }
     322             : 
     323           6 :       fd_rpc_client_response_t * response = &request->response;
     324           6 :       long status = parse_response( request->response_bytes,
     325           6 :                                     request->sent.response_bytes_read,
     326           6 :                                     request->sent.response_bytes_read-(ulong)read,
     327           6 :                                     response );
     328           6 :       if( FD_LIKELY( status==FD_RPC_CLIENT_PENDING ) ) continue;
     329           6 :       else if( FD_UNLIKELY( status==FD_RPC_CLIENT_SUCCESS ) ) {
     330           6 :         if( FD_UNLIKELY( close( rpc->fds[ i ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     331           6 :         rpc->fds[ i ].fd = -1;
     332           6 :         response->status = FD_RPC_CLIENT_SUCCESS;
     333           6 :         request->state = FD_RPC_CLIENT_STATE_FINISHED;
     334           6 :         continue;
     335           6 :       } else {
     336           0 :         fd_rpc_mark_error( rpc, i, status );
     337           0 :         continue;
     338           0 :       }
     339           6 :     }
     340     5839488 :   }
     341             : 
     342       45621 :   return 1;
     343       45621 : }
     344             : 
     345             : fd_rpc_client_response_t *
     346             : fd_rpc_client_status( fd_rpc_client_t * rpc,
     347             :                       long              request_id,
     348           6 :                       int               wait ) {
     349           6 :   ulong idx = fd_rpc_find_request( rpc, request_id );
     350           6 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) return NULL;
     351             : 
     352           6 :   if( FD_LIKELY( !wait ) ) return &rpc->requests[ idx ].response;
     353             : 
     354       45627 :   for(;;) {
     355       45627 :     if( FD_LIKELY( rpc->requests[ idx ].state==FD_RPC_CLIENT_STATE_FINISHED ) ) return &rpc->requests[ idx ].response;
     356       45621 :     fd_rpc_client_service( rpc, 1 );
     357       45621 :   }
     358           6 : }
     359             : 
     360             : void
     361             : fd_rpc_client_close( fd_rpc_client_t * rpc,
     362           6 :                      long              request_id ) {
     363           6 :   ulong idx = fd_rpc_find_request( rpc, request_id );
     364           6 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) return;
     365             : 
     366           6 :   if( FD_LIKELY( rpc->fds[ idx ].fd>=0 ) ) {
     367           0 :     if( FD_UNLIKELY( close( rpc->fds[ idx ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     368           0 :     rpc->fds[ idx ].fd = -1;
     369           0 :   }
     370           6 :   rpc->requests[ idx ].state = FD_RPC_CLIENT_STATE_NONE;
     371           6 : }

Generated by: LCOV version 1.14