#include "fd_blockstore.h"
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <stdio.h> /* snprintf */
#include <unistd.h>

void *
fd_blockstore_new( void * shmem,
                   ulong  wksp_tag,
                   ulong  seed,
                   ulong  shred_max,
                   ulong  block_max,
                   ulong  idx_max,
                   ulong  txn_max ) {
  fd_blockstore_t * blockstore = (fd_blockstore_t *)shmem;

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL blockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)blockstore, fd_blockstore_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned blockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( !wksp_tag ) ) {
    FD_LOG_WARNING(( "bad wksp_tag" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( blockstore );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "shmem must be part of a workspace" ));
    return NULL;
  }

  fd_memset( blockstore, 0, fd_blockstore_footprint( shred_max, block_max, idx_max, txn_max ) );

  int lg_idx_max = fd_ulong_find_msb( fd_ulong_pow2_up( idx_max ) );

  FD_SCRATCH_ALLOC_INIT( l, shmem );
  blockstore        = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_blockstore_t),  sizeof(fd_blockstore_t) );
  void * shred_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_pool_align(), fd_buf_shred_pool_footprint( shred_max ) );
  void * shred_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_map_align(),  fd_buf_shred_map_footprint( shred_max ) );
  void * block_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_block_map_align(),      fd_block_map_footprint( block_max ) );
  void * block_idx  = FD_SCRATCH_ALLOC_APPEND( l, fd_block_idx_align(),      fd_block_idx_footprint( lg_idx_max ) );
  void * slot_deque = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_deque_align(),     fd_slot_deque_footprint( block_max ) );
  void * txn_map    = FD_SCRATCH_ALLOC_APPEND( l, fd_txn_map_align(),        fd_txn_map_footprint( txn_max ) );
  void * alloc      = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(),          fd_alloc_footprint() );
  FD_SCRATCH_ALLOC_FINI( l, fd_blockstore_align() );

  blockstore->blockstore_gaddr = fd_wksp_gaddr_fast( wksp, blockstore );
  blockstore->wksp_tag         = wksp_tag;
  blockstore->seed             = seed;

  FD_COMPILER_MFENCE();
  fd_rwseq_new( &blockstore->lock );
  FD_COMPILER_MFENCE();

  blockstore->archiver = (fd_blockstore_archiver_t){
      .magic       = FD_BLOCKSTORE_MAGIC,
      .fd_size_max = FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
      .head        = FD_BLOCKSTORE_ARCHIVE_START,
      .tail        = FD_BLOCKSTORE_ARCHIVE_START,
      .num_blocks  = 0,
  };

  blockstore->lps = FD_SLOT_NULL;
  blockstore->hcs = FD_SLOT_NULL;
  blockstore->smr = FD_SLOT_NULL;

  blockstore->shred_max = shred_max;
  blockstore->block_max = block_max;
  blockstore->idx_max   = idx_max;
  blockstore->txn_max   = txn_max;

  blockstore->shred_pool_gaddr = fd_wksp_gaddr( wksp, fd_buf_shred_pool_join( fd_buf_shred_pool_new( shred_pool, shred_max ) ) );
  blockstore->shred_map_gaddr  = fd_wksp_gaddr( wksp, fd_buf_shred_map_join(  fd_buf_shred_map_new(  shred_map,  shred_max, seed ) ) );
  blockstore->block_map_gaddr  = fd_wksp_gaddr( wksp, fd_block_map_join(      fd_block_map_new(      block_map,  block_max, seed ) ) );
  blockstore->block_idx_gaddr  = fd_wksp_gaddr( wksp, fd_block_idx_join(      fd_block_idx_new(      block_idx,  lg_idx_max ) ) );
  blockstore->slot_deque_gaddr = fd_wksp_gaddr( wksp, fd_slot_deque_join(     fd_slot_deque_new(     slot_deque, block_max ) ) );
  blockstore->txn_map_gaddr    = fd_wksp_gaddr( wksp, fd_txn_map_join(        fd_txn_map_new(        txn_map,    txn_max, seed ) ) );
  blockstore->alloc_gaddr      = fd_wksp_gaddr( wksp, fd_alloc_join(          fd_alloc_new(          alloc,      wksp_tag ), wksp_tag ) );

  FD_TEST( blockstore->shred_pool_gaddr );
  FD_TEST( blockstore->shred_map_gaddr  );
  FD_TEST( blockstore->block_map_gaddr  );
  FD_TEST( blockstore->block_idx_gaddr  );
  FD_TEST( blockstore->slot_deque_gaddr );
  FD_TEST( blockstore->txn_map_gaddr    );
  FD_TEST( blockstore->alloc_gaddr      );

  FD_COMPILER_MFENCE();
  FD_VOLATILE( blockstore->magic ) = FD_BLOCKSTORE_MAGIC;
  FD_COMPILER_MFENCE();

  return (void *)blockstore;
}
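
/* Illustrative lifecycle sketch (comment-only, not compiled here;
   `wksp`, `wksp_tag`, `seed`, and the sizing values are assumed
   placeholders, not tuning recommendations):

     void * shmem = fd_wksp_alloc_laddr( wksp, fd_blockstore_align(),
                                         fd_blockstore_footprint( shred_max, block_max, idx_max, txn_max ),
                                         wksp_tag );
     fd_blockstore_t * blockstore = fd_blockstore_join(
         fd_blockstore_new( shmem, wksp_tag, seed, shred_max, block_max, idx_max, txn_max ) );
     ...
     fd_wksp_free_laddr( fd_blockstore_delete( fd_blockstore_leave( blockstore ) ) );
*/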

fd_blockstore_t *
fd_blockstore_join( void * shblockstore ) {
  fd_blockstore_t * blockstore = (fd_blockstore_t *)shblockstore;

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)blockstore, fd_blockstore_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( blockstore->magic != FD_BLOCKSTORE_MAGIC ) ) {
    FD_LOG_WARNING(( "bad magic" ));
    return NULL;
  }

  return blockstore;
}

void *
fd_blockstore_leave( fd_blockstore_t * blockstore ) {

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL blockstore" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( blockstore );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "shmem must be part of a workspace" ));
    return NULL;
  }

  FD_TEST( fd_buf_shred_pool_leave( fd_blockstore_shred_pool( blockstore ) ) );
  FD_TEST( fd_buf_shred_map_leave(  fd_blockstore_shred_map(  blockstore ) ) );
  FD_TEST( fd_block_map_leave(      fd_blockstore_block_map(  blockstore ) ) );
  FD_TEST( fd_block_idx_leave(      fd_blockstore_block_idx(  blockstore ) ) );
  FD_TEST( fd_slot_deque_leave(     fd_blockstore_slot_deque( blockstore ) ) );
  FD_TEST( fd_txn_map_leave(        fd_blockstore_txn_map(    blockstore ) ) );
  FD_TEST( fd_alloc_leave(          fd_blockstore_alloc(      blockstore ) ) );

  return (void *)blockstore;
}

void *
fd_blockstore_delete( void * shblockstore ) {
  fd_blockstore_t * blockstore = (fd_blockstore_t *)shblockstore;

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)blockstore, fd_blockstore_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( blockstore->magic != FD_BLOCKSTORE_MAGIC ) ) {
    FD_LOG_WARNING(( "bad magic" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( blockstore );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "shmem must be part of a workspace" ));
    return NULL;
  }

  /* Delete all structures. */

  FD_TEST( fd_buf_shred_pool_delete( fd_blockstore_shred_pool( blockstore ) ) );
  FD_TEST( fd_buf_shred_map_delete(  fd_blockstore_shred_map(  blockstore ) ) );
  FD_TEST( fd_block_map_delete(      fd_blockstore_block_map(  blockstore ) ) );
  FD_TEST( fd_block_idx_delete(      fd_blockstore_block_idx(  blockstore ) ) );
  FD_TEST( fd_slot_deque_delete(     fd_blockstore_slot_deque( blockstore ) ) );
  FD_TEST( fd_txn_map_delete(        fd_blockstore_txn_map(    blockstore ) ) );
  FD_TEST( fd_alloc_delete(          fd_blockstore_alloc(      blockstore ) ) );

  FD_COMPILER_MFENCE();
  FD_VOLATILE( blockstore->magic ) = 0UL;
  FD_COMPILER_MFENCE();

  return blockstore;
}

static inline void check_read_write_err( int err ) {
  if( FD_UNLIKELY( err < 0 ) ) {
    FD_LOG_ERR(( "unexpected EOF" )); /* fd_io returns negative on premature end-of-file */
  }
  if( FD_UNLIKELY( err > 0 ) ) {
    FD_LOG_ERR(( "unable to read/write: %s", strerror( err ) )); /* positive fd_io return is an errno-compatible code */
  }
}

ulong
fd_blockstore_archiver_lrw_slot( fd_blockstore_t * blockstore, int fd, fd_block_map_t * lrw_block_map, fd_block_t * lrw_block ) {
  fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );
  if( FD_UNLIKELY( fd_block_idx_key_cnt( block_idx ) == 0 ) ) {
    return FD_SLOT_NULL;
  }

  fd_block_idx_t lrw_block_idx = { 0 };
  lrw_block_idx.off = blockstore->archiver.head;
  int err = fd_blockstore_block_meta_restore( &blockstore->archiver, fd, &lrw_block_idx, lrw_block_map, lrw_block );
  check_read_write_err( err );
  return lrw_block_map->slot;
}

/* Returns true if the archiver metadata read from the file is invalid,
   i.e. fails sanity checks against this blockstore. */
bool
fd_blockstore_archiver_verify( fd_blockstore_t * blockstore, fd_blockstore_archiver_t * fd_metadata ) {
  return ( fd_metadata->head < FD_BLOCKSTORE_ARCHIVE_START )
      || ( fd_metadata->tail < FD_BLOCKSTORE_ARCHIVE_START )
      || ( fd_metadata->fd_size_max != blockstore->archiver.fd_size_max ) /* must match the size the archive file was initialized with */
      || ( fd_metadata->magic != FD_BLOCKSTORE_MAGIC );
}

#define check_read_err_safe( cond, msg )            \
  do {                                              \
    if( FD_UNLIKELY( cond ) ) {                     \
      FD_LOG_WARNING(( "[%s] %s", __func__, msg )); \
      return FD_BLOCKSTORE_ERR_SLOT_MISSING;        \
    }                                               \
  } while(0)

/* Reads dst_sz bytes into dst starting at *read_off, wrapping around
   the end of the archive file if necessary.  On return, *read_off
   points just past the last byte read (wrapped if needed). */
static int read_with_wraparound( fd_blockstore_archiver_t * archvr,
                                 int                        fd,
                                 uchar *                    dst,
                                 ulong                      dst_sz,
                                 ulong *                    rsz,
                                 ulong *                    read_off ) {
  check_read_err_safe( lseek( fd, (long)*read_off, SEEK_SET ) == -1,
                       "failed to seek to read offset" );

  ulong remaining_sz = archvr->fd_size_max - *read_off;
  if( remaining_sz < dst_sz ) {
    int err = fd_io_read( fd, dst, remaining_sz, remaining_sz, rsz );
    check_read_err_safe( err, "failed to read file near end" );
    *read_off = FD_BLOCKSTORE_ARCHIVE_START;
    check_read_err_safe( lseek( fd, (long)*read_off, SEEK_SET ) == -1,
                         "failed to seek to file start" );
    err = fd_io_read( fd, dst + remaining_sz, dst_sz - remaining_sz, dst_sz - remaining_sz, rsz );
    check_read_err_safe( err, "failed to read file near start" );
    *read_off = FD_BLOCKSTORE_ARCHIVE_START + *rsz;
  } else {
    int err = fd_io_read( fd, dst, dst_sz, dst_sz, rsz );
    check_read_err_safe( err, "failed to read file" );
    *read_off += *rsz;
  }

  /* If we read up to EOF, wrap read_off so it is ready for the next
     read.  In practice it should never exceed archvr->fd_size_max. */
  if( *read_off >= archvr->fd_size_max ) {
    *read_off = FD_BLOCKSTORE_ARCHIVE_START;
  }

  return FD_BLOCKSTORE_OK;
}

static ulong
wrap_offset( fd_blockstore_archiver_t * archvr, ulong off ) {
  if( off == archvr->fd_size_max ) {
    return FD_BLOCKSTORE_ARCHIVE_START;
  } else if( off > archvr->fd_size_max ) {
    return FD_BLOCKSTORE_ARCHIVE_START + ( off - archvr->fd_size_max );
  } else {
    return off;
  }
}
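
/* Worked example (illustrative values only): with fd_size_max = 1000
   and FD_BLOCKSTORE_ARCHIVE_START = 40,

     wrap_offset( archvr,  999 ) ==  999   (in range, unchanged)
     wrap_offset( archvr, 1000 ) ==   40   (exactly at end, wraps to start)
     wrap_offset( archvr, 1007 ) ==   47   (7 bytes past end, wraps to start + 7)

   i.e. offsets live in [FD_BLOCKSTORE_ARCHIVE_START, fd_size_max), and
   anything at or past fd_size_max wraps back into the circular region. */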

/* Build the archival file index. */

static inline void build_idx( fd_blockstore_t * blockstore, int fd ) {
  if( FD_UNLIKELY( fd == -1 ) ) {
    return;
  }

  FD_LOG_NOTICE(( "[%s] building index of blockstore archival file", __func__ ));

  fd_block_idx_t * block_idx     = fd_blockstore_block_idx( blockstore );
  fd_block_map_t   block_map_out = { 0 };
  fd_block_t       block_out     = { 0 };

  off_t sz = lseek( fd, 0, SEEK_END );
  if( FD_UNLIKELY( sz == -1 ) ) {
    FD_LOG_ERR(( "unable to seek to end of archival file %s", strerror( errno ) ));
  } else if( FD_UNLIKELY( sz == 0 ) ) { /* empty file */
    return;
  }

  lseek( fd, 0, SEEK_SET );
  int   err = 0;
  ulong rsz = 0;

  fd_blockstore_archiver_t metadata;
  err = fd_io_read( fd, &metadata, sizeof(fd_blockstore_archiver_t), sizeof(fd_blockstore_archiver_t), &rsz );
  check_read_write_err( err );
  if( fd_blockstore_archiver_verify( blockstore, &metadata ) ) {
    FD_LOG_ERR(( "[%s] archival file was invalid: blockstore may have crashed or been killed mid-write.", __func__ ));
    return;
  }

  blockstore->archiver = metadata;
  ulong off          = metadata.head;
  ulong total_blocks = metadata.num_blocks;
  ulong blocks_read  = 0;

  /* If the file has content but is exactly full, then off == tail at
     the start, which is indistinguishable from an empty file except by
     the num_blocks field. */

  while( FD_LIKELY( blocks_read < total_blocks ) ) {
    blocks_read++;
    fd_block_idx_t block_idx_entry = { 0 };
    block_idx_entry.off = off;
    err = fd_blockstore_block_meta_restore( &blockstore->archiver, fd, &block_idx_entry, &block_map_out, &block_out );
    check_read_write_err( err );

    if( FD_UNLIKELY( fd_block_idx_key_cnt( block_idx ) == fd_block_idx_key_max( block_idx ) ) ) {
      /* evict a block */
      fd_block_map_t lrw_block_map;
      fd_block_t     lrw_block;
      ulong lrw_slot = fd_blockstore_archiver_lrw_slot( blockstore, fd, &lrw_block_map, &lrw_block );

      fd_block_idx_t * lrw_block_index = fd_block_idx_query( block_idx, lrw_slot, NULL );
      fd_block_idx_remove( block_idx, lrw_block_index );

      blockstore->archiver.head = wrap_offset( &blockstore->archiver, blockstore->archiver.head + lrw_block.data_sz + sizeof(fd_block_map_t) + sizeof(fd_block_t) );
      blockstore->archiver.num_blocks--;
    }
    if( FD_UNLIKELY( fd_block_idx_query( block_idx, block_map_out.slot, NULL ) ) ) {
      FD_LOG_ERR(( "[%s] existing archival file contained duplicates of slot %lu", __func__, block_map_out.slot ));
    }

    fd_block_idx_t * idx_entry = fd_block_idx_insert( block_idx, block_map_out.slot );
    idx_entry->off        = off;
    idx_entry->block_hash = block_map_out.block_hash;
    idx_entry->bank_hash  = block_map_out.bank_hash;
    blockstore->mrw_slot  = block_map_out.slot;

    FD_LOG_NOTICE(( "[%s] read block (%lu/%lu) at offset: %lu. slot no: %lu", __func__, blocks_read, total_blocks, off, block_map_out.slot ));

    /* seek past data */
    off = wrap_offset( &blockstore->archiver, off + sizeof(fd_block_map_t) + sizeof(fd_block_t) + block_out.data_sz );
    if( FD_UNLIKELY( lseek( fd, (long)off, SEEK_SET ) == -1 ) ) {
      FD_LOG_ERR(( "[%s] failed to seek past block data: %s", __func__, strerror( errno ) ));
    }
  }
  FD_LOG_NOTICE(( "[%s] successfully indexed blockstore archival file. entries: %lu", __func__, fd_block_idx_key_cnt( block_idx ) ));
}
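
/* Archive file layout, as implied by the read/write paths above (a
   sketch; exact sizes come from the struct definitions):

     offset 0                     fd_blockstore_archiver_t header
     FD_BLOCKSTORE_ARCHIVE_START  circular region of back-to-back records,
                                  each record being
                                    fd_block_map_t | fd_block_t | data (data_sz bytes)

   head is the offset of the least-recently-written (LRW) record, tail
   is the next write position, and records wrap from fd_size_max back
   to FD_BLOCKSTORE_ARCHIVE_START. */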

fd_blockstore_t *
fd_blockstore_init( fd_blockstore_t * blockstore, int fd, ulong fd_size_max, fd_slot_bank_t const * slot_bank ) {
  if( fd_size_max < FD_BLOCKSTORE_ARCHIVE_MIN_SIZE ) {
    FD_LOG_ERR(( "archive file size too small" ));
    return NULL;
  }
  blockstore->archiver.fd_size_max = fd_size_max;

  build_idx( blockstore, fd );
  lseek( fd, 0, SEEK_END );

  /* initialize fields using slot bank */

  ulong smr = slot_bank->slot;

  blockstore->lps = smr;
  blockstore->hcs = smr;
  blockstore->smr = smr;
  blockstore->wmk = smr;

  fd_block_map_t * block_map_entry = fd_block_map_insert( fd_blockstore_block_map( blockstore ),
                                                          &smr );

  block_map_entry->parent_slot = slot_bank->prev_slot;
  memset( block_map_entry->child_slots, UCHAR_MAX, FD_BLOCKSTORE_CHILD_SLOT_MAX * sizeof( ulong ) );
  block_map_entry->child_slot_cnt = 0;

  block_map_entry->height = slot_bank->block_height;
  memcpy( &block_map_entry->block_hash, slot_bank->block_hash_queue.last_hash, sizeof(fd_hash_t) );
  block_map_entry->bank_hash  = slot_bank->banks_hash;
  block_map_entry->block_hash = slot_bank->poh;
  block_map_entry->flags = fd_uchar_set_bit(
                             fd_uchar_set_bit(
                               fd_uchar_set_bit(
                                 fd_uchar_set_bit(
                                   fd_uchar_set_bit( block_map_entry->flags,
                                                     FD_BLOCK_FLAG_COMPLETED ),
                                   FD_BLOCK_FLAG_PROCESSED ),
                                 FD_BLOCK_FLAG_EQVOCSAFE ),
                               FD_BLOCK_FLAG_CONFIRMED ),
                             FD_BLOCK_FLAG_FINALIZED );
  block_map_entry->reference_tick = 0;
  block_map_entry->ts             = 0;

  block_map_entry->consumed_idx = 0;
  block_map_entry->received_idx = 0;
  block_map_entry->complete_idx = 0;

  /* This creates an empty allocation for a block, to give the
     appearance that we have this particular block (even though we
     don't).  This is useful to avoid special-casing various blockstore
     APIs.

     This should only ever be done for the snapshot slot, after booting
     up from the snapshot. */

  fd_block_t * block = fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
                                        alignof(fd_block_t),
                                        sizeof(fd_block_t) );

  /* Point to the fake block. */

  block_map_entry->block_gaddr = fd_wksp_gaddr_fast( fd_blockstore_wksp( blockstore ), block );

  /* Set all fields to 0. Caller's responsibility to check gaddr and sz != 0. */

  memset( block, 0, sizeof(fd_block_t) );

  return blockstore;
}
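
/* Illustrative boot sequence (a comment-only sketch; the archive path,
   open flags, and `slot_bank` source are assumptions for illustration):

     int fd = open( "blockstore.archive", O_RDWR | O_CREAT, 0666 );
     fd_blockstore_init( blockstore, fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &slot_bank );

   build_idx above then repopulates the in-memory block index from any
   records already present in the file. */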

void
fd_blockstore_fini( fd_blockstore_t * blockstore ) {

  /* Free all allocations by removing all slots (whether they are
     complete or not). */

  for( fd_block_map_iter_t iter = fd_block_map_iter_init( fd_blockstore_block_map( blockstore ) );
       !fd_block_map_iter_done( fd_blockstore_block_map( blockstore ), iter );
       iter = fd_block_map_iter_next( fd_blockstore_block_map( blockstore ), iter ) ) {
    fd_block_map_t * ele = fd_block_map_iter_ele( fd_blockstore_block_map( blockstore ), iter );
    fd_blockstore_slot_remove( blockstore, ele->slot );
  }
}

/* txn map helpers */

int
fd_txn_key_equal( fd_txn_key_t const * k0, fd_txn_key_t const * k1 ) {
  for( ulong i = 0; i < FD_ED25519_SIG_SZ / sizeof( ulong ); ++i )
    if( k0->v[i] != k1->v[i] ) return 0;
  return 1;
}

ulong
fd_txn_key_hash( fd_txn_key_t const * k, ulong seed ) {
  ulong h = seed;
  for( ulong i = 0; i < FD_ED25519_SIG_SZ / sizeof( ulong ); ++i )
    h ^= k->v[i];
  return h;
}
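
/* Worked example: the hash above XOR-folds the 64-byte ed25519
   signature (eight ulong words) into the seed.  E.g. for a key whose
   first two words are 0x1111 and 0x2222 and the rest zero:

     fd_txn_key_hash( &k, 0xAAAAUL ) == 0xAAAAUL ^ 0x1111UL ^ 0x2222UL
                                     == 0x9999UL

   Signatures are already uniformly random, so no stronger mixing is
   needed. */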

static void
fd_blockstore_scan_block( fd_blockstore_t * blockstore, ulong slot, fd_block_t * block ) {

#define MAX_MICROS ( 1 << 17 )
  fd_block_micro_t micros[MAX_MICROS];
  ulong            micros_cnt = 0;
#define MAX_TXNS ( 1 << 17 )
  fd_block_txn_t txns[MAX_TXNS];
  ulong          txns_cnt = 0;

  /*
   * Agave decodes precisely one array of microblocks from each batch.
   * As of bincode version 1.3.3, the default deserializer used when
   * decoding a batch in the blockstore allows for trailing bytes to be
   * ignored.
   * https://github.com/anza-xyz/agave/blob/v2.1.0/ledger/src/blockstore.rs#L3764
   */
  uchar allow_trailing = 1;

  uchar const * data = fd_blockstore_block_data_laddr( blockstore, block );
  FD_LOG_DEBUG(( "scanning slot %lu, ptr %p, sz %lu", slot, (void *)data, block->data_sz ));

  fd_block_entry_batch_t const * batch_laddr = fd_blockstore_block_batch_laddr( blockstore, block );
  ulong const                    batch_cnt   = block->batch_cnt;

  ulong blockoff = 0UL;
  for( ulong batch_i = 0UL; batch_i < batch_cnt; batch_i++ ) {
    ulong const batch_end_off = batch_laddr[ batch_i ].end_off;
    if( blockoff + sizeof( ulong ) > batch_end_off ) FD_LOG_ERR(( "premature end of batch" ));
    ulong mcount = FD_LOAD( ulong, data + blockoff );
    blockoff += sizeof( ulong );

    /* Loop across microblocks */
    for( ulong mblk = 0; mblk < mcount; ++mblk ) {
      if( blockoff + sizeof( fd_microblock_hdr_t ) > batch_end_off )
        FD_LOG_ERR(( "premature end of batch" ));
      if( micros_cnt < MAX_MICROS ) {
        fd_block_micro_t * m = micros + ( micros_cnt++ );
        m->off = blockoff;
      }
      fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( data + blockoff );
      blockoff += sizeof( fd_microblock_hdr_t );

      /* Loop across transactions */
      for( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
        uchar         txn_out[FD_TXN_MAX_SZ];
        uchar const * raw    = data + blockoff;
        ulong         pay_sz = 0;
        ulong         txn_sz = fd_txn_parse_core( (uchar const *)raw,
                                                  fd_ulong_min( batch_end_off - blockoff, FD_TXN_MTU ),
                                                  txn_out,
                                                  NULL,
                                                  &pay_sz );
        if( txn_sz == 0 || txn_sz > FD_TXN_MTU ) {
          FD_LOG_ERR(( "failed to parse transaction %lu in microblock %lu in slot %lu. txn size: %lu",
                       txn_idx,
                       mblk,
                       slot,
                       txn_sz ));
        }
        fd_txn_t const * txn = (fd_txn_t const *)txn_out;

        if( pay_sz == 0UL )
          FD_LOG_ERR(( "failed to parse transaction %lu in microblock %lu in slot %lu",
                       txn_idx,
                       mblk,
                       slot ));

        fd_txn_key_t const * sigs =
            (fd_txn_key_t const *)( (ulong)raw + (ulong)txn->signature_off );
        fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
        for( ulong j = 0; j < txn->signature_cnt; j++ ) {
          if( FD_UNLIKELY( fd_txn_map_key_cnt( txn_map ) ==
                           fd_txn_map_key_max( txn_map ) ) ) {
            break;
          }
          fd_txn_key_t sig;
          fd_memcpy( &sig, sigs + j, sizeof( sig ) );
          fd_txn_map_t * elem = fd_txn_map_insert( txn_map, &sig );
          if( elem == NULL ) { break; }
          elem->slot       = slot;
          elem->offset     = blockoff;
          elem->sz         = pay_sz;
          elem->meta_gaddr = 0;
          elem->meta_sz    = 0;

          if( txns_cnt < MAX_TXNS ) {
            fd_block_txn_t * ref = &txns[txns_cnt++];
            ref->txn_off = blockoff;
            ref->id_off  = (ulong)( sigs + j ) - (ulong)data;
            ref->sz      = pay_sz;
          }
        }

        blockoff += pay_sz;
      }
    }
    if( FD_UNLIKELY( blockoff > batch_end_off ) ) {
      FD_LOG_ERR(( "parser error: shouldn't have been allowed to read past batch boundary" ));
    }
    if( FD_UNLIKELY( blockoff < batch_end_off ) ) {
      if( FD_LIKELY( allow_trailing ) ) {
        FD_LOG_NOTICE(( "ignoring %lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
      }
      if( FD_UNLIKELY( !allow_trailing ) ) {
        FD_LOG_ERR(( "%lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
      }
    }
    blockoff = batch_end_off;
  }

  fd_block_micro_t * micros_laddr =
      fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
                       alignof( fd_block_micro_t ),
                       sizeof( fd_block_micro_t ) * micros_cnt );
  fd_memcpy( micros_laddr, micros, sizeof( fd_block_micro_t ) * micros_cnt );
  block->micros_gaddr = fd_wksp_gaddr_fast( fd_blockstore_wksp( blockstore ), micros_laddr );
  block->micros_cnt   = micros_cnt;

  fd_block_txn_t * txns_laddr =
      fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
                       alignof( fd_block_txn_t ),
                       sizeof( fd_block_txn_t ) * txns_cnt );
  fd_memcpy( txns_laddr, txns, sizeof( fd_block_txn_t ) * txns_cnt );
  block->txns_gaddr = fd_wksp_gaddr_fast( fd_blockstore_wksp( blockstore ), txns_laddr );
  block->txns_cnt   = txns_cnt;
}

/* Remove a slot from blockstore */
void
fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) {
  FD_LOG_NOTICE(( "[%s] slot: %lu", __func__, slot ));

  fd_block_map_t * block_map_entry = fd_block_map_remove( fd_blockstore_block_map( blockstore ), &slot );
  if( FD_UNLIKELY( !block_map_entry ) ) return;

  /* It is not safe to remove a replaying block. */

  if( FD_UNLIKELY( fd_uchar_extract_bit( block_map_entry->flags, FD_BLOCK_FLAG_REPLAYING ) ) ) {
    FD_LOG_WARNING(( "[%s] slot %lu has replay in progress. not removing.", __func__, slot ));
    return;
  }

  /* Unlink slot from its parent only if it is not published. */

  fd_block_map_t * parent_block_map_entry =
      fd_blockstore_block_map_query( blockstore, block_map_entry->parent_slot );
  if( FD_LIKELY( parent_block_map_entry ) ) {
    for( ulong i = 0; i < parent_block_map_entry->child_slot_cnt; i++ ) {
      if( FD_LIKELY( parent_block_map_entry->child_slots[i] == slot ) ) {
        parent_block_map_entry->child_slots[i] =
            parent_block_map_entry->child_slots[--parent_block_map_entry->child_slot_cnt];
      }
    }
  }

  /* A block_gaddr of 0 indicates the slot has not yet received all of
     its shreds, so no assembled block exists.

     TODO refactor to use FD_BLOCK_FLAG_COMPLETED. */

  if( FD_LIKELY( block_map_entry->block_gaddr == 0 ) ) {

    /* Remove buf_shreds if there's no block yet (we haven't received all shreds). */

    fd_buf_shred_map_t * map  = fd_blockstore_shred_map( blockstore );
    fd_buf_shred_t *     pool = fd_blockstore_shred_pool( blockstore );
    for( uint idx = 0; idx < block_map_entry->received_idx; idx++ ) {
      fd_shred_key_t key = { .slot = slot, .idx = idx };
      fd_buf_shred_t * buf_shred = fd_buf_shred_map_ele_remove( map, &key, NULL, pool );
      if( FD_LIKELY( buf_shred ) ) {
        fd_buf_shred_pool_ele_release( pool, buf_shred );
      }
    }

    /* Return early because there are no allocations without a block. */

    return;
  }

  /* Remove all the allocations relating to a block. */

  fd_wksp_t *  wksp  = fd_blockstore_wksp( blockstore );
  fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );

  fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
  fd_block_t *   block   = fd_wksp_laddr_fast( wksp, block_map_entry->block_gaddr );

  /* DO THIS FIRST FOR THREAD SAFETY */
  FD_COMPILER_MFENCE();
  block_map_entry->block_gaddr = 0;

  uchar *          data = fd_wksp_laddr_fast( wksp, block->data_gaddr );
  fd_block_txn_t * txns = fd_wksp_laddr_fast( wksp, block->txns_gaddr );
  for( ulong j = 0; j < block->txns_cnt; ++j ) {
    fd_txn_key_t sig;
    fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
    fd_txn_map_remove( txn_map, &sig );
  }
  if( block->micros_gaddr ) fd_alloc_free( alloc, fd_wksp_laddr_fast( wksp, block->micros_gaddr ) );
  if( block->txns_gaddr ) fd_alloc_free( alloc, txns );
  ulong mgaddr = block->txns_meta_gaddr;
  while( mgaddr ) {
    ulong * laddr   = fd_wksp_laddr_fast( wksp, mgaddr );
    ulong   mgaddr2 = laddr[0]; /* link to next allocation */
    fd_alloc_free( alloc, laddr );
    mgaddr = mgaddr2;
  }
  fd_alloc_free( alloc, block );
  return;
}

int
fd_blockstore_buffered_shreds_remove( fd_blockstore_t * blockstore, ulong slot ) {
  fd_block_map_t * block_map       = fd_blockstore_block_map( blockstore );
  fd_block_map_t * block_map_entry = fd_block_map_query( block_map, &slot, NULL );
  if( FD_UNLIKELY( !block_map_entry ) ) return FD_BLOCKSTORE_OK;
  fd_buf_shred_t *     shred_pool = fd_blockstore_shred_pool( blockstore );
  fd_buf_shred_map_t * shred_map  = fd_blockstore_shred_map( blockstore );
  ulong shred_cnt = block_map_entry->complete_idx + 1;
  for( uint i = 0; i < shred_cnt; i++ ) {
    fd_shred_key_t key = { .slot = slot, .idx = i };
    fd_buf_shred_t * ele;
    while( FD_UNLIKELY(
        ele = fd_buf_shred_map_ele_remove( shred_map, &key, NULL, shred_pool ) ) )
      fd_buf_shred_pool_ele_release( shred_pool, ele );
  }
  fd_block_map_remove( block_map, &slot );
  return FD_BLOCKSTORE_OK;
}

/* Writes src_sz bytes from src at write_off, wrapping around the end
   of the archive file if necessary.  Returns the next valid offset to
   write to: either just past what was written, or wrapped back to the
   start of the circular region. */
static ulong write_with_wraparound( fd_blockstore_archiver_t * archvr,
                                    int                        fd,
                                    uchar *                    src,
                                    ulong                      src_sz,
                                    ulong                      write_off ) {

  if( FD_UNLIKELY( lseek( fd, (long)write_off, SEEK_SET ) == -1 ) ) {
    FD_LOG_ERR(( "[%s] failed to seek to offset %lu", __func__, write_off ));
  }
  ulong wsz;
  ulong remaining_sz = archvr->fd_size_max - write_off;
  if( remaining_sz < src_sz ) {
    int err = fd_io_write( fd, src, remaining_sz, remaining_sz, &wsz );
    check_read_write_err( err );
    write_off = FD_BLOCKSTORE_ARCHIVE_START;
    if( FD_UNLIKELY( lseek( fd, (long)write_off, SEEK_SET ) == -1 ) ) {
      FD_LOG_ERR(( "[%s] failed to seek to offset %lu", __func__, write_off ));
    }
    err = fd_io_write( fd, src + remaining_sz, src_sz - remaining_sz, src_sz - remaining_sz, &wsz );
    check_read_write_err( err );
    write_off += wsz;
  } else {
    int err = fd_io_write( fd, src, src_sz, src_sz, &wsz );
    check_read_write_err( err );
    write_off += wsz;
  }
  if( write_off >= archvr->fd_size_max ) {
    write_off = FD_BLOCKSTORE_ARCHIVE_START;
  }
  return write_off;
}

static void start_archive_write( fd_blockstore_archiver_t * archvr, int fd ) {
  /* Persist the archiver header before writing a record.  This is what
     invalidates any blocks about to be overwritten: their eviction
     (advancing head) has already been recorded in archvr. */
  if( FD_UNLIKELY( lseek( fd, 0, SEEK_SET ) == -1 ) ) {
    FD_LOG_ERR(( "[%s] failed to seek to start", __func__ ));
  }
  ulong wsz;
  int err = fd_io_write( fd, archvr, sizeof(fd_blockstore_archiver_t), sizeof(fd_blockstore_archiver_t), &wsz );
  check_read_write_err( err );
}

static void end_archive_write( fd_blockstore_archiver_t * archvr,
                               int                        fd ) {
  if( FD_UNLIKELY( lseek( fd, 0, SEEK_SET ) == -1 ) ) {
    FD_LOG_ERR(( "[%s] failed to seek to start", __func__ ));
  }
  ulong wsz;
  int err = fd_io_write( fd, archvr, sizeof(fd_blockstore_archiver_t), sizeof(fd_blockstore_archiver_t), &wsz );
  check_read_write_err( err );
}

/* Evicts from the index any blocks that are about to be overwritten by
   a write of wsz bytes at write_off, and updates archvr accordingly. */
static void
fd_blockstore_lrw_archive_clear( fd_blockstore_t * blockstore, int fd, ulong wsz, ulong write_off ) {
  fd_blockstore_archiver_t * archvr    = &blockstore->archiver;
  fd_block_idx_t *           block_idx = fd_blockstore_block_idx( blockstore );

  ulong non_wrapped_end = write_off + wsz;
  ulong wrapped_end     = wrap_offset( archvr, non_wrapped_end );
  bool  mrw_wraps       = non_wrapped_end > archvr->fd_size_max;

  if( FD_UNLIKELY( fd_block_idx_key_cnt( block_idx ) == 0 ) ) {
    return;
  }

  fd_block_map_t lrw_block_map;
  fd_block_t     lrw_block;

  ulong lrw_slot = fd_blockstore_archiver_lrw_slot( blockstore, fd, &lrw_block_map, &lrw_block );
  fd_block_idx_t * lrw_block_index = fd_block_idx_query( block_idx, lrw_slot, NULL );

  while( lrw_block_index &&
         ( ( lrw_block_index->off >= write_off && lrw_block_index->off < non_wrapped_end ) ||
           ( mrw_wraps && lrw_block_index->off < wrapped_end ) ) ) {
    /* evict blocks */
    FD_LOG_DEBUG(( "[%s] overwriting lrw block %lu", __func__, lrw_block_map.slot ));
    fd_block_idx_remove( block_idx, lrw_block_index );

    archvr->head = wrap_offset( archvr, archvr->head + lrw_block.data_sz + sizeof(fd_block_map_t) + sizeof(fd_block_t) );
    archvr->num_blocks--;

    lrw_slot        = fd_blockstore_archiver_lrw_slot( blockstore, fd, &lrw_block_map, &lrw_block );
    lrw_block_index = fd_block_idx_query( block_idx, lrw_slot, NULL );

    if( lrw_block_index && ( lrw_block_index->off != archvr->head ) ) {
      FD_LOG_ERR(( "[%s] block index mismatch %lu != %lu", __func__, lrw_block_index->off, archvr->head ));
    }
  }
}

/* Performs any block index eviction needed and updates the MRW
   (most-recently-written) slot after archiving a block.  On entry the
   block record itself has already been written successfully. */

static void fd_blockstore_post_checkpt_update( fd_blockstore_t *     blockstore,
                                               fd_blockstore_ser_t * ser,
                                               int                   fd,
                                               ulong                 slot,
                                               ulong                 wsz,
                                               ulong                 write_off ) {
  fd_blockstore_archiver_t * archvr    = &blockstore->archiver;
  fd_block_idx_t *           block_idx = fd_blockstore_block_idx( blockstore );

  /* Successfully archived block, so update index and offset. */

  if( fd_block_idx_key_cnt( block_idx ) == fd_block_idx_key_max( block_idx ) ) {
    /* make space if needed */
    fd_block_map_t lrw_block_map_out;
    fd_block_t     lrw_block_out;
    ulong lrw_slot = fd_blockstore_archiver_lrw_slot( blockstore, fd, &lrw_block_map_out, &lrw_block_out );
    fd_block_idx_t * lrw_block_index = fd_block_idx_query( block_idx, lrw_slot, NULL );
    fd_block_idx_remove( block_idx, lrw_block_index );

    archvr->head = wrap_offset( archvr, archvr->head + lrw_block_out.data_sz + sizeof(fd_block_map_t) + sizeof(fd_block_t) );
    archvr->num_blocks--;
  }

  fd_block_idx_t * idx_entry = fd_block_idx_insert( fd_blockstore_block_idx( blockstore ), slot );
  idx_entry->off        = write_off;
  idx_entry->block_hash = ser->block_map->block_hash;
  idx_entry->bank_hash  = ser->block_map->bank_hash;

  archvr->num_blocks++;
  archvr->tail         = wrap_offset( archvr, write_off + wsz );
  blockstore->mrw_slot = slot;
}

ulong
fd_blockstore_block_checkpt( fd_blockstore_t *     blockstore,
                             fd_blockstore_ser_t * ser,
                             int                   fd,
                             ulong                 slot ) {
  ulong write_off    = blockstore->archiver.tail;
  ulong og_write_off = write_off;
  if( FD_UNLIKELY( fd == -1 ) ) {
    FD_LOG_DEBUG(( "[%s] fd is -1", __func__ ));
    return 0;
  }
  if( FD_UNLIKELY( lseek( fd, (long)write_off, SEEK_SET ) == -1 ) ) {
    FD_LOG_ERR(( "[%s] failed to seek to offset %lu", __func__, write_off ));
  }

  ulong total_wsz = sizeof(fd_block_map_t) + sizeof(fd_block_t) + ser->block->data_sz;

  /* clear any potential overwrites */
  fd_blockstore_lrw_archive_clear( blockstore, fd, total_wsz, write_off );

  start_archive_write( &blockstore->archiver, fd );

  write_off = write_with_wraparound( &blockstore->archiver, fd, (uchar *)ser->block_map, sizeof(fd_block_map_t), write_off );
  write_off = write_with_wraparound( &blockstore->archiver, fd, (uchar *)ser->block,     sizeof(fd_block_t),     write_off );
  write_off = write_with_wraparound( &blockstore->archiver, fd, ser->data,               ser->block->data_sz,    write_off );

  fd_blockstore_post_checkpt_update( blockstore, ser, fd, slot, total_wsz, og_write_off );

  end_archive_write( &blockstore->archiver, fd );

  FD_LOG_NOTICE(( "[%s] archived block %lu at %lu: size %lu", __func__, slot, og_write_off, total_wsz ));
  return total_wsz;
}
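
/* Note the record stride: a checkpointed block occupies exactly
   sizeof(fd_block_map_t) + sizeof(fd_block_t) + data_sz bytes in the
   circular region (modulo wraparound).  This is the same stride used
   by build_idx and the LRW eviction paths to advance head, so index
   offsets and file contents stay in lockstep. */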

int
fd_blockstore_block_meta_restore( fd_blockstore_archiver_t * archvr,
                                  int                        fd,
                                  fd_block_idx_t *           block_idx_entry,
                                  fd_block_map_t *           block_map_entry_out,
                                  fd_block_t *               block_out ) {
  ulong rsz;
  ulong read_off = block_idx_entry->off;
  int err = read_with_wraparound( archvr,
                                  fd,
                                  (uchar *)fd_type_pun( block_map_entry_out ),
                                  sizeof(fd_block_map_t),
                                  &rsz,
                                  &read_off );
  check_read_err_safe( err, "failed to read block map" );
  err = read_with_wraparound( archvr,
                              fd,
                              (uchar *)fd_type_pun( block_out ),
                              sizeof(fd_block_t),
                              &rsz,
                              &read_off );
  check_read_err_safe( err, "failed to read block" );
  return FD_BLOCKSTORE_OK;
}

int
fd_blockstore_block_data_restore( fd_blockstore_archiver_t * archvr,
                                  int                        fd,
                                  fd_block_idx_t *           block_idx_entry,
                                  uchar *                    buf_out,
                                  ulong                      buf_max,
                                  ulong                      data_sz ) {
  ulong data_off = wrap_offset( archvr, block_idx_entry->off + sizeof(fd_block_map_t) + sizeof(fd_block_t) );
  if( FD_UNLIKELY( buf_max < data_sz ) ) {
    FD_LOG_ERR(( "[%s] buf_max %lu < data_sz %lu", __func__, buf_max, data_sz ));
    return -1;
  }
  if( FD_UNLIKELY( lseek( fd, (long)data_off, SEEK_SET ) == -1 ) ) {
    FD_LOG_WARNING(( "failed to seek" ));
    return FD_BLOCKSTORE_ERR_SLOT_MISSING;
  }
  ulong rsz;
  int err = read_with_wraparound( archvr, fd, buf_out, data_sz, &rsz, &data_off );
  check_read_err_safe( err, "failed to read block data" );
  return FD_BLOCKSTORE_OK;
}

void
fd_blockstore_publish( fd_blockstore_t * blockstore, int fd ) {
  FD_LOG_NOTICE(( "[%s] wmk %lu => smr %lu", __func__, blockstore->wmk, blockstore->smr ));

  /* Caller is incorrectly calling publish. */

  if( FD_UNLIKELY( blockstore->wmk == blockstore->smr ) ) {
    FD_LOG_WARNING(( "[%s] attempting to re-publish when wmk %lu already at smr %lu", __func__, blockstore->wmk, blockstore->smr ));
    return;
  }

  /* q uses the slot_deque as the BFS queue */

  ulong * q = fd_blockstore_slot_deque( blockstore );

  /* Clear the deque, preparing it to be reused. */

  fd_slot_deque_remove_all( q );

  /* Push the watermark onto the queue. */

  fd_slot_deque_push_tail( q, blockstore->wmk );

  /* Conduct a BFS to find slots to prune or archive. */

  while( !fd_slot_deque_empty( q ) ) {
    ulong slot = fd_slot_deque_pop_head( q );

    fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( blockstore, slot );

    /* Add slot's children to the queue. */

    for( ulong i = 0; i < block_map_entry->child_slot_cnt; i++ ) {

      /* Stop upon reaching the SMR. */

      if( FD_LIKELY( block_map_entry->child_slots[i] != blockstore->smr ) ) {
        fd_slot_deque_push_tail( q, block_map_entry->child_slots[i] );
      }
    }

    /* Archive the block into a file if it is finalized. */

    if( fd_uchar_extract_bit( block_map_entry->flags, FD_BLOCK_FLAG_FINALIZED ) ) {
      fd_block_t * block = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block_map_entry->block_gaddr );
      uchar *      data  = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block->data_gaddr );

      fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );

      if( FD_UNLIKELY( fd_block_idx_query( block_idx, slot, NULL ) ) ) {
        FD_LOG_ERR(( "[%s] invariant violation. attempted to re-archive finalized block: %lu", __func__, slot ));
      } else {
        fd_blockstore_ser_t ser = {
            .block_map = block_map_entry,
            .block     = block,
            .data      = data
        };
        fd_blockstore_block_checkpt( blockstore, &ser, fd, slot );
      }
    }

    fd_blockstore_slot_remove( blockstore, slot );
  }

  /* Scan to clean up any orphaned blocks or shreds < new SMR. */

  for( ulong slot = blockstore->wmk; slot < blockstore->smr; slot++ ) {
    fd_blockstore_slot_remove( blockstore, slot );
  }

  blockstore->wmk = blockstore->smr;

  return;
}

/* Deshred into a block once we've received all shreds for a slot. */

static int
deshred( fd_blockstore_t * blockstore, ulong slot ) {
  FD_LOG_DEBUG(( "[%s] slot %lu", __func__, slot ));

  fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( blockstore, slot );
  FD_TEST( block_map_entry->block_gaddr == 0 ); /* FIXME duplicate blocks are not supported */

  block_map_entry->ts = fd_log_wallclock();

  fd_buf_shred_t *     shred_pool = fd_blockstore_shred_pool( blockstore );
  fd_buf_shred_map_t * shred_map  = fd_blockstore_shred_map( blockstore );

  ulong block_sz  = 0UL;
  ulong shred_cnt = block_map_entry->complete_idx + 1;
  ulong batch_cnt = 0UL;
  for( uint idx = 0; idx < shred_cnt; idx++ ) {
    fd_shred_key_t key = { .slot = slot, .idx = idx };
    fd_buf_shred_t const * query = fd_buf_shred_map_ele_query_const( shred_map, &key, NULL, shred_pool );
    if( FD_UNLIKELY( !query ) ) {
      FD_LOG_ERR(( "[%s] missing shred slot: %lu idx: %u while deshredding", __func__, slot, idx ));
    }
    block_sz += fd_shred_payload_sz( &query->hdr );
    if( FD_LIKELY( (query->hdr.data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE) || query->hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) ) {
      batch_cnt++;
    }
  }

  /* Allocate memory for the block. */
  ulong data_off  = fd_ulong_align_up( sizeof(fd_block_t), 128UL );
  ulong shred_off = fd_ulong_align_up( data_off + block_sz, alignof(fd_block_shred_t) );
  ulong batch_off = fd_ulong_align_up( shred_off + (sizeof(fd_block_shred_t) * shred_cnt), alignof(fd_block_entry_batch_t) );
  ulong tot_sz    = batch_off + (sizeof(fd_block_entry_batch_t) * batch_cnt);
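
  /* Resulting single-allocation layout (a sketch of the offsets just
     computed; alignment padding between regions omitted):

       0         fd_block_t header
       data_off  block data, block_sz bytes (128-byte aligned)
       shred_off fd_block_shred_t[shred_cnt]
       batch_off fd_block_entry_batch_t[batch_cnt]   (tot_sz = end) */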

  fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
  fd_wksp_t *  wksp  = fd_blockstore_wksp( blockstore );
  fd_block_t * block = fd_alloc_malloc( alloc, 128UL, tot_sz );
  if( FD_UNLIKELY( !block ) ) {
    FD_LOG_ERR(( "[%s] OOM: failed to alloc block. blockstore needs to hold in memory all blocks for slots >= SMR, so either increase memory or check for issues with publishing new SMRs.", __func__ ));
  }

  fd_memset( block, 0, sizeof(fd_block_t) );

  uchar * data_laddr = (uchar *)((ulong)block + data_off);
  block->data_gaddr  = fd_wksp_gaddr_fast( wksp, data_laddr );
  block->data_sz     = block_sz;
  fd_block_shred_t * shreds_laddr = (fd_block_shred_t *)((ulong)block + shred_off);
  block->shreds_gaddr = fd_wksp_gaddr_fast( wksp, shreds_laddr );
  block->shreds_cnt   = shred_cnt;
  fd_block_entry_batch_t * batch_laddr = (fd_block_entry_batch_t *)((ulong)block + batch_off);
  block->batch_gaddr = fd_wksp_gaddr_fast( wksp, batch_laddr );
  block->batch_cnt   = batch_cnt;

  /* Deshred the shreds into the block mem. */
  fd_deshredder_t deshredder;
  fd_deshredder_init( &deshredder, data_laddr, block->data_sz, NULL, 0 );
  long  rc      = -FD_SHRED_EINVAL;
  ulong off     = 0UL;
  ulong batch_i = 0UL;
  for( uint i = 0; i < shred_cnt; i++ ) {
    /* TODO can do this in one iteration with block sz loop... massage with deshredder API */
    fd_shred_key_t key = { .slot = slot, .idx = i };
    fd_buf_shred_t const * query =
        fd_buf_shred_map_ele_query_const( shred_map, &key, NULL, shred_pool );
    if( FD_UNLIKELY( !query ) ) FD_LOG_ERR(( "missing shred idx %u during deshred. slot %lu.", i, slot ));
    /* This is a hack to set up the internal state of the deshredder
       such that it processes exactly one shred.
       TODO improve deshredder API */
    fd_shred_t const * shred = &query->hdr;
    deshredder.shreds    = &shred;
    deshredder.shred_cnt = 1;
    rc = fd_deshredder_next( &deshredder );
    FD_TEST( rc >= 0 );

    shreds_laddr[i].hdr = *shred;
    ulong merkle_sz = shreds_laddr[i].merkle_sz = fd_shred_merkle_sz( shred->variant );
    FD_TEST( merkle_sz <= sizeof(shreds_laddr[i].merkle) );
    if( merkle_sz ) {
      fd_memcpy( shreds_laddr[i].merkle, (uchar const *)shred + fd_shred_merkle_off( shred ), merkle_sz );
    }
    shreds_laddr[i].off = off;

    FD_TEST( !memcmp( &shreds_laddr[i].hdr, shred, sizeof( fd_shred_t ) ) );
    FD_TEST( !memcmp( data_laddr + shreds_laddr[i].off,
                      fd_shred_data_payload( shred ),
                      fd_shred_payload_sz( shred ) ) );

    off += fd_shred_payload_sz( shred );

    if( FD_LIKELY( (query->hdr.data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE) || query->hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) ) {
      batch_laddr[ batch_i++ ].end_off = off;
    }

    fd_buf_shred_t * ele = NULL;
    while( FD_UNLIKELY( ele = fd_buf_shred_map_ele_remove( shred_map, &key, NULL, shred_pool ) ) ) {
      fd_buf_shred_pool_ele_release( shred_pool, ele );
    }
  }
  if( FD_UNLIKELY( batch_cnt != batch_i ) ) {
    FD_LOG_ERR(( "batch_cnt(%lu)!=batch_i(%lu) potential memory corruption", batch_cnt, batch_i ));
  }

  /* deshredder error handling */
  int err;
  switch( rc ) {
    case -FD_SHRED_EINVAL:
      err = FD_BLOCKSTORE_ERR_SHRED_INVALID;
      goto fail_deshred;
    case -FD_SHRED_ENOMEM:
      err = FD_BLOCKSTORE_ERR_NO_MEM;
      FD_LOG_ERR(( "should have alloc'd enough memory above. likely indicates memory corruption." ));
  }

  switch( deshredder.result ) {
    case FD_SHRED_ESLOT:
      fd_blockstore_scan_block( blockstore, slot, block );

      /* Do this last when it's safe */
      FD_COMPILER_MFENCE();
      block_map_entry->block_gaddr = fd_wksp_gaddr_fast( wksp, block );
      fd_block_micro_t *    micros = fd_wksp_laddr_fast( wksp, block->micros_gaddr );
      uchar *               data   = fd_wksp_laddr_fast( wksp, block->data_gaddr );
      fd_microblock_hdr_t * last_micro = (fd_microblock_hdr_t *)( data +
                                                                  micros[block->micros_cnt - 1].off );
      memcpy( &block_map_entry->block_hash, last_micro->hash, sizeof( fd_hash_t ) );

      block_map_entry->flags = fd_uchar_clear_bit( block_map_entry->flags, FD_BLOCK_FLAG_RECEIVING );
      block_map_entry->flags = fd_uchar_set_bit( block_map_entry->flags, FD_BLOCK_FLAG_COMPLETED );

      return FD_BLOCKSTORE_OK;
    case FD_SHRED_EBATCH:
    case FD_SHRED_EPIPE:
      FD_LOG_WARNING(( "deshredding slot %lu produced invalid block", slot ));
      err = FD_BLOCKSTORE_ERR_DESHRED_INVALID;
      goto fail_deshred;
    default:
      err = FD_BLOCKSTORE_ERR_UNKNOWN;
  }

fail_deshred:
  /* We failed to deshred the block. Throw it away, and try again from scratch. */
  FD_LOG_WARNING(( "[%s] failed to deshred slot %lu. err: %d", __func__, slot, err ));
  fd_alloc_free( alloc, block );
  fd_blockstore_slot_remove( blockstore, slot );
  for( uint i = 0; i < shred_cnt; i++ ) {
    fd_shred_key_t key = { .slot = slot, .idx = i };
    fd_buf_shred_map_ele_remove( shred_map, &key, NULL, shred_pool );
  }
  return err;
}


/* Check if we're seeing a different payload for the same shred key,
   which indicates equivocation. */

static int
is_eqvoc_fec( fd_shred_t * old, fd_shred_t const * new ) {
  if( FD_UNLIKELY( fd_shred_type( old->variant ) != fd_shred_type( new->variant ) ) ) {
    FD_LOG_WARNING(( "[%s] shred %lu %u types differ", __func__, old->slot, old->idx ));
    return 1;
  }

  if( FD_UNLIKELY( fd_shred_payload_sz( old ) != fd_shred_payload_sz( new ) ) ) {
    FD_LOG_WARNING(( "[%s] shred %lu %u payload_sz not eq", __func__, old->slot, old->idx ));
    return 1;
  }

  ulong memcmp_sz = fd_ulong_if( fd_shred_payload_sz( old ) > FD_SHRED_SIGNATURE_SZ &&
                                 fd_shred_is_resigned( fd_shred_type( old->variant ) ),
                                 fd_shred_payload_sz( old ) - FD_SHRED_SIGNATURE_SZ,
                                 fd_shred_payload_sz( old ) );
  if( FD_UNLIKELY( 0 != memcmp( fd_shred_data_payload( old ), fd_shred_data_payload( new ), memcmp_sz ) ) ) {
    FD_LOG_WARNING(( "[%s] shred %lu %u payload not eq", __func__, old->slot, old->idx ));
    return 1;
  }

  return 0;
}

int
fd_buf_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shred ) {
  FD_LOG_DEBUG(( "[%s] slot %lu idx %u", __func__, shred->slot, shred->idx ));

  /* Check this shred > SMR.  We ignore shreds at or before the SMR
     because it is an invariant that we have a connected, linear chain
     for the SMR and its ancestors. */

  if( FD_UNLIKELY( shred->slot <= blockstore->smr ) ) {
    return FD_BLOCKSTORE_OK;
  }

  /* Check if we already have this shred */

  fd_buf_shred_t *     shred_pool = fd_blockstore_shred_pool( blockstore );
  fd_buf_shred_map_t * shred_map  = fd_blockstore_shred_map( blockstore );
  fd_shred_key_t shred_key = { .slot = shred->slot, .idx = shred->idx };
  fd_buf_shred_t * shred_ = fd_buf_shred_map_ele_query( shred_map, &shred_key, NULL, shred_pool );
  if( FD_UNLIKELY( shred_ ) ) {

    /* FIXME we currently cannot handle equivocating shreds. */

    if( FD_UNLIKELY( is_eqvoc_fec( &shred_->hdr, shred ) ) ) {
      FD_LOG_WARNING(( "equivocating shred detected %lu %u. halting.", shred->slot, shred->idx ));
      return FD_BLOCKSTORE_OK;
    }

    /* Short-circuit if we already have the shred. */

    return FD_BLOCKSTORE_OK;
  }

  if( FD_UNLIKELY( !fd_buf_shred_pool_free( shred_pool ) ) ) {
    FD_LOG_ERR(( "[%s] OOM: failed to buffer shred. blockstore needs to buffer shreds for slots >= SMR for block assembly, so either increase memory or check for issues with publishing new SMRs.", __func__ ));
  }
  fd_buf_shred_t * ele = fd_buf_shred_pool_ele_acquire( shred_pool ); /* always non-NULL */
  ele->key = shred_key;
  ele->hdr = *shred;
  fd_memcpy( &ele->raw, shred, fd_shred_sz( shred ) );
  fd_buf_shred_map_ele_insert( shred_map, ele, shred_pool ); /* always non-NULL */

  /* Update shred's associated slot meta */

  ulong slot = shred->slot;
  fd_block_map_t * block_map       = fd_blockstore_block_map( blockstore );
  fd_block_map_t * block_map_entry = fd_block_map_query( block_map, &slot, NULL );
  if( FD_UNLIKELY( !block_map_entry ) ) {

    if( FD_UNLIKELY( fd_block_map_key_cnt( block_map ) == fd_block_map_key_max( block_map ) ) ) {
      FD_LOG_ERR(( "[%s] OOM: failed to insert new block map entry. blockstore needs to save metadata for all slots >= SMR, so increase memory or check for issues with publishing new SMRs.", __func__ ));
    }

    /* Try to insert slot into block_map */

    block_map_entry = fd_block_map_insert( block_map, &slot );
    if( FD_UNLIKELY( !block_map_entry ) ) return FD_BLOCKSTORE_ERR_SLOT_FULL;

    /* Initialize the block_map_entry. Note some fields are initialized
       to dummy values because we do not have all the necessary metadata
       yet. */

    block_map_entry->slot = slot;

    block_map_entry->parent_slot = shred->slot - shred->data.parent_off;
    memset( block_map_entry->child_slots, UCHAR_MAX, FD_BLOCKSTORE_CHILD_SLOT_MAX * sizeof(ulong) );
    block_map_entry->child_slot_cnt = 0;

    block_map_entry->height     = 0;
    block_map_entry->block_hash = ( fd_hash_t ){ 0 };
    block_map_entry->bank_hash  = ( fd_hash_t ){ 0 };
    block_map_entry->flags      = fd_uchar_set_bit( 0, FD_BLOCK_FLAG_RECEIVING );
    block_map_entry->ts         = 0;
    block_map_entry->reference_tick = (uchar)( (int)shred->data.flags &
                                               (int)FD_SHRED_DATA_REF_TICK_MASK );
    block_map_entry->consumed_idx = UINT_MAX;
    block_map_entry->received_idx = 0;
    block_map_entry->complete_idx = UINT_MAX;

    block_map_entry->block_gaddr = 0;
  }

  FD_LOG_DEBUG(( "slot_meta->consumed_idx: %u, shred->slot: %lu, slot_meta->received_idx: %u, "
                 "shred->idx: %u, shred->complete_idx: %u",
                 block_map_entry->consumed_idx,
                 shred->slot,
                 block_map_entry->received_idx,
                 shred->idx,
                 block_map_entry->complete_idx ));

  /* Update shred windowing metadata: consumed, received, shred_cnt */

  while( FD_LIKELY( fd_buf_shred_query( blockstore, shred->slot, (uint)( block_map_entry->consumed_idx + 1U ) ) ) ) {
    block_map_entry->consumed_idx++;
  }
  block_map_entry->received_idx = fd_uint_max( block_map_entry->received_idx, shred->idx + 1 );
  if( FD_UNLIKELY( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ) block_map_entry->complete_idx = shred->idx;

  /* update ancestry metadata: parent_slot, is_connected, next_slot */

  fd_block_map_t * parent_block_map_entry = fd_blockstore_block_map_query( blockstore, block_map_entry->parent_slot );

  /* Add this slot to its parent's child slots if not already there. */

  if( FD_LIKELY( parent_block_map_entry ) ) {
    int found = 0;
    for( ulong i = 0; i < parent_block_map_entry->child_slot_cnt; i++ ) {
      if( FD_LIKELY( parent_block_map_entry->child_slots[i] == slot ) ) {
        found = 1;
      }
    }
    if( FD_UNLIKELY( !found ) ) {
      if( FD_UNLIKELY( parent_block_map_entry->child_slot_cnt == FD_BLOCKSTORE_CHILD_SLOT_MAX ) ) {
        FD_LOG_ERR(( "failed to add slot %lu to parent %lu's children. exceeding child slot max",
                     slot,
                     parent_block_map_entry->slot ));
      }
      parent_block_map_entry->child_slots[parent_block_map_entry->child_slot_cnt++] = slot;
    }
  }

  if( FD_LIKELY( block_map_entry->consumed_idx == UINT_MAX ||
                 block_map_entry->consumed_idx != block_map_entry->complete_idx ) ) {
    return FD_BLOCKSTORE_OK;
  }

  /* Received all shreds, so try to assemble a block. */
  FD_LOG_DEBUG(( "received all shreds for slot %lu - now building a block", shred->slot ));

  int rc = deshred( blockstore, shred->slot );
  switch( rc ) {
    case FD_BLOCKSTORE_OK:
      return FD_BLOCKSTORE_OK_SLOT_COMPLETE;
    case FD_BLOCKSTORE_ERR_SLOT_FULL:
      FD_LOG_DEBUG(( "already deshredded slot %lu. ignoring.", shred->slot ));
      return FD_BLOCKSTORE_OK;
    case FD_BLOCKSTORE_ERR_DESHRED_INVALID:
      FD_LOG_DEBUG(( "failed to deshred slot %lu. ignoring.", shred->slot ));
      return FD_BLOCKSTORE_OK;
    default:
      /* FIXME */
      FD_LOG_ERR(( "deshred err %d", rc ));
  }
}

fd_shred_t *
fd_buf_shred_query( fd_blockstore_t * blockstore, ulong slot, uint shred_idx ) {
  fd_buf_shred_t *     shred_pool = fd_blockstore_shred_pool( blockstore );
  fd_buf_shred_map_t * shred_map  = fd_blockstore_shred_map( blockstore );
  fd_shred_key_t key = { .slot = slot, .idx = shred_idx };
  fd_buf_shred_t * query =
      fd_buf_shred_map_ele_query( shred_map, &key, NULL, shred_pool );
  if( FD_UNLIKELY( !query ) ) return NULL;
  return &query->hdr;
}

long
fd_buf_shred_query_copy_data( fd_blockstore_t * blockstore, ulong slot, uint shred_idx, void * buf, ulong buf_max ) {
  if( buf_max < FD_SHRED_MAX_SZ ) return -1;

  fd_buf_shred_t *     shred_pool = fd_blockstore_shred_pool( blockstore );
  fd_buf_shred_map_t * shred_map  = fd_blockstore_shred_map( blockstore );
  fd_shred_key_t key = { .slot = slot, .idx = shred_idx };
  fd_buf_shred_t * shred =
      fd_buf_shred_map_ele_query( shred_map, &key, NULL, shred_pool );
  if( shred ) {
    ulong sz = fd_shred_sz( &shred->hdr );
    if( sz > buf_max ) return -1;
    fd_memcpy( buf, shred->raw, sz );
    return (long)sz;
  }

  fd_block_map_t * query =
      fd_block_map_query( fd_blockstore_block_map( blockstore ), &slot, NULL );
  if( FD_UNLIKELY( !query || query->block_gaddr == 0 ) ) return -1;
  if( shred_idx > query->complete_idx ) return -1;
  fd_wksp_t *        wksp   = fd_blockstore_wksp( blockstore );
  fd_block_t *       blk    = fd_wksp_laddr_fast( wksp, query->block_gaddr );
  fd_block_shred_t * shreds = fd_wksp_laddr_fast( wksp, blk->shreds_gaddr );
  ulong sz = fd_shred_payload_sz( &shreds[shred_idx].hdr );
  if( FD_SHRED_DATA_HEADER_SZ + sz > buf_max ) return -1L;
  fd_memcpy( buf, &shreds[shred_idx].hdr, FD_SHRED_DATA_HEADER_SZ );
  fd_memcpy( (uchar *)buf + FD_SHRED_DATA_HEADER_SZ, (uchar *)fd_wksp_laddr_fast( wksp, blk->data_gaddr ) + shreds[shred_idx].off, sz );
  ulong tot_sz    = FD_SHRED_DATA_HEADER_SZ + sz;
  ulong merkle_sz = shreds[shred_idx].merkle_sz;
  if( merkle_sz ) {
    if( tot_sz + merkle_sz > buf_max ) return -1;
    fd_memcpy( (uchar *)buf + tot_sz, shreds[shred_idx].merkle, merkle_sz );
    tot_sz += merkle_sz;
  }
  if( tot_sz >= FD_SHRED_MIN_SZ ) return (long)tot_sz;
  /* Zero pad */
  fd_memset( (uchar *)buf + tot_sz, 0, FD_SHRED_MIN_SZ - tot_sz );
  return (long)FD_SHRED_MIN_SZ;
}
1346 :
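: /* Usage sketch (illustrative only; the helper name is hypothetical):
: callers must supply at least FD_SHRED_MAX_SZ bytes of buffer, per the
: guard at the top of fd_buf_shred_query_copy_data; the return value is
: the copied size in bytes, or -1 if the shred is unavailable. */
: static inline long
: example_copy_shred( fd_blockstore_t * blockstore, ulong slot, uint idx, uchar buf[FD_SHRED_MAX_SZ] ) {
: long sz = fd_buf_shred_query_copy_data( blockstore, slot, idx, buf, FD_SHRED_MAX_SZ );
: if( FD_UNLIKELY( sz < 0L ) ) FD_LOG_DEBUG(( "shred (%lu, %u) unavailable", slot, idx ));
: return sz;
: }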
1347 : fd_block_t *
1348 0 : fd_blockstore_block_query( fd_blockstore_t * blockstore, ulong slot ) {
1349 0 : fd_block_map_t * query = fd_block_map_query( fd_blockstore_block_map( blockstore ), &slot, NULL );
1350 0 : if( FD_UNLIKELY( !query || query->block_gaddr == 0 ) ) return NULL;
1351 0 : return fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), query->block_gaddr );
1352 0 : }
1353 :
1354 : fd_hash_t const *
1355 0 : fd_blockstore_block_hash_query( fd_blockstore_t * blockstore, ulong slot ) {
1356 0 : fd_block_map_t * query =
1357 0 : fd_block_map_query( fd_blockstore_block_map( blockstore ), &slot, NULL );
1358 0 : if( FD_UNLIKELY( !query || query->block_gaddr == 0 ) ) return NULL;
1359 0 : return &query->block_hash;
1360 0 : }
1361 :
1362 : fd_hash_t const *
1363 0 : fd_blockstore_bank_hash_query( fd_blockstore_t * blockstore, ulong slot ) {
1364 0 : fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( blockstore, slot );
1365 0 : if( FD_UNLIKELY( !block_map_entry ) ) return NULL;
1366 0 : return &block_map_entry->bank_hash;
1367 0 : }
1368 :
1369 : fd_block_map_t *
1370 0 : fd_blockstore_block_map_query( fd_blockstore_t * blockstore, ulong slot ) {
1371 0 : return fd_block_map_query( fd_blockstore_block_map( blockstore ), &slot, NULL );
1372 0 : }
1373 :
1374 : ulong
1375 0 : fd_blockstore_parent_slot_query( fd_blockstore_t * blockstore, ulong slot ) {
1376 0 : fd_block_map_t * query = fd_blockstore_block_map_query( blockstore, slot );
1377 0 : if( FD_UNLIKELY( !query ) ) return FD_SLOT_NULL;
1378 0 : return query->parent_slot;
1379 0 : }
1380 :
1381 : int
1382 0 : fd_blockstore_child_slots_query( fd_blockstore_t * blockstore, ulong slot, ulong ** slots_out, ulong * slot_cnt_out ) {
1383 0 : fd_block_map_t * query = fd_blockstore_block_map_query( blockstore, slot );
1384 0 : if( FD_UNLIKELY( !query ) ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1385 0 : *slots_out = query->child_slots;
1386 0 : *slot_cnt_out = query->child_slot_cnt;
1387 0 : return FD_BLOCKSTORE_OK;
1388 0 : }
1389 :
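: /* Usage sketch (illustrative only; the helper name is hypothetical):
: breadth-first walk over the descendants of `root`, using the same
: deque-based traversal as fd_blockstore_log_mem_usage below. Like that
: helper, this assumes exclusive access to the blockstore's slot deque. */
: static void
: example_walk_descendants( fd_blockstore_t * blockstore, ulong root ) {
: ulong * q = fd_blockstore_slot_deque( blockstore );
: fd_slot_deque_remove_all( q );
: fd_slot_deque_push_tail( q, root );
: while( !fd_slot_deque_empty( q ) ) {
: ulong curr = fd_slot_deque_pop_head( q );
: ulong * child_slots = NULL;
: ulong child_slot_cnt = 0UL;
: if( fd_blockstore_child_slots_query( blockstore, curr, &child_slots, &child_slot_cnt ) != FD_BLOCKSTORE_OK ) continue;
: for( ulong i = 0; i < child_slot_cnt; i++ ) fd_slot_deque_push_tail( q, child_slots[i] );
: }
: }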
1390 : int
1391 : fd_blockstore_block_data_query_volatile( fd_blockstore_t * blockstore,
1392 : int fd,
1393 : ulong slot,
1394 : fd_valloc_t alloc,
1395 : fd_hash_t * parent_block_hash_out,
1396 : fd_block_map_t * block_map_entry_out,
1397 : fd_block_rewards_t * block_rewards_out,
1398 : uchar ** block_data_out,
1399 0 : ulong * block_data_sz_out ) {
1400 :
1401 : /* WARNING: this code is extremely delicate. Do NOT modify without
1402 : understanding all the invariants. In particular, we must never
1403 : dereference through a corrupt pointer. It's OK for the destination
1404 : data to be overwritten/invalid as long as the memory location is
1405 : valid. As long as we don't crash, we can validate the data after it
1406 : is read. */
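: /* Reader protocol (sketch of the loops below):
: fd_rwseq_start_concur_read samples a sequence number, retrying
: while a writer holds the lock; reads are then done speculatively
: into caller-owned memory; fd_rwseq_check_concur_read detects a
: racing writer, in which case the whole attempt is retried. */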
1407 :
1408 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
1409 0 : fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );
1410 0 : fd_block_idx_t * idx_entry = NULL;
1411 :
1412 0 : ulong off = ULONG_MAX;
1413 0 : for(;;) {
1414 0 : uint seqnum;
1415 0 : if( FD_UNLIKELY( fd_rwseq_start_concur_read( &blockstore->lock, &seqnum ) ) ) continue;
1416 0 : idx_entry = fd_block_idx_query( block_idx, slot, NULL );
1417 0 : if( FD_LIKELY( idx_entry ) ) off = idx_entry->off;
1418 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1419 0 : else break;
1420 0 : }
1421 :
1422 0 : if( FD_UNLIKELY( off < ULONG_MAX ) ) { /* optimize for non-archival queries */
1423 0 : FD_LOG_DEBUG(( "querying archive for block %lu", slot ));
1424 0 : fd_block_t block_out;
1425 0 : int err = fd_blockstore_block_meta_restore( &blockstore->archiver, fd, idx_entry, block_map_entry_out, &block_out );
1426 0 : if( FD_UNLIKELY( err ) ) {
1427 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1428 0 : }
1429 0 : uchar * block_data = fd_valloc_malloc( alloc, 128UL, block_out.data_sz );
: if( FD_UNLIKELY( !block_data ) ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1430 0 : err = fd_blockstore_block_data_restore( &blockstore->archiver,
1431 0 : fd,
1432 0 : idx_entry,
1433 0 : block_data,
1434 0 : block_out.data_sz,
1435 0 : block_out.data_sz );
1436 0 : if( FD_UNLIKELY( err ) ) {
: fd_valloc_free( alloc, block_data );
1437 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1438 0 : }
1439 0 : fd_block_idx_t * parent_idx_entry = fd_block_idx_query( block_idx, block_map_entry_out->parent_slot, NULL );
1440 0 : if( FD_UNLIKELY( !parent_idx_entry ) ) {
: fd_valloc_free( alloc, block_data );
1441 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1442 0 : }
1443 0 : if( parent_block_hash_out ) *parent_block_hash_out = parent_idx_entry->block_hash;
1445 0 : if( block_rewards_out ) *block_rewards_out = block_out.rewards;
1446 0 : *block_data_out = block_data;
1447 0 : *block_data_sz_out = block_out.data_sz;
1448 0 : return FD_BLOCKSTORE_OK;
1449 0 : }
1450 :
1451 0 : fd_block_map_t const * block_map = fd_blockstore_block_map( blockstore );
1452 0 : uchar * prev_data_out = NULL;
1453 0 : ulong prev_sz = 0;
1454 0 : for(;;) {
1455 0 : uint seqnum;
1456 0 : if( FD_UNLIKELY( fd_rwseq_start_concur_read( &blockstore->lock, &seqnum ) ) ) continue;
1457 :
1458 0 : fd_block_map_t const * query = fd_block_map_query_safe( block_map, &slot, NULL );
1459 0 : if( FD_UNLIKELY( !query ) ) {
: if( prev_data_out ) fd_valloc_free( alloc, prev_data_out );
: return FD_BLOCKSTORE_ERR_SLOT_MISSING;
: }
1460 :
1461 0 : memcpy( block_map_entry_out, query, sizeof( fd_block_map_t ) );
1462 0 : ulong blk_gaddr = query->block_gaddr;
1463 0 : if( FD_UNLIKELY( !blk_gaddr ) ) {
: if( prev_data_out ) fd_valloc_free( alloc, prev_data_out );
: return FD_BLOCKSTORE_ERR_SLOT_MISSING;
: }
1464 :
1465 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1466 :
1467 0 : fd_block_t * blk = fd_wksp_laddr_fast( wksp, blk_gaddr );
1468 0 : if( block_rewards_out ) memcpy( block_rewards_out, &blk->rewards, sizeof(fd_block_rewards_t) );
1469 0 : ulong blk_data_gaddr = blk->data_gaddr;
1470 0 : if( FD_UNLIKELY( !blk_data_gaddr ) ) {
: if( prev_data_out ) fd_valloc_free( alloc, prev_data_out );
: return FD_BLOCKSTORE_ERR_SLOT_MISSING;
: }
1471 0 : ulong sz = *block_data_sz_out = blk->data_sz;
1472 0 : if( sz >= FD_SHRED_MAX_PER_SLOT * FD_SHRED_MAX_SZ ) continue;
1473 :
1474 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1475 :
1476 0 : uchar * data_out;
1477 0 : if( prev_sz >= sz ) {
1478 0 : data_out = prev_data_out;
1479 0 : } else {
1480 0 : if( prev_data_out != NULL ) {
1481 0 : fd_valloc_free( alloc, prev_data_out );
1482 0 : }
1483 0 : prev_data_out = data_out = fd_valloc_malloc( alloc, 128UL, sz );
1484 0 : prev_sz = sz;
1485 0 : }
1486 0 : if( FD_UNLIKELY( data_out == NULL ) ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1487 0 : fd_memcpy( data_out, fd_wksp_laddr_fast( wksp, blk_data_gaddr ), sz );
1488 :
1489 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) {
1490 0 : fd_valloc_free( alloc, data_out );
1491 0 : continue;
1492 0 : }
1493 :
1494 0 : *block_data_out = data_out;
1495 :
1496 0 : if( parent_block_hash_out ) {
1497 0 : if(( query = fd_block_map_query_safe( block_map, &block_map_entry_out->parent_slot, NULL )) == NULL ) {
1498 0 : memset( parent_block_hash_out, 0, sizeof(fd_hash_t) );
1499 0 : } else {
1500 0 : fd_memcpy( parent_block_hash_out, query->block_hash.uc, sizeof(fd_hash_t) );
1501 :
1502 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) {
1503 0 : fd_valloc_free( alloc, data_out );
1504 0 : continue;
1505 0 : }
1506 0 : }
1507 0 : }
1508 :
1509 0 : return FD_BLOCKSTORE_OK;
1510 0 : }
1511 0 : }
1512 :
1513 :
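: /* Usage sketch (illustrative only; the helper name is hypothetical):
: on success the caller owns the returned data buffer and must release
: it through the same fd_valloc_t it passed in. */
: static int
: example_read_block( fd_blockstore_t * blockstore, int fd, ulong slot, fd_valloc_t alloc ) {
: fd_hash_t parent_hash; fd_block_map_t meta; fd_block_rewards_t rewards;
: uchar * data = NULL; ulong data_sz = 0UL;
: int err = fd_blockstore_block_data_query_volatile( blockstore, fd, slot, alloc, &parent_hash, &meta, &rewards, &data, &data_sz );
: if( FD_UNLIKELY( err != FD_BLOCKSTORE_OK ) ) return err;
: /* ... consume data[0, data_sz) ... */
: fd_valloc_free( alloc, data );
: return FD_BLOCKSTORE_OK;
: }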
1514 : int
1515 : fd_blockstore_block_map_query_volatile( fd_blockstore_t * blockstore,
1516 : int fd,
1517 : ulong slot,
1518 0 : fd_block_map_t * block_map_entry_out ) {
1519 :
1520 : /* WARNING: this code is extremely delicate. Do NOT modify without
1521 : understanding all the invariants. In particular, we must never
1522 : dereference through a corrupt pointer. It's OK for the destination
1523 : data to be overwritten/invalid as long as the memory location is
1524 : valid. As long as we don't crash, we can validate the data after it
1525 : is read. */
1526 :
1527 0 : fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );
1528 :
1529 0 : ulong off = ULONG_MAX;
1530 0 : for( ;; ) {
1531 0 : uint seqnum;
1532 0 : if( FD_UNLIKELY( fd_rwseq_start_concur_read( &blockstore->lock, &seqnum ) ) ) continue;
1533 0 : fd_block_idx_t * idx_entry = fd_block_idx_query( block_idx, slot, NULL );
1534 0 : if( FD_LIKELY( idx_entry ) ) off = idx_entry->off;
1535 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1536 0 : else break;
1537 0 : }
1538 :
1539 0 : if( FD_UNLIKELY( off < ULONG_MAX ) ) { /* optimize for non-archival queries */
1540 0 : if( FD_UNLIKELY( lseek( fd, (long)off, SEEK_SET ) == -1 ) ) {
1541 0 : FD_LOG_WARNING(( "failed to seek" ));
1542 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1543 0 : }
1544 0 : ulong rsz;
1545 0 : int err = fd_io_read( fd, block_map_entry_out, sizeof( fd_block_map_t ), sizeof( fd_block_map_t ), &rsz );
1546 0 : if( FD_UNLIKELY( err ) ) {
1547 0 : FD_LOG_WARNING(( "failed to read block map entry" ));
1548 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1549 0 : }
1550 0 : return FD_BLOCKSTORE_OK;
1551 0 : }
1552 :
1553 0 : fd_block_map_t const * block_map = fd_blockstore_block_map( blockstore );
1554 0 : for(;;) {
1555 0 : uint seqnum;
1556 0 : if( FD_UNLIKELY( fd_rwseq_start_concur_read( &blockstore->lock, &seqnum ) ) ) continue;
1557 0 : fd_block_map_t const * query = fd_block_map_query_safe( block_map, &slot, NULL );
1558 0 : if( FD_UNLIKELY( !query ) ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1559 0 : memcpy( block_map_entry_out, query, sizeof( fd_block_map_t ) );
1560 0 : ulong blk_gaddr = query->block_gaddr;
1561 0 : if( FD_UNLIKELY( !blk_gaddr ) ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1562 :
1563 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1564 :
1565 0 : return FD_BLOCKSTORE_OK;
1566 0 : }
1567 0 : }
1568 :
1569 : fd_txn_map_t *
1570 0 : fd_blockstore_txn_query( fd_blockstore_t * blockstore, uchar const sig[FD_ED25519_SIG_SZ] ) {
1571 0 : fd_txn_key_t key;
1572 0 : fd_memcpy( &key, sig, sizeof( key ) );
1573 0 : return fd_txn_map_query( fd_blockstore_txn_map( blockstore ), &key, NULL );
1574 0 : }
1575 :
1576 : int
1577 : fd_blockstore_txn_query_volatile( fd_blockstore_t * blockstore,
1578 : int fd,
1579 : uchar const sig[FD_ED25519_SIG_SZ],
1580 : fd_txn_map_t * txn_out,
1581 : long * blk_ts,
1582 : uchar * blk_flags,
1583 0 : uchar txn_data_out[FD_TXN_MTU] ) {
1584 : /* WARNING: this code is extremely delicate. Do NOT modify without
1585 : understanding all the invariants. In particular, we must never
1586 : dereference through a corrupt pointer. It's OK for the
1587 : destination data to be overwritten/invalid as long as the memory
1588 : location is valid. As long as we don't crash, we can validate the
1589 : data after it is read. */
1590 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
1591 :
1592 0 : fd_block_map_t const * block_map = fd_blockstore_block_map( blockstore );
1593 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
1594 :
1595 0 : for(;;) {
1596 0 : uint seqnum;
1597 0 : if( FD_UNLIKELY( fd_rwseq_start_concur_read( &blockstore->lock, &seqnum ) ) ) continue;
1598 :
1599 0 : fd_txn_key_t key;
1600 0 : memcpy( &key, sig, sizeof(key) );
1601 0 : fd_txn_map_t const * txn_map_entry = fd_txn_map_query_safe( txn_map, &key, NULL );
1602 0 : if( FD_UNLIKELY( txn_map_entry == NULL ) ) return FD_BLOCKSTORE_ERR_TXN_MISSING;
1603 0 : memcpy( txn_out, txn_map_entry, sizeof(fd_txn_map_t) );
1604 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1605 0 : else break;
1606 0 : }
1607 :
1608 0 : fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );
1609 :
1610 0 : ulong off = ULONG_MAX;
1611 0 : for(;;) {
1612 0 : uint seqnum;
1613 0 : if( FD_UNLIKELY( fd_rwseq_start_concur_read( &blockstore->lock, &seqnum ) ) ) continue;
1614 0 : fd_block_idx_t * idx_entry = fd_block_idx_query( block_idx, txn_out->slot, NULL );
1615 0 : if( FD_LIKELY( idx_entry ) ) off = idx_entry->off;
1616 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1617 0 : else break;
1618 0 : }
1619 :
1620 0 : if( FD_UNLIKELY( off < ULONG_MAX ) ) { /* optimize for non-archival */
1621 0 : if( FD_UNLIKELY( lseek( fd, (long)off, SEEK_SET ) == -1 ) ) {
1622 0 : FD_LOG_WARNING(( "failed to seek" ));
1623 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
1624 0 : }
1625 0 : fd_block_map_t block_map_entry;
1626 0 : ulong rsz; int err;
1627 0 : err = fd_io_read( fd, &block_map_entry, sizeof(fd_block_map_t), sizeof(fd_block_map_t), &rsz );
1628 0 : check_read_write_err( err );
: /* Mirror the in-memory path: surface ts/flags from the restored entry. */
: if( blk_ts ) *blk_ts = block_map_entry.ts;
: if( blk_flags ) *blk_flags = block_map_entry.flags;
: if( txn_data_out == NULL ) return FD_BLOCKSTORE_OK;
: if( FD_UNLIKELY( lseek( fd, (long)off + (long)txn_out->offset, SEEK_SET ) == -1 ) ) {
: FD_LOG_WARNING(( "failed to seek" ));
: return FD_BLOCKSTORE_ERR_SLOT_MISSING;
: }
1633 0 : err = fd_io_read( fd, txn_data_out, txn_out->sz, txn_out->sz, &rsz );
1634 0 : check_read_write_err( err );
1635 0 : return FD_BLOCKSTORE_OK;
1636 0 : }
1637 :
1638 0 : for(;;) {
1639 0 : uint seqnum;
1640 0 : if( FD_UNLIKELY( fd_rwseq_start_concur_read( &blockstore->lock, &seqnum ) ) ) continue;
1641 :
1642 0 : fd_block_map_t const * query = fd_block_map_query_safe( block_map, &txn_out->slot, NULL );
1643 0 : if( FD_UNLIKELY( !query ) ) return FD_BLOCKSTORE_ERR_TXN_MISSING;
1644 0 : ulong blk_gaddr = query->block_gaddr;
1645 0 : if( FD_UNLIKELY( !blk_gaddr ) ) return FD_BLOCKSTORE_ERR_TXN_MISSING;
1646 :
1647 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1648 :
1649 0 : fd_block_t * blk = fd_wksp_laddr_fast( wksp, blk_gaddr );
1650 0 : if( blk_ts ) *blk_ts = query->ts;
1651 0 : if( blk_flags ) *blk_flags = query->flags;
1652 0 : ulong ptr = blk->data_gaddr;
1653 0 : ulong sz = blk->data_sz;
1654 0 : if( txn_out->offset + txn_out->sz > sz || txn_out->sz > FD_TXN_MTU ) continue;
1655 :
1656 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1657 :
1658 0 : if( txn_data_out == NULL ) return FD_BLOCKSTORE_OK;
1659 0 : uchar const * data = fd_wksp_laddr_fast( wksp, ptr );
1660 0 : fd_memcpy( txn_data_out, data + txn_out->offset, txn_out->sz );
1661 :
1662 0 : if( FD_UNLIKELY( fd_rwseq_check_concur_read( &blockstore->lock, seqnum ) ) ) continue;
1663 :
1664 0 : return FD_BLOCKSTORE_OK;
1665 0 : }
1666 0 : }
1667 :
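: /* Usage sketch (illustrative only; the helper name is hypothetical):
: fetch a transaction's raw bytes by ed25519 signature into a
: caller-owned FD_TXN_MTU buffer. */
: static int
: example_read_txn( fd_blockstore_t * blockstore, int fd, uchar const sig[FD_ED25519_SIG_SZ] ) {
: fd_txn_map_t txn; long ts; uchar flags; uchar data[FD_TXN_MTU];
: int err = fd_blockstore_txn_query_volatile( blockstore, fd, sig, &txn, &ts, &flags, data );
: if( FD_UNLIKELY( err != FD_BLOCKSTORE_OK ) ) return err;
: /* ... txn.sz bytes of the serialized transaction are now in data ... */
: return FD_BLOCKSTORE_OK;
: }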
1668 : void
1669 0 : fd_blockstore_block_height_update( fd_blockstore_t * blockstore, ulong slot, ulong height ) {
1670 0 : fd_block_map_t * query = fd_blockstore_block_map_query( blockstore, slot );
1671 0 : if( FD_LIKELY( query )) query->height = height;
1672 0 : }
1673 :
1674 : void
1675 0 : fd_blockstore_log_block_status( fd_blockstore_t * blockstore, ulong around_slot ) {
1676 0 : ulong start = around_slot > 5UL ? around_slot - 5UL : 0UL; /* avoid underflow near slot 0 */
: for( ulong i = start; i < around_slot + 20UL; ++i ) {
1677 0 : fd_block_map_t * slot_entry =
1678 0 : fd_block_map_query( fd_blockstore_block_map( blockstore ), &i, NULL );
1679 0 : if( !slot_entry ) continue;
1680 0 : FD_LOG_NOTICE(( "%sslot=%lu received=%u consumed=%u finished=%u",
1681 0 : ( i == around_slot ? "*" : " " ),
1682 0 : i,
1683 0 : slot_entry->received_idx,
1684 0 : slot_entry->consumed_idx,
1685 0 : slot_entry->complete_idx ));
1686 0 : }
1687 0 : }
1688 :
1689 : static char *
1690 0 : fd_smart_size( ulong sz, char * tmp, size_t tmpsz ) {
1691 0 : if( sz <= (1UL<<7) )
1692 0 : snprintf( tmp, tmpsz, "%lu B", sz );
1693 0 : else if( sz <= (1UL<<17) )
1694 0 : snprintf( tmp, tmpsz, "%.3f KB", ((double)sz/((double)(1UL<<10))) );
1695 0 : else if( sz <= (1UL<<27) )
1696 0 : snprintf( tmp, tmpsz, "%.3f MB", ((double)sz/((double)(1UL<<20))) );
1697 0 : else
1698 0 : snprintf( tmp, tmpsz, "%.3f GB", ((double)sz/((double)(1UL<<30))) );
1699 0 : return tmp;
1700 0 : }
1701 :
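: /* For example, fd_smart_size( 1536UL, tmp, sizeof(tmp) ) yields
: "1.500 KB", since 1536 falls in the (1UL<<7, 1UL<<17] range. */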
1702 : void
1703 0 : fd_blockstore_log_mem_usage( fd_blockstore_t * blockstore ) {
1704 0 : char tmp1[100];
1705 0 : char tmp2[100];
1706 0 : char tmp3[100];
1707 :
1708 0 : FD_LOG_NOTICE(( "blockstore base footprint: %s",
1709 0 : fd_smart_size( sizeof(fd_blockstore_t), tmp1, sizeof(tmp1) ) ));
1710 0 : fd_buf_shred_t * shred_pool = fd_blockstore_shred_pool( blockstore );
1711 0 : ulong shred_used = fd_buf_shred_pool_used( shred_pool );
1712 0 : ulong shred_max = fd_buf_shred_pool_max( shred_pool );
1713 0 : FD_LOG_NOTICE(( "shred pool footprint: %s (%lu entries used out of %lu, %lu%%)",
1714 0 : fd_smart_size( fd_buf_shred_pool_footprint( shred_max ), tmp1, sizeof(tmp1) ),
1715 0 : shred_used,
1716 0 : shred_max,
1717 0 : (100U*shred_used) / shred_max ));
1718 0 : fd_buf_shred_map_t * shred_map = fd_blockstore_shred_map( blockstore );
1719 0 : ulong shred_map_cnt = fd_buf_shred_map_chain_cnt( shred_map );
1720 0 : FD_LOG_NOTICE(( "shred map footprint: %s (%lu chains, load is %.3f)",
1721 0 : fd_smart_size( fd_buf_shred_map_footprint( shred_map_cnt ), tmp1, sizeof(tmp1) ),
1722 0 : shred_map_cnt,
1723 0 : ((double)shred_used)/((double)shred_map_cnt) ));
1724 0 : fd_block_map_t * slot_map = fd_blockstore_block_map( blockstore );
1725 0 : ulong slot_map_cnt = fd_block_map_key_cnt( slot_map );
1726 0 : ulong slot_map_max = fd_block_map_key_max( slot_map );
1727 0 : FD_LOG_NOTICE(( "slot map footprint: %s (%lu entries used out of %lu, %lu%%)",
1728 0 : fd_smart_size( fd_block_map_footprint( slot_map_max ), tmp1, sizeof(tmp1) ),
1729 0 : slot_map_cnt,
1730 0 : slot_map_max,
1731 0 : (100U*slot_map_cnt)/slot_map_max ));
1732 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
1733 0 : ulong txn_map_cnt = fd_txn_map_key_cnt( txn_map );
1734 0 : ulong txn_map_max = fd_txn_map_key_max( txn_map );
1735 0 : FD_LOG_NOTICE(( "txn map footprint: %s (%lu entries used out of %lu, %lu%%)",
1736 0 : fd_smart_size( fd_txn_map_footprint( txn_map_max ), tmp1, sizeof(tmp1) ),
1737 0 : txn_map_cnt,
1738 0 : txn_map_max,
1739 0 : (100U*txn_map_cnt)/txn_map_max ));
1740 0 : ulong block_cnt = 0;
1741 0 : ulong data_tot = 0;
1742 0 : ulong data_max = 0;
1743 0 : ulong txn_tot = 0;
1744 0 : ulong txn_max = 0;
1745 :
1746 0 : ulong * q = fd_blockstore_slot_deque( blockstore );
1747 0 : fd_slot_deque_remove_all( q );
1748 0 : fd_slot_deque_push_tail( q, blockstore->smr );
1749 0 : while( !fd_slot_deque_empty( q ) ) {
1750 0 : ulong curr = fd_slot_deque_pop_head( q );
1751 :
1752 0 : fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( blockstore, curr );
1753 0 : if( FD_UNLIKELY( !block_map_entry || !block_map_entry->block_gaddr ) ) continue;
1754 0 : fd_block_t * block = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block_map_entry->block_gaddr );
1755 0 : if( block->data_gaddr ) {
1756 0 : block_cnt++;
1757 0 : data_tot += block->data_sz;
1758 0 : data_max = fd_ulong_max( data_max, block->data_sz );
1759 0 : txn_tot += block->txns_cnt;
1760 0 : txn_max = fd_ulong_max( txn_max, block->txns_cnt );
1761 0 : }
1762 :
1763 0 : ulong * child_slots = NULL;
1764 0 : ulong child_slot_cnt = 0;
1765 0 : int rc = fd_blockstore_child_slots_query( blockstore, curr, &child_slots, &child_slot_cnt );
1766 0 : if( FD_UNLIKELY( rc != FD_BLOCKSTORE_OK ) ) {
1767 0 : continue;
1768 0 : }
1769 :
1770 0 : for( ulong i = 0; i < child_slot_cnt; i++ ) {
1771 0 : fd_slot_deque_push_tail( q, child_slots[i] );
1772 0 : }
1773 0 : }
1774 :
1775 0 : if( block_cnt )
1776 0 : FD_LOG_NOTICE(( "block cnt: %lu, total size: %s, avg size: %s, max size: %s, avg txns per block: %lu, max txns: %lu",
1777 0 : block_cnt,
1778 0 : fd_smart_size( data_tot, tmp1, sizeof(tmp1) ),
1779 0 : fd_smart_size( data_tot/block_cnt, tmp2, sizeof(tmp2) ),
1780 0 : fd_smart_size( data_max, tmp3, sizeof(tmp3) ),
1781 0 : txn_tot/block_cnt,
1782 0 : txn_max ));
1783 0 : }