Line data Source code
1 : #include "fd_blockstore.h"
2 :
3 : #include <fcntl.h>
4 : #include <string.h>
5 : #include <stdio.h> /* snprintf */
6 : #include <unistd.h>
7 :
/* fd_blockstore_new formats a workspace-backed memory region (shmem)
   into a blockstore.  Carves the region into the shmem header, shred
   buffer array / pool / map, block info array / map, block idx, slot
   deque and allocator, then publishes FD_BLOCKSTORE_MAGIC.  seed seeds
   the hash maps; shred_max / block_max / idx_max size the respective
   structures (shred_max and block_max are rounded up to powers of
   two).  Returns shmem on success, NULL on failure (logs details). */
void *
fd_blockstore_new( void * shmem,
                   ulong wksp_tag,
                   ulong seed,
                   ulong shred_max,
                   ulong block_max,
                   ulong idx_max ) {
  /* TODO temporary fix to make sure block_max is a power of 2, as
     required for slot map para. We should change to err in config
     verification eventually */
  block_max = fd_ulong_pow2_up( block_max );
  ulong lock_cnt = fd_ulong_min( block_max, BLOCK_INFO_LOCK_CNT );

  fd_blockstore_shmem_t * blockstore_shmem = (fd_blockstore_shmem_t *)shmem;

  if( FD_UNLIKELY( !blockstore_shmem ) ) {
    FD_LOG_WARNING(( "NULL blockstore_shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)blockstore_shmem, fd_blockstore_align() ) )) {
    FD_LOG_WARNING(( "misaligned blockstore_shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !wksp_tag ) ) {
    FD_LOG_WARNING(( "bad wksp_tag" ));
    return NULL;
  }

  /* The gaddrs stored below are relative to the containing wksp, so
     shmem must live inside one. */
  fd_wksp_t * wksp = fd_wksp_containing( blockstore_shmem );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "shmem must be part of a workspace" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_pow2( shred_max ) ) ) {
    shred_max = fd_ulong_pow2_up( shred_max );
    FD_LOG_WARNING(( "blockstore implementation requires shred_max to be a power of two, rounding it up to %lu", shred_max ));
  }

  fd_memset( blockstore_shmem, 0, sizeof( fd_blockstore_shmem_t ) );

  int lg_idx_max = fd_ulong_find_msb( fd_ulong_pow2_up( idx_max ) );

  /* Carve the region into sub-structures.  This layout must stay in
     sync with fd_blockstore_footprint and fd_blockstore_join. */
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  blockstore_shmem  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_blockstore_shmem_t), sizeof(fd_blockstore_shmem_t) );
  void * shreds     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_buf_shred_t),        sizeof(fd_buf_shred_t) * shred_max );
  void * shred_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_pool_align(),      fd_buf_shred_pool_footprint() );
  void * shred_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_map_align(),       fd_buf_shred_map_footprint( shred_max ) );
  void * blocks     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_block_info_t),       sizeof(fd_block_info_t) * block_max );
  void * block_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_block_map_align(),           fd_block_map_footprint( block_max, lock_cnt, BLOCK_INFO_PROBE_CNT ) );
  void * block_idx  = FD_SCRATCH_ALLOC_APPEND( l, fd_block_idx_align(),           fd_block_idx_footprint( lg_idx_max ) );
  void * slot_deque = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_deque_align(),          fd_slot_deque_footprint( block_max ) );
  void * alloc      = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(),               fd_alloc_footprint() );
  ulong top = FD_SCRATCH_ALLOC_FINI( l, fd_blockstore_align() );
  /* Sanity check: the carved size must match the advertised footprint.
     NOTE(review): footprint is called with the caller's original
     idx_max and the already-rounded block_max -- assumes
     fd_blockstore_footprint applies the same rounding internally;
     confirm. */
  FD_TEST( fd_ulong_align_up( top - (ulong)shmem, fd_alloc_align() ) == fd_ulong_align_up( fd_blockstore_footprint( shred_max, block_max, idx_max ), fd_alloc_align() ) );

  (void)shreds; /* shred element store is linked up at join time, not here */
  fd_buf_shred_pool_new( shred_pool );
  fd_buf_shred_map_new ( shred_map, shred_max, seed );
  memset( blocks, 0, sizeof(fd_block_info_t) * block_max );
  FD_TEST( fd_block_map_new ( block_map, block_max, lock_cnt, BLOCK_INFO_PROBE_CNT, seed ) );

  /* Caller is in charge of freeing map_slot_para element store set.
     We need to explicitly do this since blocks is memset to 0, which
     is not a "freed" state in map_slot_para (slot 0 is a valid key). */
  fd_block_info_t * blocks_ = (fd_block_info_t *)blocks;
  for( ulong i=0UL; i<block_max; i++ ) {
    fd_block_map_private_ele_free( NULL, /* Not needed, avoids a join on block_map */
                                   &blocks_[i] );
  }

  /* Stash wksp-relative addresses of the laterally-joined structures. */
  blockstore_shmem->block_idx_gaddr  = fd_wksp_gaddr( wksp, fd_block_idx_join( fd_block_idx_new( block_idx, lg_idx_max ) ) );
  blockstore_shmem->slot_deque_gaddr = fd_wksp_gaddr( wksp, fd_slot_deque_join (fd_slot_deque_new( slot_deque, block_max ) ) );
  blockstore_shmem->alloc_gaddr      = fd_wksp_gaddr( wksp, fd_alloc_join (fd_alloc_new( alloc, wksp_tag ), wksp_tag ) );

  FD_TEST( blockstore_shmem->block_idx_gaddr );
  FD_TEST( blockstore_shmem->slot_deque_gaddr );
  FD_TEST( blockstore_shmem->alloc_gaddr );

  blockstore_shmem->blockstore_gaddr = fd_wksp_gaddr_fast( wksp, blockstore_shmem );
  blockstore_shmem->wksp_tag         = wksp_tag;
  blockstore_shmem->seed             = seed;

  /* Archiver starts empty; fd_size_max is raised later by
     fd_blockstore_init. */
  blockstore_shmem->archiver = (fd_blockstore_archiver_t){
    .fd_size_max = FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
    .head        = FD_BLOCKSTORE_ARCHIVE_START,
    .tail        = FD_BLOCKSTORE_ARCHIVE_START,
    .num_blocks  = 0,
  };

  blockstore_shmem->lps = FD_SLOT_NULL;
  blockstore_shmem->hcs = FD_SLOT_NULL;
  blockstore_shmem->wmk = FD_SLOT_NULL;

  blockstore_shmem->shred_max = shred_max;
  blockstore_shmem->block_max = block_max;
  blockstore_shmem->idx_max   = idx_max;

  /* Publish the magic last (fenced) so a concurrent observer never
     sees a partially initialized region with a valid magic. */
  FD_COMPILER_MFENCE();
  FD_VOLATILE( blockstore_shmem->magic ) = FD_BLOCKSTORE_MAGIC;
  FD_COMPILER_MFENCE();

  return (void *)blockstore_shmem;
}
114 :
/* fd_blockstore_join joins the caller to a formatted blockstore.
   ljoin points to caller-local memory to hold the fd_blockstore_t
   join; shblockstore points to the shared region formatted by
   fd_blockstore_new.  Returns ljoin on success, NULL on failure
   (logs details). */
