Line data Source code
1 : #include "fd_rpc_client.h"
2 : #include "fd_rpc_client_private.h"
3 :
4 : #include "../../../ballet/http/picohttpparser.h"
5 : #include "../../../ballet/json/cJSON.h"
6 : #include "../../../ballet/base58/fd_base58.h"
7 :
8 : #include <errno.h>
9 : #include <stdio.h>
10 : #include <stdlib.h>
11 : #include <unistd.h>
12 : #include <strings.h>
13 : #include <sys/socket.h>
14 : #include <sys/types.h>
15 : #include <netinet/ip.h>
16 :
17 : #define MAX_REQUEST_LEN (1024UL)
18 :
19 : void *
20 : fd_rpc_client_new( void * mem,
21 : uint rpc_addr,
22 3 : ushort rpc_port ) {
23 3 : fd_rpc_client_t * rpc = (fd_rpc_client_t *)mem;
24 3 : rpc->request_id = 0UL;
25 3 : rpc->rpc_addr = rpc_addr;
26 3 : rpc->rpc_port = rpc_port;
27 387 : for( ulong i=0; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
28 384 : rpc->requests[ i ].state = FD_RPC_CLIENT_STATE_NONE;
29 384 : rpc->fds[ i ].fd = -1;
30 384 : rpc->fds[ i ].events = POLLIN | POLLOUT;
31 384 : }
32 3 : return (void *)rpc;
33 3 : }
34 :
35 : long
36 : fd_rpc_client_wait_ready( fd_rpc_client_t * rpc,
37 0 : long timeout_ns ) {
38 :
39 :
40 0 : struct sockaddr_in addr = {
41 0 : .sin_family = AF_INET,
42 0 : .sin_port = fd_ushort_bswap( rpc->rpc_port ),
43 0 : .sin_addr = { .s_addr = rpc->rpc_addr }
44 0 : };
45 :
46 0 : struct pollfd pfd = {
47 0 : .fd = 0,
48 0 : .events = POLLOUT,
49 0 : .revents = 0
50 0 : };
51 :
52 0 : long start = fd_log_wallclock();
53 0 : for(;;) {
54 0 : pfd.fd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
55 0 : if( FD_UNLIKELY( pfd.fd<0 ) ) return FD_RPC_CLIENT_ERR_NETWORK;
56 :
57 0 : if( FD_UNLIKELY( -1==connect( pfd.fd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
58 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
59 0 : return FD_RPC_CLIENT_ERR_NETWORK;
60 0 : }
61 :
62 0 : for(;;) {
63 0 : long now = fd_log_wallclock();
64 0 : if( FD_UNLIKELY( now-start>=timeout_ns ) ) {
65 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
66 0 : return FD_RPC_CLIENT_ERR_NETWORK;
67 0 : }
68 :
69 0 : int nfds = poll( &pfd, 1, (int)((now-start) / 1000000) );
70 0 : if( FD_UNLIKELY( 0==nfds ) ) continue;
71 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) continue;
72 0 : else if( FD_UNLIKELY( -1==nfds ) ) {
73 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
74 0 : return FD_RPC_CLIENT_ERR_NETWORK;
75 0 : } else if( FD_LIKELY( pfd.revents & (POLLERR | POLLHUP) ) ) {
76 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
77 0 : break;
78 0 : } else if( FD_LIKELY( pfd.revents & POLLOUT ) ) {
79 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
80 0 : return FD_RPC_CLIENT_SUCCESS;
81 0 : }
82 0 : }
83 0 : }
84 0 : }
85 :
86 : static ulong
87 6 : fd_rpc_available_slot( fd_rpc_client_t * rpc ) {
88 6 : for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
89 6 : if( FD_LIKELY( rpc->requests[i].state==FD_RPC_CLIENT_STATE_NONE ) ) return i;
90 6 : }
91 0 : return ULONG_MAX;
92 6 : }
93 :
94 : static ulong
95 : fd_rpc_find_request( fd_rpc_client_t * rpc,
96 12 : long request_id ) {
97 12 : for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
98 12 : if( FD_LIKELY( rpc->requests[i].state==FD_RPC_CLIENT_STATE_NONE ) ) continue;
99 12 : if( FD_LIKELY( rpc->requests[i].response.request_id!=request_id ) ) continue;
100 12 : return i;
101 12 : }
102 0 : return ULONG_MAX;
103 12 : }
104 :
105 : static long
106 : fd_rpc_client_request( fd_rpc_client_t * rpc,
107 : ulong method,
108 : long request_id,
109 : char * contents,
110 6 : int contents_len ) {
111 6 : ulong idx = fd_rpc_available_slot( rpc );
112 6 : if( FD_UNLIKELY( idx==ULONG_MAX) ) return FD_RPC_CLIENT_ERR_TOO_MANY;
113 :
114 6 : struct fd_rpc_client_request * request = &rpc->requests[ idx ];
115 :
116 6 : if( FD_UNLIKELY( contents_len<0 ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
117 6 : if( FD_UNLIKELY( (ulong)contents_len>=MAX_REQUEST_LEN ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
118 :
119 6 : int printed = snprintf( request->connected.request_bytes, sizeof(request->connected.request_bytes),
120 6 : "POST / HTTP/1.1\r\n"
121 6 : "Host: localhost:12001\r\n"
122 6 : "Content-Length: %d\r\n"
123 6 : "Content-Type: application/json\r\n\r\n"
124 6 : "%s", contents_len, contents );
125 6 : if( FD_UNLIKELY( printed<0 ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
126 6 : if( FD_UNLIKELY( (ulong)printed>=sizeof(request->connected.request_bytes) ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
127 6 : request->connected.request_bytes_cnt = (ulong)printed;
128 :
129 6 : int fd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
130 6 : if( FD_UNLIKELY( fd<0 ) ) return FD_RPC_CLIENT_ERR_NETWORK;
131 :
132 6 : struct sockaddr_in addr = {
133 6 : .sin_family = AF_INET,
134 6 : .sin_port = fd_ushort_bswap( rpc->rpc_port ),
135 6 : .sin_addr = { .s_addr = rpc->rpc_addr }
136 6 : };
137 :
138 6 : if( FD_UNLIKELY( -1==connect( fd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
139 0 : if( FD_UNLIKELY( close( fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
140 0 : return FD_RPC_CLIENT_ERR_NETWORK;
141 0 : }
142 :
143 6 : rpc->request_id = request_id;
144 6 : rpc->fds[ idx ].fd = fd;
145 6 : request->response.method = method;
146 6 : request->response.status = FD_RPC_CLIENT_PENDING;
147 6 : request->response.request_id = rpc->request_id;
148 6 : request->connected.request_bytes_sent = 0UL;
149 6 : request->state = FD_RPC_CLIENT_STATE_CONNECTED;
150 6 : return request->response.request_id;
151 6 : }
152 :
153 : long
154 3 : fd_rpc_client_request_latest_block_hash( fd_rpc_client_t * rpc ) {
155 3 : char contents[ MAX_REQUEST_LEN ];
156 3 : long request_id = fd_long_if( rpc->request_id==LONG_MAX, 0L, rpc->request_id+1L );
157 :
158 3 : int contents_len = snprintf( contents, sizeof(contents),
159 3 : "{\"jsonrpc\":\"2.0\",\"id\":\"%ld\",\"method\":\"getLatestBlockhash\",\"params\":[]}",
160 3 : request_id );
161 :
162 3 : return fd_rpc_client_request( rpc, FD_RPC_CLIENT_METHOD_LATEST_BLOCK_HASH, request_id, contents, contents_len );
163 3 : }
164 :
165 : long
166 3 : fd_rpc_client_request_transaction_count( fd_rpc_client_t * rpc ) {
167 3 : char contents[ MAX_REQUEST_LEN ];
168 3 : long request_id = fd_long_if( rpc->request_id==LONG_MAX, 0L, rpc->request_id+1L );
169 :
170 3 : int contents_len = snprintf( contents, sizeof(contents),
171 3 : "{\"jsonrpc\":\"2.0\",\"id\":\"%ld\",\"method\":\"getTransactionCount\",\"params\":[ { \"commitment\": \"processed\" } ]}",
172 3 : request_id );
173 :
174 3 : return fd_rpc_client_request( rpc, FD_RPC_CLIENT_METHOD_TRANSACTION_COUNT, request_id, contents, contents_len );
175 3 : }
176 :
177 : static void
178 : fd_rpc_mark_error( fd_rpc_client_t * rpc,
179 : ulong idx,
180 0 : long error ) {
181 0 : if( FD_LIKELY( rpc->fds[ idx ].fd>=0 ) ) {
182 0 : if( FD_UNLIKELY( close( rpc->fds[ idx ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
183 0 : rpc->fds[ idx ].fd = -1;
184 0 : }
185 0 : rpc->requests[ idx ].state = FD_RPC_CLIENT_STATE_FINISHED;
186 0 : rpc->requests[ idx ].response.status = error;
187 0 : }
188 :
189 : static ulong
190 : fd_rpc_phr_content_length( struct phr_header * headers,
191 6 : ulong num_headers ) {
192 12 : for( ulong i=0UL; i<num_headers; i++ ) {
193 12 : if( FD_LIKELY( headers[i].name_len!=14UL ) ) continue;
194 6 : if( FD_LIKELY( strncasecmp( headers[i].name, "Content-Length", 14UL ) ) ) continue;
195 6 : char * end;
196 6 : ulong content_length = strtoul( headers[i].value, &end, 10 );
197 6 : if( FD_UNLIKELY( end==headers[i].value ) ) return ULONG_MAX;
198 6 : return content_length;
199 6 : }
200 0 : return ULONG_MAX;
201 6 : }
202 :
203 : static long
204 : parse_response( char * response,
205 : ulong response_len,
206 : ulong last_response_len,
207 6 : fd_rpc_client_response_t * result ) {
208 6 : int minor_version;
209 6 : int status;
210 6 : const char * message;
211 6 : ulong message_len;
212 6 : struct phr_header headers[ 32 ];
213 6 : ulong num_headers = 32UL;
214 6 : int http_len = phr_parse_response( response, response_len,
215 6 : &minor_version, &status, &message, &message_len,
216 6 : headers, &num_headers, last_response_len );
217 6 : if( FD_UNLIKELY( -2==http_len ) ) return FD_RPC_CLIENT_PENDING;
218 6 : else if( FD_UNLIKELY( -1==http_len ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
219 :
220 6 : ulong content_length = fd_rpc_phr_content_length( headers, num_headers );
221 6 : if( FD_UNLIKELY( content_length==ULONG_MAX ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
222 6 : if( FD_UNLIKELY( content_length+(ulong)http_len > MAX_REQUEST_LEN ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
223 6 : if( FD_LIKELY( content_length+(ulong)http_len>response_len ) ) return FD_RPC_CLIENT_PENDING;
224 :
225 6 : if( FD_UNLIKELY( status!=200 ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
226 :
227 6 : const char * parse_end;
228 6 : cJSON * json = cJSON_ParseWithLengthOpts( response + http_len, content_length, &parse_end, 0 );
229 6 : if( FD_UNLIKELY( !json ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
230 :
231 6 : switch( result->method ) {
232 3 : case FD_RPC_CLIENT_METHOD_TRANSACTION_COUNT: {
233 3 : const cJSON * node = cJSON_GetObjectItemCaseSensitive( json, "result" );
234 3 : if( FD_UNLIKELY( !cJSON_IsNumber( node ) || node->valueulong==ULONG_MAX ) ) {
235 0 : cJSON_Delete( json );
236 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
237 0 : }
238 :
239 3 : result->result.transaction_count.transaction_count = node->valueulong;
240 3 : cJSON_Delete( json );
241 3 : return FD_RPC_CLIENT_SUCCESS;
242 3 : }
243 3 : case FD_RPC_CLIENT_METHOD_LATEST_BLOCK_HASH: {
244 3 : const cJSON * node = cJSON_GetObjectItemCaseSensitive( json, "result" );
245 3 : if( FD_UNLIKELY( !cJSON_IsObject( node ) ) ) {
246 0 : cJSON_Delete( json );
247 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
248 0 : }
249 :
250 3 : node = cJSON_GetObjectItemCaseSensitive( node, "value" );
251 3 : if( FD_UNLIKELY( !cJSON_IsObject( node ) ) ) {
252 0 : cJSON_Delete( json );
253 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
254 0 : }
255 :
256 3 : node = cJSON_GetObjectItemCaseSensitive( node, "blockhash" );
257 3 : if( FD_UNLIKELY( !cJSON_IsString( node ) ) ) {
258 0 : cJSON_Delete( json );
259 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
260 0 : }
261 :
262 3 : if( FD_UNLIKELY( strnlen( node->valuestring, 45UL )>44UL ) ) {
263 0 : cJSON_Delete( json );
264 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
265 0 : }
266 :
267 3 : if( FD_UNLIKELY( !fd_base58_decode_32( node->valuestring, result->result.latest_block_hash.block_hash ) ) ) {
268 0 : cJSON_Delete( json );
269 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
270 0 : }
271 :
272 3 : cJSON_Delete( json );
273 3 : return FD_RPC_CLIENT_SUCCESS;
274 3 : }
275 0 : default:
276 0 : FD_TEST( 0 );
277 6 : }
278 6 : }
279 :
280 : int
281 : fd_rpc_client_service( fd_rpc_client_t * rpc,
282 34259 : int wait ) {
283 34259 : int timeout = wait ? -1 : 0;
284 34259 : int nfds = poll( rpc->fds, FD_RPC_CLIENT_REQUEST_CNT, timeout );
285 34259 : if( FD_UNLIKELY( 0==nfds ) ) return 0;
286 34259 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 0;
287 34259 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
288 :
289 4419411 : for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
290 4385152 : struct fd_rpc_client_request * request = &rpc->requests[i];
291 :
292 4385152 : if( FD_LIKELY( request->state==FD_RPC_CLIENT_STATE_CONNECTED && ( rpc->fds[ i ].revents & POLLOUT ) ) ) {
293 6 : long sent = send( rpc->fds[ i ].fd, request->connected.request_bytes+request->connected.request_bytes_sent,
294 6 : request->connected.request_bytes_cnt-request->connected.request_bytes_sent, 0 );
295 6 : if( FD_UNLIKELY( -1==sent && errno==EAGAIN ) ) continue;
296 6 : if( FD_UNLIKELY( -1==sent ) ) {
297 0 : fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_NETWORK );
298 0 : continue;
299 0 : }
300 :
301 6 : request->connected.request_bytes_sent += (ulong)sent;
302 6 : if( FD_UNLIKELY( request->connected.request_bytes_sent==request->connected.request_bytes_cnt ) ) {
303 6 : request->sent.response_bytes_read = 0UL;
304 6 : request->state = FD_RPC_CLIENT_STATE_SENT;
305 6 : }
306 6 : }
307 :
308 4385152 : if( FD_LIKELY( request->state==FD_RPC_CLIENT_STATE_SENT && ( rpc->fds[ i ].revents & POLLIN ) ) ) {
309 6 : long read = recv( rpc->fds[ i ].fd, request->response_bytes+request->sent.response_bytes_read,
310 6 : sizeof(request->response_bytes)-request->sent.response_bytes_read, 0 );
311 6 : if( FD_UNLIKELY( -1==read && errno==EAGAIN ) ) continue;
312 6 : else if( FD_UNLIKELY( -1==read ) ) {
313 0 : fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_NETWORK );
314 0 : continue;
315 0 : }
316 :
317 6 : request->sent.response_bytes_read += (ulong)read;
318 6 : if( FD_UNLIKELY( request->sent.response_bytes_read==sizeof(request->response_bytes) ) ) {
319 0 : fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_TOO_LARGE );
320 0 : continue;
321 0 : }
322 :
323 6 : fd_rpc_client_response_t * response = &request->response;
324 6 : long status = parse_response( request->response_bytes,
325 6 : request->sent.response_bytes_read,
326 6 : request->sent.response_bytes_read-(ulong)read,
327 6 : response );
328 6 : if( FD_LIKELY( status==FD_RPC_CLIENT_PENDING ) ) continue;
329 6 : else if( FD_UNLIKELY( status==FD_RPC_CLIENT_SUCCESS ) ) {
330 6 : if( FD_UNLIKELY( close( rpc->fds[ i ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
331 6 : rpc->fds[ i ].fd = -1;
332 6 : response->status = FD_RPC_CLIENT_SUCCESS;
333 6 : request->state = FD_RPC_CLIENT_STATE_FINISHED;
334 6 : continue;
335 6 : } else {
336 0 : fd_rpc_mark_error( rpc, i, status );
337 0 : continue;
338 0 : }
339 6 : }
340 4385152 : }
341 :
342 34259 : return 1;
343 34259 : }
344 :
345 : fd_rpc_client_response_t *
346 : fd_rpc_client_status( fd_rpc_client_t * rpc,
347 : long request_id,
348 6 : int wait ) {
349 6 : ulong idx = fd_rpc_find_request( rpc, request_id );
350 6 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return NULL;
351 :
352 6 : if( FD_LIKELY( !wait ) ) return &rpc->requests[ idx ].response;
353 :
354 34265 : for(;;) {
355 34265 : if( FD_LIKELY( rpc->requests[ idx ].state==FD_RPC_CLIENT_STATE_FINISHED ) ) return &rpc->requests[ idx ].response;
356 34259 : fd_rpc_client_service( rpc, 1 );
357 34259 : }
358 6 : }
359 :
360 : void
361 : fd_rpc_client_close( fd_rpc_client_t * rpc,
362 6 : long request_id ) {
363 6 : ulong idx = fd_rpc_find_request( rpc, request_id );
364 6 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return;
365 :
366 6 : if( FD_LIKELY( rpc->fds[ idx ].fd>=0 ) ) {
367 0 : if( FD_UNLIKELY( close( rpc->fds[ idx ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
368 0 : rpc->fds[ idx ].fd = -1;
369 0 : }
370 6 : rpc->requests[ idx ].state = FD_RPC_CLIENT_STATE_NONE;
371 6 : }
|