Line data Source code
1 : #include "fd_snapshot_parser.h"
2 : #include "fd_ssmsg.h"
3 : #include "fd_ssmanifest_parser.h"
4 :
5 : #include "../../../flamenco/runtime/fd_acc_mgr.h" /* FD_ACC_SZ_MAX */
6 : #include "../../../util/archive/fd_tar.h"
7 :
8 : #include <errno.h>
9 : #include <assert.h>
10 : #include <stdio.h>
11 :
12 : FD_FN_CONST ulong
13 0 : fd_snapshot_parser_footprint( ulong max_acc_vecs ) {
14 0 : ulong l = FD_LAYOUT_INIT;
15 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapshot_parser_t), sizeof(fd_snapshot_parser_t) );
16 0 : l = FD_LAYOUT_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint( max_acc_vecs ) );
17 0 : l = FD_LAYOUT_APPEND( l, 16UL, 1UL<<20UL );
18 0 : return FD_LAYOUT_FINI( l, fd_snapshot_parser_align() );
19 0 : }
20 :
21 : fd_snapshot_parser_t *
22 : fd_snapshot_parser_new( void * mem,
23 : void * cb_arg,
24 : ulong seed,
25 : ulong max_acc_vecs,
26 : fd_snapshot_parser_process_manifest_fn_t manifest_cb,
27 : fd_snapshot_process_acc_hdr_fn_t acc_hdr_cb,
28 0 : fd_snapshot_process_acc_data_fn_t acc_data_cb ) {
29 :
30 0 : if( FD_UNLIKELY( !mem ) ) {
31 0 : FD_LOG_WARNING(( "NULL mem" ));
32 0 : return NULL;
33 0 : }
34 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_snapshot_parser_align() ) ) ) {
35 0 : FD_LOG_WARNING(( "unaligned mem" ));
36 0 : return NULL;
37 0 : }
38 :
39 0 : FD_SCRATCH_ALLOC_INIT( l, mem );
40 0 : fd_snapshot_parser_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapshot_parser_t), sizeof(fd_snapshot_parser_t) );
41 0 : void * parser = FD_SCRATCH_ALLOC_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint( max_acc_vecs ) );
42 0 : void * _buf_mem = FD_SCRATCH_ALLOC_APPEND( l, 16UL, 1UL<<20UL );
43 :
44 0 : self->state = SNAP_STATE_TAR;
45 0 : self->flags = 0;
46 0 : self->manifest_done = 0;
47 :
48 0 : self->buf_sz = 0UL;
49 0 : self->buf_ctr = 0UL;
50 0 : self->buf_max = 1UL<<20UL;
51 :
52 0 : self->manifest_parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( parser, max_acc_vecs, seed ) );
53 0 : FD_TEST( self->manifest_parser );
54 :
55 0 : self->buf = _buf_mem;
56 :
57 0 : self->manifest_cb = manifest_cb;
58 0 : self->acc_hdr_cb = acc_hdr_cb;
59 0 : self->acc_data_cb = acc_data_cb;
60 0 : self->cb_arg = cb_arg;
61 :
62 0 : self->metrics.accounts_files_processed = 0UL;
63 0 : self->metrics.accounts_files_total = 0UL;
64 0 : self->metrics.accounts_processed = 0UL;
65 0 : self->processing_accv = 0;
66 0 : self->goff = 0UL;
67 :
68 0 : return self;
69 0 : }
70 :
71 : static void
72 0 : fd_snapshot_parser_discard_buf( fd_snapshot_parser_t * self ) {
73 0 : self->buf_ctr = 0UL;
74 0 : self->buf_sz = 0UL;
75 0 : }
76 :
77 : static void *
78 : fd_snapshot_parser_prepare_buf( fd_snapshot_parser_t * self,
79 0 : ulong sz ) {
80 0 : self->buf_ctr = 0UL;
81 0 : self->buf_sz = 0UL;
82 :
83 0 : fd_snapshot_parser_discard_buf( self );
84 0 : if( FD_UNLIKELY( sz > self->buf_max ) ) {
85 0 : FD_LOG_WARNING(( "Alloc failed (need %lu bytes, have %lu)", sz, self->buf_max ));
86 0 : self->flags |= SNAP_FLAG_FAILED;
87 0 : return NULL;
88 0 : }
89 :
90 0 : return self->buf;
91 0 : }
92 :
93 : static int
94 0 : fd_snapshot_parser_expect_account_hdr( fd_snapshot_parser_t * self ) {
95 0 : ulong accv_sz = self->accv_sz;
96 0 : if( accv_sz < sizeof(fd_solana_account_hdr_t) ) {
97 0 : if( FD_LIKELY( accv_sz==0UL ) ) {
98 0 : self->state = SNAP_STATE_ACCOUNT_HDR;
99 0 : return 0;
100 0 : }
101 0 : FD_LOG_WARNING(( "encountered unexpected EOF while reading account header" ));
102 0 : self->flags |= SNAP_FLAG_FAILED;
103 0 : return EINVAL;
104 0 : }
105 :
106 0 : self->state = SNAP_STATE_ACCOUNT_HDR;
107 0 : self->buf_ctr = 0UL;
108 0 : self->buf_sz = sizeof(fd_solana_account_hdr_t);
109 :
110 0 : return 0;
111 0 : }
112 :
113 : static int
114 : fd_snapshot_parser_accv_prepare( fd_snapshot_parser_t * const self,
115 : fd_tar_meta_t const * const meta,
116 0 : ulong const real_sz ) {
117 :
118 0 : if( FD_UNLIKELY( !fd_snapshot_parser_prepare_buf( self, sizeof(fd_solana_account_hdr_t) ) ) ) {
119 0 : FD_LOG_WARNING(( "Failed to allocate read buffer while restoring accounts from snapshot" ));
120 0 : return ENOMEM;
121 0 : }
122 :
123 : /* Parse file name */
124 0 : ulong id, slot;
125 0 : if( FD_UNLIKELY( sscanf( meta->name, "accounts/%lu.%lu", &slot, &id )!=2 ) ) {
126 : /* Ignore entire file if file name invalid */
127 0 : self->state = SNAP_STATE_IGNORE;
128 0 : return 0;
129 0 : }
130 :
131 0 : ulong sz = fd_ssmanifest_acc_vec_sz( self->manifest_parser, slot, id );
132 0 : if( FD_UNLIKELY( sz==ULONG_MAX ) ) {
133 0 : FD_LOG_DEBUG(( "Ignoring %s (sz %lu)", meta->name, real_sz ));
134 0 : self->state = SNAP_STATE_IGNORE;
135 0 : return 0;
136 0 : }
137 :
138 : /* Validate the supposed file size against real size */
139 0 : if( FD_UNLIKELY( sz>real_sz ) ) {
140 0 : FD_LOG_WARNING(( "AppendVec %lu.%lu is %lu bytes long according to manifest, but actually only %lu bytes",
141 0 : slot, id, sz, real_sz ));
142 0 : self->flags |= SNAP_FLAG_FAILED;
143 0 : return EINVAL;
144 0 : }
145 :
146 0 : self->accv_sz = sz;
147 0 : self->accv_slot = slot;
148 0 : self->accv_id = id;
149 0 : self->processing_accv = 1;
150 :
151 : /* Prepare read of account header */
152 0 : FD_LOG_DEBUG(( "Loading account vec %s", meta->name ));
153 0 : return fd_snapshot_parser_expect_account_hdr( self );
154 0 : }
155 :
156 : /* fd_snapshot_restore_manifest_prepare prepares for consumption of the
157 : snapshot manifest. */
158 :
159 : static int
160 : fd_snapshot_parser_manifest_prepare( fd_snapshot_parser_t * self,
161 0 : ulong sz ) {
162 : /* Only read once */
163 0 : if( self->manifest_done ) {
164 0 : FD_LOG_WARNING(( "Snapshot file contains multiple manifests" ));
165 0 : self->state = SNAP_STATE_IGNORE;
166 0 : return 0;
167 0 : }
168 :
169 0 : self->state = SNAP_STATE_MANIFEST;
170 0 : self->buf_sz = sz;
171 :
172 0 : return 0;
173 0 : }
174 :
175 : static void
176 : fd_snapshot_parser_restore_file( void * self_,
177 : fd_tar_meta_t const * meta,
178 0 : ulong sz ) {
179 0 : fd_snapshot_parser_t * self = self_;
180 :
181 0 : self->buf_ctr = 0UL; /* reset buffer */
182 0 : self->state = SNAP_STATE_IGNORE;
183 :
184 0 : if( (sz==0UL) | (!fd_tar_meta_is_reg( meta )) ) return;
185 :
186 : /* Detect account vec files. These are files that contain a vector
187 : of accounts in Solana Labs "AppendVec" format. */
188 0 : assert( sizeof("accounts/")<FD_TAR_NAME_SZ );
189 0 : if( 0==strncmp( meta->name, "accounts/", sizeof("accounts/")-1) ) {
190 0 : if( FD_UNLIKELY( !self->manifest_done ) ) {
191 0 : FD_LOG_WARNING(( "Unsupported snapshot: encountered AppendVec before manifest" ));
192 0 : self->flags |= SNAP_FLAG_FAILED;
193 0 : return;
194 0 : }
195 0 : fd_snapshot_parser_accv_prepare( self, meta, sz );
196 0 : } else if( fd_memeq( meta->name, "snapshots/status_cache", sizeof("snapshots/status_cache") ) ) {
197 : /* TODO */
198 0 : } else if(0==strncmp( meta->name, "snapshots/", sizeof("snapshots/")-1 ) ) {
199 0 : fd_snapshot_parser_manifest_prepare( self, sz );
200 0 : }
201 :
202 0 : }
203 :
204 : static uchar const *
205 : fd_snapshot_parser_tar_process_hdr( fd_snapshot_parser_t * self,
206 : uchar const * cur,
207 0 : uchar const * end ) {
208 :
209 0 : fd_tar_meta_t const * hdr = (fd_tar_meta_t const *)self->buf;
210 :
211 : /* "ustar\x00" and "ustar \x00" (overlaps with version) are both
212 : valid values for magic. These are POSIX ustar and OLDGNU versions
213 : respectively. */
214 0 : if( FD_UNLIKELY( 0!=memcmp( hdr->magic, FD_TAR_MAGIC, 5UL ) ) ) {
215 :
216 : /* Detect EOF. A TAR EOF is marked by 1024 bytes of zeros.
217 : We abort after 512 bytes. */
218 0 : int not_zero=0;
219 0 : for( ulong i=0UL; i<sizeof(fd_tar_meta_t); i++ )
220 0 : not_zero |= self->buf[ i ];
221 0 : if( !not_zero ) {
222 0 : self->flags |= SNAP_FLAG_DONE;
223 0 : return end;
224 0 : }
225 : /* Not an EOF, so must be a protocol error */
226 0 : ulong goff = self->goff - sizeof(fd_tar_meta_t);
227 0 : FD_LOG_WARNING(( "Invalid tar header magic at goff=0x%lx", goff ));
228 0 : FD_LOG_HEXDUMP_WARNING(( "Tar header", hdr, sizeof(fd_tar_meta_t) ));
229 0 : self->flags |= SNAP_FLAG_FAILED;
230 0 : return cur;
231 0 : }
232 :
233 0 : ulong file_sz = fd_tar_meta_get_size( hdr );
234 0 : if( FD_UNLIKELY( file_sz==ULONG_MAX ) ) {
235 0 : FD_LOG_WARNING(( "Failed to parse file size in tar header" ));
236 0 : self->flags |= SNAP_FLAG_FAILED;
237 0 : return cur;
238 0 : }
239 0 : self->tar_file_rem = file_sz;
240 0 : self->buf_ctr = (ushort)0U;
241 :
242 : /* Call back to recipient */
243 0 : fd_snapshot_parser_restore_file( self, hdr, file_sz );
244 0 : return cur;
245 0 : }
246 :
247 : static uchar const *
248 : fd_snapshot_parser_tar_read_hdr( fd_snapshot_parser_t * self,
249 : uchar const * cur,
250 0 : ulong bufsz ) {
251 0 : uchar const * end = cur+bufsz;
252 :
253 : /* Skip padding */
254 0 : if( self->buf_ctr==0UL ) {
255 0 : ulong pad_sz = fd_ulong_align_up( self->goff, 512UL ) - self->goff;
256 0 : pad_sz = fd_ulong_min( pad_sz, (ulong)( end-cur ) );
257 0 : cur += pad_sz;
258 0 : }
259 :
260 : /* Determine number of bytes to read */
261 0 : long chunk_sz = (long)sizeof(fd_tar_meta_t) - (long)self->buf_ctr;
262 0 : FD_TEST( chunk_sz>=0L );
263 0 : if( end-cur < chunk_sz ) chunk_sz = end-cur;
264 :
265 : /* Copy to header */
266 0 : fd_memcpy( self->buf + self->buf_ctr, cur, (ulong)chunk_sz );
267 0 : cur += chunk_sz;
268 0 : self->buf_ctr += (ulong)chunk_sz;
269 :
270 : /* Handle complete header */
271 0 : if( FD_LIKELY( self->buf_ctr == sizeof(fd_tar_meta_t) ) ) {
272 0 : cur = fd_snapshot_parser_tar_process_hdr( self, cur, end );
273 0 : }
274 :
275 0 : return cur;
276 0 : }
277 :
278 : FD_FN_PURE static inline int
279 0 : fd_snapshot_parser_hdr_read_is_complete( fd_snapshot_parser_t const * self ) {
280 0 : return self->buf_ctr == self->buf_sz;
281 0 : }
282 :
283 : static uchar const *
284 : fd_snapshot_parser_read_buffered( fd_snapshot_parser_t * self,
285 : uchar const * buf,
286 0 : ulong bufsz ) {
287 : /* Should not be called if read is complete */
288 0 : FD_TEST( self->buf_ctr < self->buf_sz );
289 :
290 : /* Determine number of bytes to buffer */
291 0 : ulong sz = self->buf_sz - self->buf_ctr;
292 0 : if( sz>bufsz ) sz = bufsz;
293 :
294 : /* Append to buffer */
295 0 : fd_memcpy( self->buf + self->buf_ctr, buf, sz );
296 0 : self->buf_ctr += sz;
297 :
298 0 : return buf+sz;
299 0 : }
300 :
301 : static uchar const *
302 : fd_snapshot_parser_read_discard( fd_snapshot_parser_t * self,
303 : uchar const * buf,
304 0 : ulong bufsz ) {
305 0 : ulong avail = fd_ulong_min( bufsz, self->tar_file_rem );
306 0 : return buf + avail;
307 0 : }
308 :
/* fd_snapshot_parser_read_manifest_chunk reads partial manifest content.

   Consumes up to bufsz bytes of the manifest entry, bounded by the bytes
   still expected (buf_sz-buf_ctr).  The bytes are copied into
   manifest_buf (after a leading fd_snapshot_manifest_t) and fed to the
   streaming manifest parser:

   - consume result -1: SNAP_FLAG_FAILED is set.
   - consume result 0 (parser finished): the entire entry must have been
     delivered, otherwise SNAP_FLAG_FAILED is set and buf is returned
     (nothing reported consumed).  On success the manifest callback is
     invoked with the manifest size, the buffer counters are reset,
     manifest_done is set and the state becomes SNAP_STATE_IGNORE.

   Returns the position after the consumed bytes.

   NOTE(review): manifest_buf is not assigned in fd_snapshot_parser_new,
   and its capacity is not checked against buf_sz here.  Confirm it is
   sized for the largest manifest.
   NOTE(review): if the entry ends while consume returns neither 0 nor
   -1, no failure is flagged here.  The missing manifest is only noticed
   later, when an AppendVec entry arrives before manifest_done is set. */