fd_blockstore_t *
fd_blockstore_join( void * ljoin, void * shblockstore ) {
  fd_blockstore_t *       join       = (fd_blockstore_t *)ljoin;
  fd_blockstore_shmem_t * blockstore = (fd_blockstore_shmem_t *)shblockstore;

  if( FD_UNLIKELY( !join ) ) {
    FD_LOG_WARNING(( "NULL ljoin" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)join, alignof(fd_blockstore_t) ) ) ) {
    FD_LOG_WARNING(( "misaligned ljoin" ));
    return NULL;
  }

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)blockstore, fd_blockstore_align() ) )) {
    FD_LOG_WARNING(( "misaligned shblockstore" ));
    return NULL;
  }

  /* Magic proves the region was fully formatted by fd_blockstore_new. */
  if( FD_UNLIKELY( blockstore->magic != FD_BLOCKSTORE_MAGIC ) ) {
    FD_LOG_WARNING(( "bad magic" ));
    return NULL;
  }

  /* Re-derive the offsets of the sub-structures.  This walk must
     mirror the layout in fd_blockstore_new exactly (block_idx,
     slot_deque and alloc are reached via their stored gaddrs instead
     and are not needed here). */
  FD_SCRATCH_ALLOC_INIT( l, shblockstore );
  blockstore        = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_blockstore_shmem_t), sizeof(fd_blockstore_shmem_t) );
  void * shreds     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_buf_shred_t),        sizeof(fd_buf_shred_t) * blockstore->shred_max );
  void * shred_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_pool_align(),      fd_buf_shred_pool_footprint() );
  void * shred_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_map_align(),       fd_buf_shred_map_footprint( blockstore->shred_max ) );
  void * blocks     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_block_info_t),       sizeof(fd_block_info_t) * blockstore->block_max );
  void * block_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_block_map_align(),           fd_block_map_footprint( blockstore->block_max,
                                                                                                         fd_ulong_min(blockstore->block_max, BLOCK_INFO_LOCK_CNT),
                                                                                                         BLOCK_INFO_PROBE_CNT ) );
  FD_SCRATCH_ALLOC_FINI( l, fd_blockstore_align() );

  join->shmem = blockstore;
  fd_buf_shred_pool_join( join->shred_pool, shred_pool, shreds, blockstore->shred_max );
  fd_buf_shred_map_join ( join->shred_map, shred_map, shreds, blockstore->shred_max );
  fd_block_map_join     ( join->block_map, block_map, blocks );

  /* Fail fast (abort) if any structure is inconsistent at join time. */
  FD_TEST( fd_buf_shred_pool_verify( join->shred_pool ) == FD_POOL_SUCCESS );
  FD_TEST( fd_buf_shred_map_verify ( join->shred_map ) == FD_MAP_SUCCESS );
  FD_TEST( fd_block_map_verify     ( join->block_map ) == FD_MAP_SUCCESS );

  return join;
}
167 :
/* fd_blockstore_leave leaves a current local join.  Returns the
   underlying blockstore pointer on success, NULL on failure (logs
   details).  The shared region itself is left intact. */
void *
fd_blockstore_leave( fd_blockstore_t * blockstore ) {

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL blockstore" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( blockstore );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "shmem must be part of a workspace" ));
    return NULL;
  }

  /* Leave each sub-structure join; FD_TEST aborts on any failure. */
  FD_TEST( fd_buf_shred_pool_leave( blockstore->shred_pool ) );
  FD_TEST( fd_buf_shred_map_leave( blockstore->shred_map ) );
  FD_TEST( fd_block_map_leave( blockstore->block_map ) );
  FD_TEST( fd_block_idx_leave( fd_blockstore_block_idx( blockstore ) ) );
  FD_TEST( fd_slot_deque_leave( fd_blockstore_slot_deque( blockstore ) ) );
  FD_TEST( fd_alloc_leave( fd_blockstore_alloc( blockstore ) ) );

  return (void *)blockstore;
}
191 :
/* fd_blockstore_delete unformats a blockstore: deletes all
   sub-structures and clears the magic so the region is no longer
   recognized as a blockstore.  Returns the input pointer on success,
   NULL on failure (logs details).

   NOTE(review): despite the parameter name, the pointer is used as a
   local join (fd_blockstore_t, e.g. ->shred_pool / ->shmem are
   dereferenced), not as the raw shared region -- confirm callers pass
   a joined handle here. */
