Line data Source code
1 : /**
2 :
3 : export RUST_LOG=solana_repair=TRACE
4 : cargo run --bin solana-test-validator
5 :
6 : build/native/gcc/bin/fd_repair_tool --peer_id 75dLVGm338wpo2SsfM7pWestidAjJL1Y9nw9Rb1x7yQQ --slot 1533:0,1534:0
7 :
8 : **/
9 :
10 : #define _GNU_SOURCE /* See feature_test_macros(7) */
11 :
12 : #include "fd_repair.h"
13 : #include "../fd_flamenco.h"
14 : #include "../../util/fd_util.h"
15 : #include "../../ballet/base58/fd_base58.h"
16 : #include "../types/fd_types_yaml.h"
17 : #include "../../util/net/fd_eth.h"
18 : #include <stdio.h>
19 : #include <unistd.h>
20 : #include <signal.h>
21 : #include <sys/socket.h>
22 : #include <netinet/in.h>
23 : #include <arpa/inet.h>
24 : #include <sys/random.h>
25 : #include <errno.h>
26 : #include <netdb.h>
27 : #include <stdlib.h>
28 :
/* Set to 1 by the SIGINT handler below. */
volatile int stopflag = 0;

/* Signal handler: records that a stop was requested.  The signal
   number itself is ignored. */
static void
stop( int signum ) {
  (void)signum;
  stopflag = 1;
}