static uchar const *
fd_snapshot_parser_read_manifest_chunk( fd_snapshot_parser_t * self,
                                        uchar const *          buf,
                                        ulong                  bufsz ) {
  FD_TEST( self->buf_ctr < self->buf_sz );

  /* Clamp to the bytes remaining in the manifest entry */
  ulong sz = self->buf_sz - self->buf_ctr;
  if( sz>bufsz ) sz = bufsz;

  uchar * dst = self->manifest_buf+sizeof(fd_snapshot_manifest_t);
  fd_memcpy( dst+self->buf_ctr, buf, sz );
  self->buf_ctr += sz;

  int result = fd_ssmanifest_parser_consume( self->manifest_parser, buf, sz );
  if( -1==result ) self->flags |= SNAP_FLAG_FAILED;
  if( 0==result ) {
    if( self->buf_ctr!=self->buf_sz ) {
      /* Some additional trailing garbage: the parser finished before the
         whole entry was delivered */
      self->flags |= SNAP_FLAG_FAILED;
      return buf;
    }

    /* manifest cb */
    if( FD_LIKELY( self->manifest_cb ) ) self->manifest_cb( self->cb_arg, self->buf_sz );

    /* Reset the buffer counters (buf_ctr/buf_sz); the manifest is done */

    fd_snapshot_parser_discard_buf( self );
    self->manifest_done = 1;
    self->state = SNAP_STATE_IGNORE;
  }

  return buf+sz;
}
345 :
346 : static int
347 0 : fd_snapshot_parser_restore_account_hdr( fd_snapshot_parser_t * self ) {
348 0 : fd_solana_account_hdr_t const * hdr = fd_type_pun_const( self->buf );
349 :
350 0 : if( FD_UNLIKELY( hdr->meta.data_len > FD_ACC_SZ_MAX ) ) {
351 0 : FD_LOG_ERR(( "account data size (%lu) exceeds max (%lu) (possible memory corruption?)", hdr->meta.data_len, FD_ACC_SZ_MAX ));
352 0 : }
353 :
354 0 : ulong data_sz = hdr->meta.data_len;
355 0 : self->acc_sz = data_sz;
356 0 : self->acc_rem = data_sz;
357 0 : self->acc_pad = fd_ulong_align_up( data_sz, 8UL ) - data_sz;
358 :
359 0 : if( FD_UNLIKELY( data_sz>(10UL<<20) ) ) {
360 0 : FD_LOG_ERR(( "Oversize account found (%lu bytes)", data_sz ));
361 0 : }
362 :
363 0 : if( FD_LIKELY( self->acc_hdr_cb ) ) {
364 0 : self->acc_hdr_cb( self->cb_arg, hdr );
365 0 : self->metrics.accounts_processed++;
366 0 : }
367 :
368 : /* Next step */
369 0 : if( FD_LIKELY( data_sz == 0UL ) ) {
370 0 : return fd_snapshot_parser_expect_account_hdr( self );
371 0 : }
372 :
373 0 : self->state = SNAP_STATE_ACCOUNT_DATA;
374 0 : self->buf_ctr = 0UL;
375 0 : self->buf_sz = 0UL;
376 0 : return 0;
377 0 : }
378 :
379 : static uchar const *
380 : fd_snapshot_parser_read_account_hdr_chunk( fd_snapshot_parser_t * self,
381 : uchar const * buf,
382 0 : ulong bufsz ) {
383 0 : if( FD_UNLIKELY( !self->accv_sz ) ) {
384 : /* Reached end of AppendVec */
385 0 : self->state = SNAP_STATE_IGNORE;
386 0 : self->buf_ctr = self->buf_sz = 0UL;
387 0 : return buf;
388 0 : }
389 0 : bufsz = fd_ulong_min( bufsz, self->accv_sz );
390 :
391 0 : uchar const * buf_next = fd_snapshot_parser_read_buffered( self, buf, bufsz );
392 0 : ulong hdr_read = (ulong)(buf_next-buf);
393 0 : self->accv_sz -= hdr_read;
394 0 : bufsz -= hdr_read;
395 :
396 : // ulong peek_sz = 0UL;
397 0 : if( FD_LIKELY( fd_snapshot_parser_hdr_read_is_complete( self ) ) ) {
398 0 : if( FD_UNLIKELY( 0!=fd_snapshot_parser_restore_account_hdr( self ) ) ) {
399 0 : return buf; /* parse error */
400 0 : }
401 : // peek_sz = fd_ulong_min( self->acc_rem, bufsz );
402 0 : }
403 :
404 : // self->acc_rem -= peek_sz;
405 : // self->accv_sz -= peek_sz;
406 : // buf_next += peek_sz;
407 :
408 0 : return buf_next;
409 0 : }
410 :
411 : static uchar const *
412 : fd_snapshot_parser_read_account_chunk( fd_snapshot_parser_t * self,
413 : uchar const * buf,
414 0 : ulong bufsz ) {
415 :
416 0 : ulong chunk_sz = fd_ulong_min( self->acc_rem, bufsz );
417 0 : if( FD_UNLIKELY( chunk_sz > self->accv_sz ) )
418 0 : FD_LOG_CRIT(( "OOB account vec read: chunk_sz=%lu accv_sz=%lu", chunk_sz, self->accv_sz ));
419 :
420 0 : if( FD_LIKELY( chunk_sz ) ) {
421 :
422 : /* TODO: make callback here */
423 0 : if( FD_LIKELY( self->acc_data_cb ) ) self->acc_data_cb( self->cb_arg, buf, chunk_sz );
424 :
425 0 : self->acc_rem -= chunk_sz;
426 0 : self->accv_sz -= chunk_sz;
427 0 : buf += chunk_sz;
428 0 : bufsz -= chunk_sz;
429 :
430 0 : }
431 :
432 0 : if( FD_UNLIKELY( self->acc_rem == 0UL ) ) {
433 0 : ulong pad_sz = fd_ulong_min( fd_ulong_min( self->acc_pad, bufsz ), self->accv_sz );
434 0 : buf += pad_sz;
435 0 : bufsz -= pad_sz;
436 0 : self->acc_pad -= pad_sz;
437 0 : self->accv_sz -= pad_sz;
438 :
439 0 : if( FD_UNLIKELY( self->accv_sz == 0UL ) ) {
440 0 : self->state = SNAP_STATE_IGNORE;
441 0 : return buf;
442 0 : }
443 0 : if( FD_UNLIKELY( self->acc_pad == 0UL ) ) {
444 0 : return (0==fd_snapshot_parser_expect_account_hdr( self )) ? buf : NULL;
445 0 : }
446 0 : }
447 :
448 0 : return buf;
449 0 : }
450 :
451 : uchar const *
452 : fd_snapshot_parser_process_chunk( fd_snapshot_parser_t * self,
453 : uchar const * buf,
454 0 : ulong bufsz ) {
455 0 : uchar const * buf_next = NULL;
456 0 : if( FD_UNLIKELY( self->state==SNAP_STATE_TAR ) ) {
457 0 : buf_next = fd_snapshot_parser_tar_read_hdr( self, buf, bufsz );
458 0 : ulong consumed = (ulong)buf_next - (ulong)buf;
459 0 : self->goff += consumed;
460 0 : self->goff += self->tar_file_rem;
461 0 : return buf_next;
462 0 : }
463 :
464 0 : bufsz = fd_ulong_min( bufsz, self->tar_file_rem );
465 :
466 0 : switch( self->state ) {
467 0 : case SNAP_STATE_ACCOUNT_DATA:
468 0 : buf_next = fd_snapshot_parser_read_account_chunk( self, buf, bufsz );
469 0 : break;
470 0 : case SNAP_STATE_ACCOUNT_HDR:
471 0 : buf_next = fd_snapshot_parser_read_account_hdr_chunk( self, buf, bufsz );
472 0 : break;
473 0 : case SNAP_STATE_IGNORE:
474 0 : buf_next = fd_snapshot_parser_read_discard( self, buf, bufsz );
475 0 : break;
476 0 : case SNAP_STATE_MANIFEST:
477 0 : buf_next = fd_snapshot_parser_read_manifest_chunk( self, buf, bufsz );
478 0 : break;
479 0 : default:
480 0 : FD_LOG_ERR(( "Invalid parser state %u (this is a bug)", self->state ));
481 0 : }
482 :
483 0 : ulong consumed = (ulong)buf_next - (ulong)buf;
484 0 : if( FD_UNLIKELY( consumed>bufsz ) ) FD_LOG_CRIT(( "Buffer overflow (consumed=%lu bufsz=%lu)", consumed, bufsz ));
485 0 : self->tar_file_rem -= consumed;
486 0 : if( self->tar_file_rem==0UL ) {
487 0 : fd_snapshot_parser_reset_tar( self );
488 0 : if( self->processing_accv ) {
489 0 : self->metrics.accounts_files_processed++;
490 0 : }
491 0 : }
492 0 : return buf_next;
493 0 : }
|