void *
fd_blockstore_delete( void * shblockstore ) {
  fd_blockstore_t * blockstore = (fd_blockstore_t *)shblockstore;

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)blockstore, fd_blockstore_align() ) )) {
    FD_LOG_WARNING(( "misaligned shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) ) {
    FD_LOG_WARNING(( "bad magic" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( blockstore );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "shmem must be part of a workspace" ));
    return NULL;
  }

  /* Delete all structures. */

  FD_TEST( fd_buf_shred_pool_delete( &blockstore->shred_pool ) );
  FD_TEST( fd_buf_shred_map_delete( &blockstore->shred_map ) );
  FD_TEST( fd_block_map_delete( &blockstore->block_map ) );
  FD_TEST( fd_block_idx_delete( fd_blockstore_block_idx( blockstore ) ) );
  FD_TEST( fd_slot_deque_delete( fd_blockstore_slot_deque( blockstore ) ) );
  FD_TEST( fd_alloc_delete( fd_blockstore_alloc( blockstore ) ) );

  /* Clear the magic last (fenced) so observers see either a fully
     valid or a fully invalid region. */
  FD_COMPILER_MFENCE();
  FD_VOLATILE( blockstore->shmem->magic ) = 0UL;
  FD_COMPILER_MFENCE();

  return blockstore;
}
232 :
/* check_read_err_safe: if cond is true, log a warning with msg and
   make the enclosing function return FD_BLOCKSTORE_ERR_SLOT_MISSING.
   Only usable inside functions returning an int error code.

   NOTE(review): the expansion ends in `} while(0);` -- the trailing
   semicolon defeats the usual do/while(0) idiom, so an invocation
   followed by its own `;` expands to two statements (hazardous in
   un-braced if/else).  Removing it requires auditing all call sites
   first, so it is left as-is here. */
#define check_read_err_safe( cond, msg )                     \
  do {                                                       \
    if( FD_UNLIKELY( cond ) ) {                              \
      FD_LOG_WARNING(( "[%s] %s", __func__, msg ));          \
      return FD_BLOCKSTORE_ERR_SLOT_MISSING;                 \
    }                                                        \
  } while(0);
240 :
/* fd_blockstore_init initializes a joined blockstore for operation,
   rooted at the given slot (typically the snapshot slot).  fd is the
   archive file descriptor; fd_size_max bounds the archive file size.
   Returns blockstore on success; error paths abort via FD_LOG_ERR. */
fd_blockstore_t *
fd_blockstore_init( fd_blockstore_t * blockstore,
                    int fd,
                    ulong fd_size_max,
                    ulong slot ) {

  if( fd_size_max < FD_BLOCKSTORE_ARCHIVE_MIN_SIZE ) {
    FD_LOG_ERR(( "archive file size too small" ));
    return NULL; /* unreachable: FD_LOG_ERR aborts; kept for form */
  }
  blockstore->shmem->archiver.fd_size_max = fd_size_max;

  //build_idx( blockstore, fd );
  lseek( fd, 0, SEEK_END ); /* position archive fd at EOF for appends */

  /* initialize fields using slot bank */

  ulong smr = slot;

  /* Bootstrap all watermarks at the starting slot: it is at once the
     latest processed (lps), highest confirmed (hcs) and watermark. */
  blockstore->shmem->lps = smr;
  blockstore->shmem->hcs = smr;
  blockstore->shmem->wmk = smr;

  fd_block_map_query_t query[1];

  int err = fd_block_map_prepare( blockstore->block_map, &smr, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * ele = fd_block_map_query_ele( query );
  if ( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "failed to prepare block map for slot %lu", smr ));

  /* Seed the block map with the root slot, marked completed through
     finalized so consumers treat it as an already-replayed root. */
  ele->slot = smr;
  memset( ele->child_slots, UCHAR_MAX, FD_BLOCKSTORE_CHILD_SLOT_MAX * sizeof( ulong ) );
  ele->child_slot_cnt = 0;
  ele->flags = fd_uchar_set_bit(
               fd_uchar_set_bit(
               fd_uchar_set_bit(
               fd_uchar_set_bit(
               fd_uchar_set_bit( ele->flags,
                                 FD_BLOCK_FLAG_COMPLETED ),
                                 FD_BLOCK_FLAG_PROCESSED ),
                                 FD_BLOCK_FLAG_EQVOCSAFE ),
                                 FD_BLOCK_FLAG_CONFIRMED ),
                                 FD_BLOCK_FLAG_FINALIZED );
  // ele->ref_tick = 0;
  ele->ts = 0;
  ele->consumed_idx = 0;
  ele->received_idx = 0;
  ele->buffered_idx = 0;
  ele->data_complete_idx = 0;
  ele->slot_complete_idx = 0;
  ele->ticks_consumed = 0;
  ele->tick_hash_count_accum = 0;
  fd_block_set_null( ele->data_complete_idxs );

  /* Set all fields to 0. Caller's responsibility to check gaddr and sz != 0. */

  fd_block_map_publish( query );

  return blockstore;
}
300 :
301 : void
302 0 : fd_blockstore_fini( fd_blockstore_t * blockstore ) {
303 : /* Free all allocations by removing all slots (whether they are
304 : complete or not). */
305 0 : fd_block_info_t * ele0 = (fd_block_info_t *)fd_block_map_shele( blockstore->block_map );
306 0 : ulong block_max = fd_block_map_ele_max( blockstore->block_map );
307 0 : for( ulong ele_idx=0; ele_idx<block_max; ele_idx++ ) {
308 0 : fd_block_info_t * ele = ele0 + ele_idx;
309 0 : if( ele->slot == 0 ) continue; /* unused */
310 0 : fd_blockstore_slot_remove( blockstore, ele->slot );
311 0 : }
312 0 : }
313 :
314 : /* Remove a slot from blockstore */
/* Remove a slot from blockstore: evicts its block map entry, unlinks
   it from its parent's child list, and releases its buffered shreds.
   No-op if the slot is absent or currently replaying. */
void
fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) {
  FD_LOG_DEBUG(( "[%s] slot: %lu", __func__, slot ));

  /* It is not safe to remove a replaying block. */
  fd_block_map_query_t query[1] = { 0 };
  ulong parent_slot = FD_SLOT_NULL;
  ulong received_idx = 0;
  int err = FD_MAP_ERR_AGAIN;
  /* Speculatively snapshot parent_slot / received_idx; query_test
     detects a concurrent writer, in which case retry. */
  while( err == FD_MAP_ERR_AGAIN ) {
    err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
    if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
    if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return; /* slot not found */
    fd_block_info_t * block_info = fd_block_map_query_ele( query );
    if( FD_UNLIKELY( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING ) ) ) {
      FD_LOG_WARNING(( "[%s] slot %lu has replay in progress. not removing.", __func__, slot ));
      return;
    }
    parent_slot = block_info->parent_slot;
    received_idx = block_info->received_idx;
    err = fd_block_map_query_test( query );
  }

  err = fd_block_map_remove( blockstore->block_map, &slot, query, FD_MAP_FLAG_BLOCKING );
  /* not possible to fail */
  FD_TEST( !fd_blockstore_block_info_test( blockstore, slot ) );

  /* Unlink slot from its parent only if it is not published. */
  err = fd_block_map_prepare( blockstore->block_map, &parent_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * parent_block_info = fd_block_map_query_ele( query );
  if( FD_LIKELY( parent_block_info ) ) {
    /* Swap-remove: replace the matched child with the last child. */
    for( ulong i = 0; i < parent_block_info->child_slot_cnt; i++ ) {
      if( FD_LIKELY( parent_block_info->child_slots[i] == slot ) ) {
        parent_block_info->child_slots[i] =
            parent_block_info->child_slots[--parent_block_info->child_slot_cnt];
      }
    }
  }
  /* NOTE(review): publish runs even when prepare returned an error /
     parent_block_info is NULL -- confirm fd_block_map_publish is safe
     after an unsuccessful prepare. */
  fd_block_map_publish( query );

  /* Remove buf_shreds. */
  for( uint idx = 0; idx < received_idx; idx++ ) {
    fd_blockstore_shred_remove( blockstore, slot, idx );
  }

  return;
}
362 :
/* fd_blockstore_publish advances the blockstore watermark to wmk,
   pruning (via BFS from the old watermark) every block that is not on
   the path to wmk.  fd is the archive file descriptor (archiving is
   currently disabled -- see commented-out section below). */