/* UDP socket shared by main_loop (which opens it) and send_packet. */
static int sockfd = -1;
34 :
35 : /* Convert my style of address to UNIX style */
36 : static int
37 0 : repair_to_sockaddr( uchar * dst, fd_repair_peer_addr_t const * src ) {
38 0 : fd_memset(dst, 0, sizeof(struct sockaddr_in));
39 0 : struct sockaddr_in * t = (struct sockaddr_in *)dst;
40 0 : t->sin_family = AF_INET;
41 0 : t->sin_addr.s_addr = src->addr;
42 0 : t->sin_port = src->port;
43 0 : return sizeof(struct sockaddr_in);
44 0 : }
45 :
46 : /* Convert my style of address from UNIX style */
47 : static int
48 0 : repair_from_sockaddr( fd_repair_peer_addr_t * dst, uchar const * src ) {
49 0 : FD_STATIC_ASSERT(sizeof(fd_repair_peer_addr_t) == sizeof(ulong),"messed up size");
50 0 : dst->l = 0;
51 0 : const struct sockaddr_in * sa = (const struct sockaddr_in *)src;
52 0 : dst->addr = sa->sin_addr.s_addr;
53 0 : dst->port = sa->sin_port;
54 0 : return 0;
55 0 : }
56 :
57 : static void FD_FN_UNUSED
58 : send_packet( uchar const * data,
59 : size_t sz,
60 : fd_repair_peer_addr_t const * addr,
61 : uint src_ip4_addr FD_PARAM_UNUSED,
62 0 : void * arg FD_PARAM_UNUSED ) {
63 0 : // FD_LOG_HEXDUMP_NOTICE(("send: ", data, sz));
64 0 : uchar saddr[sizeof(struct sockaddr_in)];
65 0 : int saddrlen = repair_to_sockaddr(saddr, addr);
66 0 : if ( sendto(sockfd, data, sz, MSG_DONTWAIT,
67 0 : (const struct sockaddr *)saddr, (socklen_t)saddrlen) < 0 ) {
68 0 : FD_LOG_WARNING(("sendto failed: %s", strerror(errno)));
69 0 : }
70 0 : }
71 :
72 : static void
73 0 : recv_shred(fd_shred_t const * shred, ulong shred_sz, fd_gossip_peer_addr_t const * from, fd_pubkey_t const * id, void * arg) {
74 0 : (void)from;
75 0 : (void)id;
76 0 : (void)arg;
77 0 : FD_LOG_NOTICE(( "shred variant=0x%02x sz=%lu slot=%lu idx=%u header_sz=0x%lx merkle_sz=0x%lx payload_sz=0x%lx",
78 0 : (uint)shred->variant, shred_sz, shred->slot, shred->idx, fd_shred_header_sz(shred->variant),
79 0 : fd_shred_merkle_sz(shred->variant), fd_shred_payload_sz(shred) ));
80 0 : }
81 :
82 : static void FD_FN_UNUSED
83 : deliver_fail_fun( fd_pubkey_t const * id,
84 : ulong slot,
85 : uint shred_index,
86 : void * arg,
87 0 : int reason ) {
88 0 : (void)arg;
89 0 : FD_LOG_WARNING(( "repair_deliver_fail_fun - shred: %s, slot: %lu, idx: %u, reason: %d",
90 0 : FD_BASE58_ENC_32_ALLOCA( id ),
91 0 : slot,
92 0 : shred_index,
93 0 : reason ));
94 0 : }
95 :
96 : /* NOTE: since the removal of callbacks from repair_tile, this is now a similar but different
97 : version of fd_repair_recv_clnt_packet in fd_repair_tile.c */
98 : /* Pass a raw client response packet into the protocol. addr is the address of the sender */
99 : int
100 : fd_repair_recv_clnt_packet( fd_repair_t * glob,
101 : uchar const * msg,
102 : ulong msglen,
103 : fd_repair_peer_addr_t const * src_addr,
104 0 : uint dst_ip4_addr FD_PARAM_UNUSED) {
105 0 : glob->metrics.recv_clnt_pkt++;
106 :
107 0 : FD_SCRATCH_SCOPE_BEGIN {
108 0 : while( 1 ) {
109 0 : ulong decoded_sz;
110 0 : fd_repair_response_t * gmsg = fd_bincode_decode1_scratch(
111 0 : repair_response, msg, msglen, NULL, &decoded_sz );
112 0 : if( FD_UNLIKELY( !gmsg ) ) {
113 : /* Solana falls back to assuming we got a shred in this case
114 : https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1198 */
115 0 : break;
116 0 : }
117 0 : if( FD_UNLIKELY( decoded_sz != msglen ) ) {
118 0 : break;
119 0 : }
120 :
121 0 : switch( gmsg->discriminant ) {
122 0 : case fd_repair_response_enum_ping:
123 : /* TODO: make sure to add back handle_ping from repair tile in order to use */
124 : //fd_repair_handle_ping( glob, &gmsg->inner.ping, src_addr, dst_ip4_addr );
125 0 : break;
126 0 : }
127 :
128 0 : return 0;
129 0 : }
130 :
131 : /* Look at the nonse */
132 0 : if( msglen < sizeof(fd_repair_nonce_t) ) {
133 0 : return 0;
134 0 : }
135 0 : ulong shredlen = msglen - sizeof(fd_repair_nonce_t); /* Nonce is at the end */
136 0 : fd_repair_nonce_t key = *(fd_repair_nonce_t const *)(msg + shredlen);
137 0 : fd_needed_elem_t * val = fd_needed_table_query( glob->needed, &key, NULL );
138 0 : if( NULL == val ) {
139 0 : return 0;
140 0 : }
141 :
142 0 : fd_active_elem_t * active = fd_active_table_query( glob->actives, &val->id, NULL );
143 0 : if( NULL != active ) {
144 : /* Update statistics */
145 0 : active->avg_reps++;
146 0 : active->avg_lat += glob->now - val->when;
147 0 : }
148 :
149 0 : fd_shred_t const * shred = fd_shred_parse(msg, shredlen);
150 0 : if( shred == NULL ) {
151 0 : FD_LOG_WARNING(("invalid shread"));
152 0 : } else {
153 0 : recv_shred(shred, shredlen, src_addr, &val->id, glob->fun_arg);
154 0 : }
155 0 : } FD_SCRATCH_SCOPE_END;
156 0 : return 0;
157 0 : }
158 :
159 :
160 : /* Convert a host:port string to a repair network address. If host is
161 : * missing, it assumes the local hostname. */
162 : static fd_repair_peer_addr_t *
163 0 : resolve_hostport(const char* str /* host:port */, fd_repair_peer_addr_t * res) {
164 0 : fd_memset(res, 0, sizeof(fd_repair_peer_addr_t));
165 :
166 : /* Find the : and copy out the host */
167 0 : char buf[128];
168 0 : uint i;
169 0 : for (i = 0; ; ++i) {
170 0 : if (str[i] == '\0' || i > sizeof(buf)-1U) {
171 0 : FD_LOG_ERR(("missing colon"));
172 0 : return NULL;
173 0 : }
174 0 : if (str[i] == ':') {
175 0 : buf[i] = '\0';
176 0 : break;
177 0 : }
178 0 : buf[i] = str[i];
179 0 : }
180 0 : if (i == 0)
181 : /* :port means $HOST:port */
182 0 : gethostname(buf, sizeof(buf));
183 :
184 0 : struct hostent * host = gethostbyname( buf );
185 0 : if (host == NULL) {
186 0 : FD_LOG_WARNING(("unable to resolve host %s", buf));
187 0 : return NULL;
188 0 : }
189 : /* Convert result to repair address */
190 0 : res->l = 0;
191 0 : res->addr = ((struct in_addr *)host->h_addr)->s_addr;
192 0 : int port = atoi(str + i + 1);
193 0 : if (port < 1024 || port > (int)USHORT_MAX) {
194 0 : FD_LOG_ERR(("invalid port number"));
195 0 : return NULL;
196 0 : }
197 0 : res->port = htons((ushort)port);
198 :
199 0 : return res;
200 0 : }
201 :
202 :
203 : static int
204 0 : main_loop( int * argc, char *** argv, fd_repair_t * glob, fd_repair_config_t * config, volatile int * stopflag ) {
205 0 : int fd;
206 0 : if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
207 0 : FD_LOG_ERR(("socket failed: %s", strerror(errno)));
208 0 : return -1;
209 0 : }
210 0 : sockfd = fd;
211 0 : int optval = 1<<20;
212 0 : if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char *)&optval, sizeof(int)) < 0) {
213 0 : FD_LOG_ERR(("setsocketopt failed: %s", strerror(errno)));
214 0 : return -1;
215 0 : }
216 0 : if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char *)&optval, sizeof(int)) < 0) {
217 0 : FD_LOG_ERR(("setsocketopt failed: %s", strerror(errno)));
218 0 : return -1;
219 0 : }
220 0 : uchar saddr[sizeof(struct sockaddr_in6)];
221 0 : int saddrlen = repair_to_sockaddr(saddr, &config->intake_addr);
222 0 : if (saddrlen < 0 || bind(fd, (struct sockaddr*)saddr, (uint)saddrlen) < 0) {
223 0 : FD_LOG_ERR(("bind failed: %s", strerror(errno)));
224 0 : return -1;
225 0 : }
226 :
227 0 : fd_repair_settime(glob, fd_log_wallclock());
228 0 : fd_repair_start(glob);
229 :
230 0 : char const * id_cstr = fd_env_strip_cmdline_cstr ( argc, argv, "--peer_id", NULL, NULL );
231 0 : if ( id_cstr == NULL )
232 0 : FD_LOG_ERR(("--peer_id command line argument required"));
233 0 : fd_pubkey_t id;
234 0 : fd_base58_decode_32(id_cstr, id.uc);
235 0 : char const * addr_cstr = fd_env_strip_cmdline_cstr ( argc, argv, "--peer_addr", NULL, "127.0.0.1:1032" );
236 0 : fd_repair_peer_addr_t peeraddr;
237 0 : if ( fd_repair_add_active_peer(glob, resolve_hostport(addr_cstr, &peeraddr), &id) )
238 0 : return -1;
239 0 : fd_repair_add_sticky(glob, &id);
240 :
241 0 : char const * slot_cstr = fd_env_strip_cmdline_cstr ( argc, argv, "--slot", NULL, NULL );
242 0 : if ( slot_cstr == NULL )
243 0 : FD_LOG_ERR(("--slot command line argument required"));
244 :
245 0 : ulong smax = 8;
246 0 : ulong depth = 1<<17;
247 0 : char * smem = malloc(fd_scratch_smem_footprint(smax) + fd_scratch_fmem_footprint(depth));
248 0 : fd_scratch_attach( smem, smem + fd_scratch_smem_footprint(smax), smax, depth );
249 :
250 0 : #define VLEN 32U
251 0 : struct mmsghdr msgs[VLEN];
252 0 : struct iovec iovecs[VLEN];
253 0 : uchar bufs[VLEN][FD_ETH_PAYLOAD_MAX];
254 0 : uchar sockaddrs[VLEN][sizeof(struct sockaddr_in6)]; /* sockaddr is smaller than sockaddr_in6 */
255 :
256 0 : long lastreq = 0;
257 0 : while ( !*stopflag ) {
258 0 : long now = fd_log_wallclock();
259 :
260 0 : if (now - lastreq > (long)3e9) {
261 0 : lastreq = now;
262 0 : char* cstr = (char*)slot_cstr;
263 0 : do {
264 0 : ulong slot = strtoul(cstr, &cstr, 10);
265 0 : if ( *cstr != ':' )
266 0 : FD_LOG_ERR(("--slot takes <slot>:<idx>,<slot>:<idx>,<slot>:<idx>..."));
267 0 : ++cstr;
268 0 : long idx = strtol(cstr, &cstr, 10);
269 0 : if( idx == -1 ) {
270 0 : if( fd_repair_need_highest_window_index(glob, slot, 0U) )
271 0 : return -1;
272 0 : } else if( idx == -2 ) {
273 0 : if( fd_repair_need_orphan(glob, slot) )
274 0 : return -1;
275 0 : } else if( idx >= 0 ) {
276 0 : if( fd_repair_need_window_index(glob, slot, (uint)idx) != 1 )
277 0 : return -1;
278 0 : }
279 0 : if ( *cstr == '\0' )
280 0 : break;
281 0 : if ( *cstr != ',' )
282 0 : FD_LOG_ERR(("--slot takes <slot>:<idx>,<slot>:<idx>,<slot>:<idx>..."));
283 0 : ++cstr;
284 0 : } while (1);
285 0 : }
286 :
287 0 : fd_repair_settime(glob, now);
288 0 : fd_repair_continue(glob);
289 :
290 0 : fd_memset(msgs, 0, sizeof(msgs));
291 0 : for (uint i = 0; i < VLEN; i++) {
292 0 : iovecs[i].iov_base = bufs[i];
293 0 : iovecs[i].iov_len = FD_ETH_PAYLOAD_MAX;
294 0 : msgs[i].msg_hdr.msg_iov = &iovecs[i];
295 0 : msgs[i].msg_hdr.msg_iovlen = 1;
296 0 : msgs[i].msg_hdr.msg_name = sockaddrs[i];
297 0 : msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
298 0 : }
299 :
300 : /* Read more packets */
301 0 : long retval = recvmmsg(fd, msgs, VLEN, MSG_DONTWAIT, NULL);
302 0 : if (retval < 0) {
303 0 : if (errno == EINTR || errno == EWOULDBLOCK)
304 0 : continue;
305 0 : FD_LOG_ERR(("recvmmsg failed: %s", strerror(errno)));
306 0 : return -1;
307 0 : }
308 0 : if (retval == 0)
309 0 : continue;
310 :
311 0 : for (uint i = 0; i < (uint)retval; ++i) {
312 0 : fd_repair_peer_addr_t from;
313 0 : repair_from_sockaddr( &from, msgs[i].msg_hdr.msg_name );
314 0 : FD_LOG_HEXDUMP_NOTICE(("recv: ", bufs[i], (ulong)msgs[i].msg_len));
315 0 : fd_repair_recv_clnt_packet( glob, bufs[i], (ulong)msgs[i].msg_len, &from, 0 );
316 0 : }
317 0 : }
318 :
319 0 : fd_scratch_detach(NULL);
320 0 : free(smem);
321 :
322 0 : close(fd);
323 0 : return 0;
324 0 : }
325 :
326 : int main(int argc, char **argv) {
327 : fd_boot ( &argc, &argv );
328 : fd_flamenco_boot( &argc, &argv );
329 :
330 : fd_valloc_t valloc = fd_libc_alloc_virtual();
331 :
332 : fd_repair_config_t config;
333 : fd_memset(&config, 0, sizeof(config));
334 :
335 : uchar private_key[32];
336 : FD_TEST( 32UL==getrandom( private_key, 32UL, 0 ) );
337 : fd_sha512_t sha[1];
338 : fd_pubkey_t public_key;
339 : FD_TEST( fd_ed25519_public_from_private( public_key.uc, private_key, sha ) );
340 :
341 : config.private_key = private_key;
342 : config.public_key = &public_key;
343 :
344 : char hostname[64];
345 : gethostname(hostname, sizeof(hostname));
346 :
347 : char const * my_addr = fd_env_strip_cmdline_cstr ( &argc, &argv, "--my_addr", NULL, ":1125");
348 : FD_TEST( resolve_hostport(my_addr, &config.intake_addr) );
349 :
350 : ulong seed = fd_hash(0, hostname, strnlen(hostname, sizeof(hostname)));
351 :
352 : void * shm = fd_valloc_malloc(valloc, fd_repair_align(), fd_repair_footprint());
353 : fd_repair_t * glob = fd_repair_join(fd_repair_new(shm, seed ));
354 :
355 : if ( fd_repair_set_config(glob, &config) )
356 : return 1;
357 :
358 : signal(SIGINT, stop);
359 : signal(SIGPIPE, SIG_IGN);
360 :
361 : if ( main_loop(&argc, &argv, glob, &config, &stopflag) )
362 : return 1;
363 :
364 : fd_valloc_free(valloc, fd_repair_delete(fd_repair_leave(glob) ));
365 :
366 : fd_halt();
367 :
368 : return 0;
369 : }
|