void
fd_blockstore_publish( fd_blockstore_t * blockstore,
                       int fd FD_PARAM_UNUSED,
                       ulong wmk ) {
  FD_LOG_NOTICE(( "[%s] wmk %lu => smr %lu", __func__, blockstore->shmem->wmk, wmk ));

  /* Caller is incorrectly calling publish. */

  if( FD_UNLIKELY( blockstore->shmem->wmk == wmk ) ) {
    FD_LOG_WARNING(( "[%s] attempting to re-publish when wmk %lu already at smr %lu", __func__, blockstore->shmem->wmk, wmk ));
    return;
  }

  /* q uses the slot_deque as the BFS queue */

  ulong * q = fd_blockstore_slot_deque( blockstore );

  /* Clear the deque, preparing it to be reused. */

  fd_slot_deque_remove_all( q );

  /* Push the watermark onto the queue. */

  fd_slot_deque_push_tail( q, blockstore->shmem->wmk );

  /* Conduct a BFS to find slots to prune or archive. */

  while( !fd_slot_deque_empty( q ) ) {
    ulong slot = fd_slot_deque_pop_head( q );
    fd_block_map_query_t query[1];
    /* Blocking read -- we need the block_info ptr to be valid for the
       whole time that we are writing stuff to the archiver file. */
    int err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    if( FD_UNLIKELY( err ) ) {
      FD_LOG_WARNING(( "[%s] failed to prepare block map for blockstore publishing %lu", __func__, slot ));
      continue;
    }
    fd_block_info_t * block_info = fd_block_map_query_ele( query );

    /* Add slot's children to the queue. */

    for( ulong i = 0; i < block_info->child_slot_cnt; i++ ) {

      /* Stop upon reaching the SMR. */

      if( FD_LIKELY( block_info->child_slots[i] != wmk ) ) {
        fd_slot_deque_push_tail( q, block_info->child_slots[i] );
      }
    }

    /* Archive the block into a file if it is finalized. */

    /* if( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_FINALIZED ) ) {
      fd_block_t * block = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block_info->block_gaddr );
      uchar * data = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block->data_gaddr );

      fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );

      if( FD_UNLIKELY( fd_block_idx_query( block_idx, slot, NULL ) ) ) {
        FD_LOG_ERR(( "[%s] invariant violation. attempted to re-archive finalized block: %lu", __func__, slot ));
      } else {
        fd_blockstore_ser_t ser = {
          .block_map = block_info,
          .block     = block,
          .data      = data
        };
        fd_blockstore_block_checkpt( blockstore, &ser, fd, slot );
      }
    } */
    fd_block_map_cancel( query ); // TODO: maybe we should not make prepare so large and instead call prepare again in helpers
    fd_blockstore_slot_remove( blockstore, slot );
  }

  /* Scan to clean up any orphaned blocks or shreds < new SMR. */

  for (ulong slot = blockstore->shmem->wmk; slot < wmk; slot++) {
    fd_blockstore_slot_remove( blockstore, slot );
  }

  blockstore->shmem->wmk = wmk;

  return;
}
446 :
447 : void
448 0 : fd_blockstore_shred_remove( fd_blockstore_t * blockstore, ulong slot, uint idx ) {
449 0 : fd_shred_key_t key = { slot, idx };
450 :
451 0 : fd_buf_shred_map_query_t query[1] = { 0 };
452 0 : int err = fd_buf_shred_map_remove( blockstore->shred_map, &key, NULL, query, FD_MAP_FLAG_BLOCKING );
453 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt: shred %lu %u", __func__, slot, idx ));
454 :
455 0 : if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) {
456 0 : fd_buf_shred_t * shred = fd_buf_shred_map_query_ele( query );
457 0 : int err = fd_buf_shred_pool_release( blockstore->shred_pool, shred, 1 );
458 0 : if( FD_UNLIKELY( err == FD_POOL_ERR_INVAL ) ) FD_LOG_ERR(( "[%s] pool error: shred %lu %u not in pool", __func__, slot, idx ));
459 0 : if( FD_UNLIKELY( err == FD_POOL_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] pool corrupt: shred %lu %u", __func__, slot, idx ));
460 0 : FD_TEST( !err );
461 0 : }
462 : // FD_TEST( fd_buf_shred_pool_verify( blockstore->shred_pool ) == FD_POOL_SUCCESS );
463 : // FD_TEST( fd_buf_shred_map_verify ( blockstore->shred_map ) == FD_MAP_SUCCESS );
464 0 : }
465 :
/* fd_blockstore_shred_insert inserts a data shred into the blockstore:
   dedupes against existing shreds (flagging equivocation), acquires a
   buffer from the shred pool, creates/updates the slot's block_info
   (buffered / received / complete watermarks), and links the slot into
   its parent's child list.  Shreds below the watermark are dropped. */
void
fd_blockstore_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shred ) {
  // FD_LOG_NOTICE(( "[%s] slot %lu idx %u", __func__, shred->slot, shred->idx ));

  ulong slot = shred->slot;

  if( FD_UNLIKELY( !fd_shred_is_data( shred->variant ) ) ) FD_LOG_ERR(( "Expected data shred" ));

  /* Shreds below the watermark belong to already-published slots. */
  if( FD_UNLIKELY( slot < blockstore->shmem->wmk ) ) {
    FD_LOG_DEBUG(( "[%s] slot %lu < wmk %lu. not inserting shred", __func__, slot, blockstore->shmem->wmk ));
    return;
  }

  fd_shred_key_t key = { slot, .idx = shred->idx };

  /* Test if the blockstore already contains this shred key. */

  if( FD_UNLIKELY( fd_blockstore_shred_test( blockstore, slot, shred->idx ) ) ) {

    /* If we receive a shred with the same key (slot and shred idx) but
       different payload as one we already have, we'll only keep the
       first. Once we receive the full block, we'll use merkle chaining
       from the last FEC set to determine whether we have the correct
       shred at every index.

       Later, if the block fails to replay (dead block) or the block
       hash doesn't match the one we observe from votes, we'll dump the
       entire block and use repair to recover the one a majority (52%)
       of the cluster has voted on. */

    /* Speculative read-modify: retry until query_test confirms no
       concurrent writer invalidated us. */
    for(;;) {
      fd_buf_shred_map_query_t query[1] = { 0 };
      int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
      if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] %s. shred: (%lu, %u)", __func__, fd_buf_shred_map_strerror( err ), slot, shred->idx ));
      if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
      fd_buf_shred_t * buf_shred = fd_buf_shred_map_query_ele( query );
      /* An existing shred has the same key. Eqvoc iff the payload is different */
      buf_shred->eqvoc = fd_shred_payload_sz( &buf_shred->hdr ) != fd_shred_payload_sz( shred ) ||
                         0!=memcmp( fd_shred_data_payload( &buf_shred->hdr ), fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) );
      err = fd_buf_shred_map_query_test( query );
      if( FD_LIKELY( err == FD_MAP_SUCCESS) ) break;
    }
    return;
  }

  /* Insert the new shred. */

  int err;
  fd_buf_shred_t * ele = fd_buf_shred_pool_acquire( blockstore->shred_pool, NULL, 1, &err );
  if( FD_UNLIKELY( err == FD_POOL_ERR_EMPTY   ) ) FD_LOG_ERR(( "[%s] %s. increase blockstore shred_max.", __func__, fd_buf_shred_pool_strerror( err ) ));
  if( FD_UNLIKELY( err == FD_POOL_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] %s.", __func__, fd_buf_shred_pool_strerror( err ) ));

  ele->key = key;
  ele->hdr = *shred;
  fd_memcpy( &ele->buf, shred, fd_shred_sz( shred ) );
  err = fd_buf_shred_map_insert( blockstore->shred_map, ele, FD_MAP_FLAG_BLOCKING );
  if( FD_UNLIKELY( err == FD_MAP_ERR_INVAL ) ) FD_LOG_ERR(( "[%s] map error. ele not in pool.", __func__ ));

  /* Update shred's associated slot meta */

  if( FD_UNLIKELY( !fd_blockstore_block_info_test( blockstore, slot ) ) ) {
    fd_block_map_query_t query[1] = { 0 };
    /* Prepare will succeed regardless of if the key is in the map or not. It either returns
       the element at that idx, or it will return a spot to insert new stuff. So we need to check
       if that space is actually unused, to signify that we are adding a new entry. */

    /* Try to insert slot into block_map TODO make non blocking? */

    err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    fd_block_info_t * block_info = fd_block_map_query_ele( query );

    if( FD_UNLIKELY( err == FD_MAP_ERR_FULL ) ){
      FD_LOG_ERR(( "[%s] OOM: failed to insert new block map entry. blockstore needs to save metadata for all slots >= SMR, so increase memory or check for issues with publishing new SMRs.", __func__ ));
    }

    /* Initialize the block_info. Note some fields are initialized
       to dummy values because we do not have all the necessary metadata
       yet. */

    block_info->slot = slot;

    /* parent_off is a backwards offset from this slot to its parent. */
    block_info->parent_slot = slot - shred->data.parent_off;
    memset( block_info->child_slots, UCHAR_MAX, FD_BLOCKSTORE_CHILD_SLOT_MAX * sizeof(ulong) );
    block_info->child_slot_cnt = 0;

    block_info->block_hash = ( fd_hash_t ){ 0 };
    block_info->bank_hash  = ( fd_hash_t ){ 0 };
    block_info->flags      = fd_uchar_set_bit( 0, FD_BLOCK_FLAG_RECEIVING );
    block_info->ts         = 0;
    // block_info->ref_tick = (uchar)( (int)shred->data.flags &
    //                                 (int)FD_SHRED_DATA_REF_TICK_MASK );
    block_info->buffered_idx = UINT_MAX;
    block_info->received_idx = 0;
    block_info->consumed_idx = UINT_MAX;

    block_info->data_complete_idx = UINT_MAX;
    block_info->slot_complete_idx = UINT_MAX;

    block_info->ticks_consumed        = 0;
    block_info->tick_hash_count_accum = 0;

    fd_block_set_null( block_info->data_complete_idxs );

    block_info->block_gaddr = 0;

    fd_block_map_publish( query );

    FD_TEST( fd_blockstore_block_info_test( blockstore, slot ) );
  }
  fd_block_map_query_t query[1] = { 0 };
  err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * block_info = fd_block_map_query_ele( query ); /* should be impossible for this to fail */

  /* Advance the buffered_idx watermark. */

  uint prev_buffered_idx = block_info->buffered_idx;
  while( FD_LIKELY( fd_blockstore_shred_test( blockstore, slot, block_info->buffered_idx + 1 ) ) ) {
    block_info->buffered_idx++;
  }

  /* Mark the ending shred idxs of entry batches. */

  fd_block_set_insert_if( block_info->data_complete_idxs, shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE, shred->idx );

  /* Advance the data_complete_idx watermark using the shreds in between
     the previous consumed_idx and current consumed_idx. */

  for (uint idx = prev_buffered_idx + 1; block_info->buffered_idx != FD_SHRED_IDX_NULL && idx <= block_info->buffered_idx; idx++) {
    if( FD_UNLIKELY( fd_block_set_test( block_info->data_complete_idxs, idx ) ) ) {
      block_info->data_complete_idx = idx;
    }
  }

  /* Update received_idx and slot_complete_idx. */

  block_info->received_idx = fd_uint_max( block_info->received_idx, shred->idx + 1 );
  if( FD_UNLIKELY( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ) {
    // FD_LOG_NOTICE(( "slot %lu %u complete", slot, shred->idx ));
    block_info->slot_complete_idx = shred->idx;
  }

  ulong parent_slot = block_info->parent_slot;

  // FD_LOG_DEBUG(( "shred: (%lu, %u). consumed: %u, received: %u, complete: %u",
  //                slot,
  //                shred->idx,
  //                block_info->buffered_idx,
  //                block_info->received_idx,
  //                block_info->slot_complete_idx ));
  fd_block_map_publish( query );

  /* Update ancestry metadata: parent_slot, is_connected, next_slot.

     If the parent_slot happens to be very old, there's a chance that
     it's hash probe could collide with an existing slot in the block
     map, and cause what looks like an OOM. Instead of using map_prepare
     and hitting this collision, we can either check that the
     parent_slot lives in the map with a block_info_test, or use the
     shmem wmk value as a more general guard against querying for
     parents that are too old. */

  if( FD_LIKELY( parent_slot < blockstore->shmem->wmk ) ) return;

  err = fd_block_map_prepare( blockstore->block_map, &parent_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * parent_block_info = fd_block_map_query_ele( query );

  /* Add this slot to its parent's child slots if not already there. */

  if( FD_LIKELY( parent_block_info && parent_block_info->slot == parent_slot ) ) {
    int found = 0;
    for( ulong i = 0; i < parent_block_info->child_slot_cnt; i++ ) {
      if( FD_LIKELY( parent_block_info->child_slots[i] == slot ) ) {
        found = 1;
        break;
      }
    }
    if( FD_UNLIKELY( !found ) ) { /* add to parent's child slots if not already there */
      if( FD_UNLIKELY( parent_block_info->child_slot_cnt == FD_BLOCKSTORE_CHILD_SLOT_MAX ) ) {
        FD_LOG_ERR(( "failed to add slot %lu to parent %lu's children. exceeding child slot max",
                     slot,
                     parent_block_info->slot ));
      }
      parent_block_info->child_slots[parent_block_info->child_slot_cnt++] = slot;
    }
  }
  if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) {
    fd_block_map_publish( query );
  } else {
    /* err is FD_MAP_ERR_FULL. Not in a valid prepare. Can happen if we
       are about to OOM, or if the parents are so far away that it just
       happens to chain longer than the probe_max. Somewhat covered by
       the early return, but there are some edge cases where we reach
       here, and it shouldn't be a LOG_ERR */
    FD_LOG_WARNING(( "block info not found for parent slot %lu. Have we seen it before?", parent_slot ));
  }

  //FD_TEST( fd_block_map_verify( blockstore->block_map ) == FD_MAP_SUCCESS );
}
664 :
/* fd_blockstore_shred_test returns 1 if shred (slot, idx) is present
   in the shred map, 0 otherwise.  Lockless speculative query: retries
   until query_test confirms the read was not torn by a writer. */
int
fd_blockstore_shred_test( fd_blockstore_t * blockstore, ulong slot, uint idx ) {
  fd_shred_key_t key = { slot, idx };
  fd_buf_shred_map_query_t query[1] = { 0 };

  for(;;) {
    int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
    if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] slot: %lu idx: %u. %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
    /* Stable read: KEY means absent, anything else means present. */
    if( FD_LIKELY( !fd_buf_shred_map_query_test( query ) ) ) return err != FD_MAP_ERR_KEY;
  }
}
676 :
677 : int
678 0 : fd_blockstore_block_info_test( fd_blockstore_t * blockstore, ulong slot ) {
679 0 : int err = FD_MAP_ERR_AGAIN;
680 0 : while( err == FD_MAP_ERR_AGAIN ){
681 0 : fd_block_map_query_t query[1] = { 0 };
682 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
683 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
684 0 : if( err == FD_MAP_ERR_KEY ) return 0;
685 0 : err = fd_block_map_query_test( query );
686 0 : }
687 0 : return 1;
688 0 : }
689 :
690 : fd_block_info_t *
691 0 : fd_blockstore_block_map_query( fd_blockstore_t * blockstore, ulong slot ){
692 0 : fd_block_map_query_t quer[1] = { 0 };
693 0 : int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, quer, FD_MAP_FLAG_BLOCKING );
694 0 : fd_block_info_t * meta = fd_block_map_query_ele( quer );
695 0 : if( err ) return NULL;
696 0 : return meta;
697 0 : }
698 :
699 : int
700 0 : fd_blockstore_block_info_remove( fd_blockstore_t * blockstore, ulong slot ){
701 0 : int err = FD_MAP_ERR_AGAIN;
702 0 : while( err == FD_MAP_ERR_AGAIN ){
703 0 : err = fd_block_map_remove( blockstore->block_map, &slot, NULL, 0 );
704 0 : if( err == FD_MAP_ERR_KEY ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
705 0 : }
706 0 : return FD_BLOCKSTORE_SUCCESS;
707 0 : }
708 :
/* fd_buf_shred_query_copy_data copies shred (slot, idx) into buf
   (capacity buf_sz, must be >= FD_SHRED_MAX_SZ).  Returns the copied
   size in bytes, or -1 if the buffer is too small or the shred is not
   in the map.  Speculative read: the copy is retried until query_test
   confirms it was not torn by a concurrent writer. */
long
fd_buf_shred_query_copy_data( fd_blockstore_t * blockstore, ulong slot, uint idx, void * buf, ulong buf_sz ) {
  if( buf_sz < FD_SHRED_MAX_SZ ) return -1;
  fd_shred_key_t key = { slot, idx };
  ulong sz = 0;
  int err = FD_MAP_ERR_AGAIN;
  while( err == FD_MAP_ERR_AGAIN ) {
    fd_buf_shred_map_query_t query[1] = { 0 };
    err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
    if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return -1;
    if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
    if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
    fd_buf_shred_t const * shred = fd_buf_shred_map_query_ele_const( query );
    sz = fd_shred_sz( &shred->hdr );
    memcpy( buf, shred->buf, sz );
    err = fd_buf_shred_map_query_test( query ); /* AGAIN => copy may be torn, redo */
  }
  FD_TEST( !err );
  return (long)sz;
}
729 :
730 : int
731 0 : fd_blockstore_block_hash_query( fd_blockstore_t * blockstore, ulong slot, fd_hash_t * hash_out ) {
732 0 : for(;;) { /* Speculate */
733 0 : fd_block_map_query_t query[1] = { 0 };
734 0 : int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
735 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_BLOCKSTORE_ERR_KEY;
736 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
737 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
738 0 : *hash_out = block_info->block_hash;
739 0 : if( FD_LIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) return FD_BLOCKSTORE_SUCCESS;
740 0 : }
741 0 : }
742 :
743 : int
744 0 : fd_blockstore_bank_hash_query( fd_blockstore_t * blockstore, ulong slot, fd_hash_t * hash_out ) {
745 0 : for(;;) { /* Speculate */
746 0 : fd_block_map_query_t query[1] = { 0 };
747 0 : int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
748 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_BLOCKSTORE_ERR_KEY;
749 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
750 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
751 0 : *hash_out = block_info->bank_hash;
752 0 : if( FD_LIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) return FD_BLOCKSTORE_SUCCESS;
753 0 : }
754 0 : }
755 :
756 : ulong
757 0 : fd_blockstore_parent_slot_query( fd_blockstore_t * blockstore, ulong slot ) {
758 0 : int err = FD_MAP_ERR_AGAIN;
759 0 : ulong parent_slot = FD_SLOT_NULL;
760 0 : while( err == FD_MAP_ERR_AGAIN ){
761 0 : fd_block_map_query_t query[1] = { 0 };
762 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
763 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
764 :
765 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_SLOT_NULL;
766 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
767 :
768 0 : parent_slot = block_info->parent_slot;
769 0 : err = fd_block_map_query_test( query );
770 0 : }
771 0 : return parent_slot;
772 0 : }
773 :
774 : int
775 : fd_blockstore_slice_query( fd_blockstore_t * blockstore,
776 : ulong slot,
777 : uint start_idx,
778 : uint end_idx /* inclusive */,
779 : ulong max,
780 : uchar * buf,
781 0 : ulong * buf_sz ) {
782 : /* verify that the batch idxs provided is at batch boundaries*/
783 :
784 : // FD_LOG_NOTICE(( "querying for %lu %u %u", slot, start_idx, end_idx ));
785 :
786 0 : ulong off = 0;
787 0 : for(uint idx = start_idx; idx <= end_idx; idx++) {
788 0 : ulong payload_sz = 0;
789 :
790 0 : for(;;) { /* speculative copy one shred */
791 0 : fd_shred_key_t key = { slot, idx };
792 0 : fd_buf_shred_map_query_t query[1] = { 0 };
793 0 : int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
794 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ){
795 0 : FD_LOG_WARNING(( "[%s] key: (%lu, %u) %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
796 0 : return FD_BLOCKSTORE_ERR_CORRUPT;
797 0 : }
798 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ){
799 0 : FD_LOG_WARNING(( "[%s] key: (%lu, %u) %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
800 0 : return FD_BLOCKSTORE_ERR_KEY;
801 0 : }
802 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
803 :
804 0 : fd_buf_shred_t const * shred = fd_buf_shred_map_query_ele_const( query );
805 0 : uchar const * payload = fd_shred_data_payload( &shred->hdr );
806 0 : payload_sz = fd_shred_payload_sz( &shred->hdr );
807 0 : if( FD_UNLIKELY( off + payload_sz > max ) ) {
808 0 : FD_LOG_WARNING(( "[%s] increase `max`", __func__ )); /* caller needs to increase max */
809 0 : return FD_BLOCKSTORE_ERR_INVAL;
810 0 : }
811 :
812 0 : if( FD_UNLIKELY( payload_sz > FD_SHRED_DATA_PAYLOAD_MAX ) ) return FD_BLOCKSTORE_ERR_SHRED_INVALID;
813 0 : if( FD_UNLIKELY( off + payload_sz > max ) ) return FD_BLOCKSTORE_ERR_NO_MEM;
814 0 : fd_memcpy( buf + off, payload, payload_sz );
815 0 : err = fd_buf_shred_map_query_test( query );
816 0 : if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) break;
817 0 : }; /* successful speculative copy */
818 :
819 0 : off += payload_sz;
820 0 : }
821 0 : *buf_sz = off;
822 0 : return FD_BLOCKSTORE_SUCCESS;
823 0 : }
824 :
825 : int
826 0 : fd_blockstore_shreds_complete( fd_blockstore_t * blockstore, ulong slot ){
827 : //fd_block_t * block_exists = fd_blockstore_block_query( blockstore, slot );
828 0 : fd_block_map_query_t query[1];
829 0 : int complete = 0;
830 0 : int err = FD_MAP_ERR_AGAIN;
831 0 : while( err == FD_MAP_ERR_AGAIN ){
832 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
833 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
834 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return 0;
835 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
836 0 : complete = ( block_info->buffered_idx != FD_SHRED_IDX_NULL ) &&
837 0 : ( block_info->slot_complete_idx == block_info->buffered_idx );
838 0 : err = fd_block_map_query_test( query );
839 0 : }
840 0 : return complete;
841 :
842 : /* When replacing block_query( slot ) != NULL with this function:
843 : There are other things verified in a successful deshred & scan block that are not verified here.
844 : scan_block does a round of well-formedness checks like parsing txns, and no premature end of batch
845 : like needing cnt, microblock, microblock format.
846 :
847 : This maybe should be fine in places where we check both
848 : shreds_complete and flag PROCESSED/REPLAYING is set, because validation has been for sure done
849 : if the block has been replayed
850 :
851 : Should be careful in places that call this now that happen before the block is replayed, if we want
852 : to assume the shreds are well-formed we can't. */
853 :
854 0 : }
855 :
856 : int
857 : fd_blockstore_block_map_query_volatile( fd_blockstore_t * blockstore,
858 : int fd,
859 : ulong slot,
860 0 : fd_block_info_t * block_info_out ) {
861 :
862 : /* WARNING: this code is extremely delicate. Do NOT modify without
863 : understanding all the invariants. In particular, we must never
864 : dereference through a corrupt pointer. It's OK for the destination
865 : data to be overwritten/invalid as long as the memory location is
866 : valid. As long as we don't crash, we can validate the data after it
867 : is read. */
868 :
869 0 : fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );
870 :
871 0 : ulong off = ULONG_MAX;
872 0 : for( ;; ) {
873 0 : fd_block_idx_t * idx_entry = fd_block_idx_query( block_idx, slot, NULL );
874 0 : if( FD_LIKELY( idx_entry ) ) off = idx_entry->off;
875 0 : break;
876 0 : }
877 :
878 0 : if( FD_UNLIKELY( off < ULONG_MAX ) ) { /* optimize for non-archival queries */
879 0 : if( FD_UNLIKELY( lseek( fd, (long)off, SEEK_SET ) == -1 ) ) {
880 0 : FD_LOG_WARNING(( "failed to seek" ));
881 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
882 0 : }
883 0 : ulong rsz;
884 0 : int err = fd_io_read( fd, block_info_out, sizeof( fd_block_info_t ), sizeof( fd_block_info_t ), &rsz );
885 0 : if( FD_UNLIKELY( err ) ) {
886 0 : FD_LOG_WARNING(( "failed to read block map entry" ));
887 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
888 0 : }
889 0 : return FD_BLOCKSTORE_SUCCESS;
890 0 : }
891 :
892 0 : int err = FD_MAP_ERR_AGAIN;
893 0 : while( err == FD_MAP_ERR_AGAIN ) {
894 0 : fd_block_map_query_t quer[1] = { 0 };
895 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, quer, 0 );
896 0 : fd_block_info_t const * query = fd_block_map_query_ele_const( quer );
897 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
898 :
899 0 : *block_info_out = *query;
900 :
901 0 : err = fd_block_map_query_test( quer );
902 0 : }
903 0 : return FD_BLOCKSTORE_SUCCESS;
904 0 : }
905 :
906 : void
907 0 : fd_blockstore_log_block_status( fd_blockstore_t * blockstore, ulong around_slot ) {
908 0 : fd_block_map_query_t query[1] = { 0 };
909 0 : uint received_idx = 0;
910 0 : uint buffered_idx = 0;
911 0 : uint slot_complete_idx = 0;
912 :
913 0 : for( ulong i = around_slot - 5; i < around_slot + 20; ++i ) {
914 0 : int err = FD_MAP_ERR_AGAIN;
915 0 : while( err == FD_MAP_ERR_AGAIN ){
916 0 : err = fd_block_map_query_try( blockstore->block_map, &i, NULL, query, 0 );
917 0 : fd_block_info_t * slot_entry = fd_block_map_query_ele( query );
918 0 : if( err == FD_MAP_ERR_KEY ) break;
919 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
920 0 : received_idx = slot_entry->received_idx;
921 0 : buffered_idx = slot_entry->buffered_idx;
922 0 : slot_complete_idx = slot_entry->slot_complete_idx;
923 0 : err = fd_block_map_query_test( query );
924 0 : if( err == FD_MAP_ERR_KEY ) break;
925 0 : }
926 :
927 0 : if( err == FD_MAP_ERR_KEY ) continue;
928 :
929 0 : FD_LOG_NOTICE(( "%sslot=%lu received=%u consumed=%u finished=%u",
930 0 : ( i == around_slot ? "*" : " " ),
931 0 : i,
932 0 : received_idx,
933 0 : buffered_idx,
934 0 : slot_complete_idx ));
935 0 : }
936 0 : }
937 :
938 : static char *
939 0 : fd_smart_size( ulong sz, char * tmp, size_t tmpsz ) {
940 0 : if( sz <= (1UL<<7) )
941 0 : snprintf( tmp, tmpsz, "%lu B", sz );
942 0 : else if( sz <= (1UL<<17) )
943 0 : snprintf( tmp, tmpsz, "%.3f KB", ((double)sz/((double)(1UL<<10))) );
944 0 : else if( sz <= (1UL<<27) )
945 0 : snprintf( tmp, tmpsz, "%.3f MB", ((double)sz/((double)(1UL<<20))) );
946 0 : else
947 0 : snprintf( tmp, tmpsz, "%.3f GB", ((double)sz/((double)(1UL<<30))) );
948 0 : return tmp;
949 0 : }
950 :
951 : void
952 0 : fd_blockstore_log_mem_usage( fd_blockstore_t * blockstore ) {
953 0 : char tmp1[100];
954 :
955 0 : FD_LOG_NOTICE(( "blockstore base footprint: %s",
956 0 : fd_smart_size( sizeof(fd_blockstore_t), tmp1, sizeof(tmp1) ) ));
957 0 : ulong shred_max = fd_buf_shred_pool_ele_max( blockstore->shred_pool );
958 0 : FD_LOG_NOTICE(( "shred pool footprint: %s %lu entries)",
959 0 : fd_smart_size( fd_buf_shred_pool_footprint(), tmp1, sizeof(tmp1) ),
960 0 : shred_max ));
961 0 : ulong shred_map_cnt = fd_buf_shred_map_chain_cnt( blockstore->shred_map );
962 0 : FD_LOG_NOTICE(( "shred map footprint: %s (%lu chains, load is %.3f)",
963 0 : fd_smart_size( fd_buf_shred_map_footprint( shred_map_cnt ), tmp1, sizeof(tmp1) ),
964 0 : shred_map_cnt,
965 0 : (double)shred_map_cnt) );
966 :
967 : /*fd_block_info_t * slot_map = fd_blockstore_block_map( blockstore );
968 : ulong slot_map_cnt = fd_block_map_key_cnt( slot_map );
969 : ulong slot_map_max = fd_block_map_key_max( slot_map );
970 : FD_LOG_NOTICE(( "slot map footprint: %s (%lu entries used out of %lu, %lu%%)",
971 : fd_smart_size( fd_block_map_footprint( slot_map_max ), tmp1, sizeof(tmp1) ),
972 : slot_map_cnt,
973 : slot_map_max,
974 : (100U*slot_map_cnt)/slot_map_max )); */
975 :
976 0 : ulong block_cnt = 0;
977 :
978 0 : ulong * q = fd_blockstore_slot_deque( blockstore );
979 0 : fd_slot_deque_remove_all( q );
980 0 : fd_slot_deque_push_tail( q, blockstore->shmem->wmk );
981 0 : while( !fd_slot_deque_empty( q ) ) {
982 0 : ulong curr = fd_slot_deque_pop_head( q );
983 :
984 0 : fd_block_map_query_t query[1] = { 0 };
985 0 : int err = fd_block_map_query_try( blockstore->block_map, &curr, NULL, query, FD_MAP_FLAG_BLOCKING );
986 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
987 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY || !block_info ) ) continue;
988 :
989 0 : for( ulong i = 0; i < block_info->child_slot_cnt; i++ ) {
990 0 : fd_slot_deque_push_tail( q, block_info->child_slots[i] );
991 0 : }
992 0 : }
993 :
994 0 : if( block_cnt )
995 0 : FD_LOG_NOTICE(( "block cnt: %lu",
996 0 : block_cnt ));
997 0 : }
|