Line data Source code
1 : #include "fd_blockstore.h"
2 : #include <fcntl.h>
3 : #include <string.h>
4 : #include <stdio.h> /* snprintf */
5 : #include <unistd.h>
6 : #include <errno.h>
7 :
/* fd_blockstore_new formats an unused, wksp-backed memory region
   (shmem) for use as a blockstore.  The region is carved up with the
   same FD_SCRATCH_ALLOC sequence used by fd_blockstore_footprint and
   fd_blockstore_join: shmem header, shred buffer, shred pool, shred
   map, block info array, block map, block idx, slot deque, txn map
   and allocator.  seed seeds the hash maps.  Returns shmem on
   success, NULL on failure (logs a warning).  shmem must be aligned
   to fd_blockstore_align() and must live inside a workspace. */
void *
fd_blockstore_new( void * shmem,
                   ulong wksp_tag,
                   ulong seed,
                   ulong shred_max,
                   ulong block_max,
                   ulong idx_max,
                   ulong txn_max ) {
  /* TODO temporary fix to make sure block_max is a power of 2, as
     required for slot map para. We should change to err in config
     verification eventually */
  block_max = fd_ulong_pow2_up( block_max );
  ulong lock_cnt = fd_ulong_min( block_max, BLOCK_INFO_LOCK_CNT );

  fd_blockstore_shmem_t * blockstore_shmem = (fd_blockstore_shmem_t *)shmem;

  /* Input validation: non-NULL, aligned, tagged, workspace-resident. */

  if( FD_UNLIKELY( !blockstore_shmem ) ) {
    FD_LOG_WARNING(( "NULL blockstore_shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)blockstore_shmem, fd_blockstore_align() ) )) {
    FD_LOG_WARNING(( "misaligned blockstore_shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !wksp_tag ) ) {
    FD_LOG_WARNING(( "bad wksp_tag" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( blockstore_shmem );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "shmem must be part of a workspace" ));
    return NULL;
  }

  /* The shred map requires a power-of-two chain count; round up
     rather than fail (footprint below uses the rounded value). */

  if( FD_UNLIKELY( !fd_ulong_is_pow2( shred_max ) ) ) {
    shred_max = fd_ulong_pow2_up( shred_max );
    FD_LOG_WARNING(( "blockstore implementation requires shred_max to be a power of two, rounding it up to %lu", shred_max ));
  }

  fd_memset( blockstore_shmem, 0, fd_blockstore_footprint( shred_max, block_max, idx_max, txn_max ) );

  int lg_idx_max = fd_ulong_find_msb( fd_ulong_pow2_up( idx_max ) );

  /* Carve the region.  This layout MUST stay in sync with
     fd_blockstore_footprint and fd_blockstore_join. */

  FD_SCRATCH_ALLOC_INIT( l, shmem );
  blockstore_shmem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_blockstore_shmem_t), sizeof(fd_blockstore_shmem_t) );
  void * shreds = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_buf_shred_t), sizeof(fd_buf_shred_t) * shred_max );
  void * shred_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_pool_align(), fd_buf_shred_pool_footprint() );
  void * shred_map = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_map_align(), fd_buf_shred_map_footprint( shred_max ) );
  void * blocks = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_block_info_t), sizeof(fd_block_info_t) * block_max );
  void * block_map = FD_SCRATCH_ALLOC_APPEND( l, fd_block_map_align(), fd_block_map_footprint( block_max, lock_cnt, BLOCK_INFO_PROBE_CNT ) );
  void * block_idx = FD_SCRATCH_ALLOC_APPEND( l, fd_block_idx_align(), fd_block_idx_footprint( lg_idx_max ) );
  void * slot_deque = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_deque_align(), fd_slot_deque_footprint( block_max ) );
  void * txn_map = FD_SCRATCH_ALLOC_APPEND( l, fd_txn_map_align(), fd_txn_map_footprint( txn_max ) );
  void * alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
  ulong top = FD_SCRATCH_ALLOC_FINI( l, fd_blockstore_align() );
  /* Sanity check: the carved layout matches the advertised footprint. */
  FD_TEST( fd_ulong_align_up( top - (ulong)shmem, fd_alloc_align() ) == fd_ulong_align_up( fd_blockstore_footprint( shred_max, block_max, idx_max, txn_max ), fd_alloc_align() ) );

  (void)shreds; /* shred buffer needs no formatting; pool/map index into it at join */
  fd_buf_shred_pool_new( shred_pool );
  fd_buf_shred_map_new ( shred_map, shred_max, seed );
  memset( blocks, 0, sizeof(fd_block_info_t) * block_max );
  FD_TEST( fd_block_map_new ( block_map, block_max, lock_cnt, BLOCK_INFO_PROBE_CNT, seed ) );

  /* Caller is in charge of freeing map_slot_para element store set.
     We need to explicitly do this since blocks is memset to 0, which
     is not a "freed" state in map_slot_para (slot 0 is a valid key). */
  fd_block_info_t * blocks_ = (fd_block_info_t *)blocks;
  for( ulong i=0UL; i<block_max; i++ ) {
    fd_block_map_private_ele_free( NULL, /* Not needed, avoids a join on block_map */
                                   &blocks_[i] );
  }

  /* Format the remaining structures and record their global addresses
     in the header so other processes can join. */

  blockstore_shmem->block_idx_gaddr = fd_wksp_gaddr( wksp, fd_block_idx_join( fd_block_idx_new( block_idx, lg_idx_max ) ) );
  blockstore_shmem->slot_deque_gaddr = fd_wksp_gaddr( wksp, fd_slot_deque_join (fd_slot_deque_new( slot_deque, block_max ) ) );
  blockstore_shmem->txn_map_gaddr = fd_wksp_gaddr( wksp, fd_txn_map_join (fd_txn_map_new( txn_map, txn_max, seed ) ) );
  blockstore_shmem->alloc_gaddr = fd_wksp_gaddr( wksp, fd_alloc_join (fd_alloc_new( alloc, wksp_tag ), wksp_tag ) );

  FD_TEST( blockstore_shmem->block_idx_gaddr );
  FD_TEST( blockstore_shmem->slot_deque_gaddr );
  FD_TEST( blockstore_shmem->txn_map_gaddr );
  FD_TEST( blockstore_shmem->alloc_gaddr );

  blockstore_shmem->blockstore_gaddr = fd_wksp_gaddr_fast( wksp, blockstore_shmem );
  blockstore_shmem->wksp_tag = wksp_tag;
  blockstore_shmem->seed = seed;

  /* Archiver starts empty with the minimum file size; fd_blockstore_init
     overwrites fd_size_max with the caller-provided size. */

  blockstore_shmem->archiver = (fd_blockstore_archiver_t){
    .fd_size_max = FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
    .head = FD_BLOCKSTORE_ARCHIVE_START,
    .tail = FD_BLOCKSTORE_ARCHIVE_START,
    .num_blocks = 0,
  };

  /* No slots known yet: latest-processed, highest-confirmed and
     watermark slots all start as the sentinel. */

  blockstore_shmem->lps = FD_SLOT_NULL;
  blockstore_shmem->hcs = FD_SLOT_NULL;
  blockstore_shmem->wmk = FD_SLOT_NULL;

  blockstore_shmem->shred_max = shred_max;
  blockstore_shmem->block_max = block_max;
  blockstore_shmem->idx_max = idx_max;
  blockstore_shmem->txn_max = txn_max;

  /* Publish the magic last, fenced, so concurrent observers never see
     a magic-valid but partially-initialized region. */

  FD_COMPILER_MFENCE();
  FD_VOLATILE( blockstore_shmem->magic ) = FD_BLOCKSTORE_MAGIC;
  FD_COMPILER_MFENCE();

  return (void *)blockstore_shmem;
}
119 :
/* fd_blockstore_join joins the caller to a blockstore previously
   formatted by fd_blockstore_new.  ljoin points to caller-local,
   alignof(fd_blockstore_t)-aligned memory that holds the join state
   (shmem pointer plus local pool/map joins); shblockstore points to
   the shared region.  Re-walks the same FD_SCRATCH_ALLOC layout as
   fd_blockstore_new to locate the embedded structures.  Returns the
   join on success, NULL on failure (logs a warning). */
fd_blockstore_t *
fd_blockstore_join( void * ljoin, void * shblockstore ) {
  fd_blockstore_t * join = (fd_blockstore_t *)ljoin;
  fd_blockstore_shmem_t * blockstore = (fd_blockstore_shmem_t *)shblockstore;

  if( FD_UNLIKELY( !join ) ) {
    FD_LOG_WARNING(( "NULL ljoin" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)join, alignof(fd_blockstore_t) ) ) ) {
    FD_LOG_WARNING(( "misaligned ljoin" ));
    return NULL;
  }

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)blockstore, fd_blockstore_align() ) )) {
    FD_LOG_WARNING(( "misaligned shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( blockstore->magic != FD_BLOCKSTORE_MAGIC ) ) {
    FD_LOG_WARNING(( "bad magic" ));
    return NULL;
  }

  /* Recompute the layout from the sizing parameters recorded in the
     header.  This MUST match the carve order in fd_blockstore_new. */

  FD_SCRATCH_ALLOC_INIT( l, shblockstore );
  blockstore = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_blockstore_shmem_t), sizeof(fd_blockstore_shmem_t) );
  void * shreds = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_buf_shred_t), sizeof(fd_buf_shred_t) * blockstore->shred_max );
  void * shred_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_pool_align(), fd_buf_shred_pool_footprint() );
  void * shred_map = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_map_align(), fd_buf_shred_map_footprint( blockstore->shred_max ) );
  void * blocks = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_block_info_t), sizeof(fd_block_info_t) * blockstore->block_max );
  void * block_map = FD_SCRATCH_ALLOC_APPEND( l, fd_block_map_align(), fd_block_map_footprint( blockstore->block_max,
                                                                                               fd_ulong_min(blockstore->block_max, BLOCK_INFO_LOCK_CNT),
                                                                                               BLOCK_INFO_PROBE_CNT ) );
  FD_SCRATCH_ALLOC_FINI( l, fd_blockstore_align() );

  join->shmem = blockstore;
  fd_buf_shred_pool_join( join->shred_pool, shred_pool, shreds, blockstore->shred_max );
  fd_buf_shred_map_join ( join->shred_map, shred_map, shreds, blockstore->shred_max );
  fd_block_map_join ( join->block_map, block_map, blocks );

  /* Verify the joined structures are internally consistent before
     handing the join back to the caller. */

  FD_TEST( fd_buf_shred_pool_verify( join->shred_pool ) == FD_POOL_SUCCESS );
  FD_TEST( fd_buf_shred_map_verify ( join->shred_map ) == FD_MAP_SUCCESS );
  FD_TEST( fd_block_map_verify ( join->block_map ) == FD_MAP_SUCCESS );

  return join;
}
172 :
173 : void *
174 0 : fd_blockstore_leave( fd_blockstore_t * blockstore ) {
175 :
176 0 : if( FD_UNLIKELY( !blockstore ) ) {
177 0 : FD_LOG_WARNING(( "NULL blockstore" ));
178 0 : return NULL;
179 0 : }
180 :
181 0 : fd_wksp_t * wksp = fd_wksp_containing( blockstore );
182 0 : if( FD_UNLIKELY( !wksp ) ) {
183 0 : FD_LOG_WARNING(( "shmem must be part of a workspace" ));
184 0 : return NULL;
185 0 : }
186 :
187 0 : FD_TEST( fd_buf_shred_pool_leave( blockstore->shred_pool ) );
188 0 : FD_TEST( fd_buf_shred_map_leave( blockstore->shred_map ) );
189 0 : FD_TEST( fd_block_map_leave( blockstore->block_map ) );
190 0 : FD_TEST( fd_block_idx_leave( fd_blockstore_block_idx( blockstore ) ) );
191 0 : FD_TEST( fd_slot_deque_leave( fd_blockstore_slot_deque( blockstore ) ) );
192 0 : FD_TEST( fd_txn_map_leave( fd_blockstore_txn_map( blockstore ) ) );
193 0 : FD_TEST( fd_alloc_leave( fd_blockstore_alloc( blockstore ) ) );
194 :
195 0 : return (void *)blockstore;
196 0 : }
197 :
/* fd_blockstore_delete unformats a blockstore: deletes every
   constituent structure and clears the magic so subsequent joins
   fail.  Returns blockstore on success, NULL on failure (logs a
   warning).

   NOTE(review): the parameter is named shblockstore but is cast to
   fd_blockstore_t (the local join type) and dereferenced via
   blockstore->shmem and the pool/map join members — this looks like
   it expects a join, not raw shared memory.  Confirm the intended
   caller contract. */
void *
fd_blockstore_delete( void * shblockstore ) {
  fd_blockstore_t * blockstore = (fd_blockstore_t *)shblockstore;

  if( FD_UNLIKELY( !blockstore ) ) {
    FD_LOG_WARNING(( "NULL shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)blockstore, fd_blockstore_align() ) )) {
    FD_LOG_WARNING(( "misaligned shblockstore" ));
    return NULL;
  }

  if( FD_UNLIKELY( blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) ) {
    FD_LOG_WARNING(( "bad magic" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( blockstore );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "shmem must be part of a workspace" ));
    return NULL;
  }

  /* Delete all structures. */

  FD_TEST( fd_buf_shred_pool_delete( &blockstore->shred_pool ) );
  FD_TEST( fd_buf_shred_map_delete( &blockstore->shred_map ) );
  FD_TEST( fd_block_map_delete( &blockstore->block_map ) );
  FD_TEST( fd_block_idx_delete( fd_blockstore_block_idx( blockstore ) ) );
  FD_TEST( fd_slot_deque_delete( fd_blockstore_slot_deque( blockstore ) ) );
  FD_TEST( fd_txn_map_delete( fd_blockstore_txn_map( blockstore ) ) );
  FD_TEST( fd_alloc_delete( fd_blockstore_alloc( blockstore ) ) );

  /* Clear the magic last, fenced, so concurrent observers never see a
     magic-valid region whose structures are already deleted. */

  FD_COMPILER_MFENCE();
  FD_VOLATILE( blockstore->shmem->magic ) = 0UL;
  FD_COMPILER_MFENCE();

  return blockstore;
}
239 :
/* check_read_err_safe logs a warning (with the calling function's
   name and msg) and returns FD_BLOCKSTORE_ERR_SLOT_MISSING from the
   enclosing function when cond is non-zero.  Wrapped in
   do { ... } while(0) WITHOUT a trailing semicolon so the macro
   behaves as a single statement and composes safely inside
   if/else chains (the caller supplies the semicolon). */
#define check_read_err_safe( cond, msg )                  \
  do {                                                    \
    if( FD_UNLIKELY( cond ) ) {                           \
      FD_LOG_WARNING(( "[%s] %s", __func__, msg ));       \
      return FD_BLOCKSTORE_ERR_SLOT_MISSING;              \
    }                                                     \
  } while(0)
247 :
/* fd_blockstore_init initializes a joined blockstore for runtime use:
   sets the archive file size, seeds lps/hcs/wmk to slot, and inserts
   a block map entry for slot marked completed/processed/eqvocsafe/
   confirmed/finalized (i.e. treats slot as the snapshot root).
   fd is the archive file descriptor.  Returns blockstore on success.

   NOTE(review): FD_LOG_ERR aborts the process, so the `return NULL`
   branches below are unreachable and exist only as documentation of
   intent. */
fd_blockstore_t *
fd_blockstore_init( fd_blockstore_t * blockstore,
                    int fd,
                    ulong fd_size_max,
                    ulong slot ) {

  if( fd_size_max < FD_BLOCKSTORE_ARCHIVE_MIN_SIZE ) {
    FD_LOG_ERR(( "archive file size too small" ));
    return NULL; /* unreachable: FD_LOG_ERR aborts */
  }
  blockstore->shmem->archiver.fd_size_max = fd_size_max;

  //build_idx( blockstore, fd );
  /* Position the archive fd at its end.  NOTE(review): the lseek
     return value is ignored — presumably failure is impossible for a
     valid fd here, but confirm. */
  lseek( fd, 0, SEEK_END );

  /* initialize fields using slot bank */

  ulong smr = slot;

  /* The snapshot slot starts as latest-processed, highest-confirmed
     and watermark all at once. */

  blockstore->shmem->lps = smr;
  blockstore->shmem->hcs = smr;
  blockstore->shmem->wmk = smr;

  fd_block_map_query_t query[1];

  int err = fd_block_map_prepare( blockstore->block_map, &smr, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * ele = fd_block_map_query_ele( query );
  if ( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "failed to prepare block map for slot %lu", smr ));

  /* Mark the root block as fully finalized: it came from a snapshot,
     so every lifecycle flag short of replay bookkeeping is set. */

  ele->slot = smr;
  memset( ele->child_slots, UCHAR_MAX, FD_BLOCKSTORE_CHILD_SLOT_MAX * sizeof( ulong ) );
  ele->child_slot_cnt = 0;
  ele->flags = fd_uchar_set_bit(
               fd_uchar_set_bit(
               fd_uchar_set_bit(
               fd_uchar_set_bit(
               fd_uchar_set_bit( ele->flags,
                                 FD_BLOCK_FLAG_COMPLETED ),
                                 FD_BLOCK_FLAG_PROCESSED ),
                                 FD_BLOCK_FLAG_EQVOCSAFE ),
                                 FD_BLOCK_FLAG_CONFIRMED ),
                                 FD_BLOCK_FLAG_FINALIZED );
  // ele->ref_tick = 0;
  ele->ts = 0;
  ele->consumed_idx = 0;
  ele->received_idx = 0;
  ele->buffered_idx = 0;
  ele->data_complete_idx = 0;
  ele->slot_complete_idx = 0;
  ele->ticks_consumed = 0;
  ele->tick_hash_count_accum = 0;
  fd_block_set_null( ele->data_complete_idxs );

  /* Set all fields to 0. Caller's responsibility to check gaddr and sz != 0. */

  fd_block_map_publish( query );

  return blockstore;
}
307 :
308 : void
309 0 : fd_blockstore_fini( fd_blockstore_t * blockstore ) {
310 : /* Free all allocations by removing all slots (whether they are
311 : complete or not). */
312 0 : fd_block_info_t * ele0 = (fd_block_info_t *)fd_block_map_shele( blockstore->block_map );
313 0 : ulong block_max = fd_block_map_ele_max( blockstore->block_map );
314 0 : for( ulong ele_idx=0; ele_idx<block_max; ele_idx++ ) {
315 0 : fd_block_info_t * ele = ele0 + ele_idx;
316 0 : if( ele->slot == 0 ) continue; /* unused */
317 0 : fd_blockstore_slot_remove( blockstore, ele->slot );
318 0 : }
319 0 : }
320 :
321 : /* txn map helpers */
322 :
323 : FD_FN_PURE int
324 0 : fd_txn_key_equal( fd_txn_key_t const * k0, fd_txn_key_t const * k1 ) {
325 0 : for( ulong i = 0; i < FD_ED25519_SIG_SZ / sizeof( ulong ); ++i )
326 0 : if( k0->v[i] != k1->v[i] ) return 0;
327 0 : return 1;
328 0 : }
329 :
330 : FD_FN_PURE ulong
331 0 : fd_txn_key_hash( fd_txn_key_t const * k, ulong seed ) {
332 0 : ulong h = seed;
333 0 : for( ulong i = 0; i < FD_ED25519_SIG_SZ / sizeof( ulong ); ++i )
334 0 : h ^= k->v[i];
335 0 : return h;
336 0 : }
337 :
338 : /* Remove a slot from blockstore. Needs to currently be under a blockstore_write
339 : lock due to txn_map access. */
/* fd_blockstore_slot_remove removes slot from the blockstore: drops
   its block map entry, unlinks it from its parent's child list and
   releases all of its buffered shreds.  Refuses to remove a block
   that is mid-replay.  Caller must hold the blockstore write lock
   (see header comment) due to txn_map access. */
void
fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) {
  FD_LOG_DEBUG(( "[%s] slot: %lu", __func__, slot ));

  /* It is not safe to remove a replaying block. */
  /* Speculative read loop: snapshot parent_slot and received_idx,
     retrying until the read completes without writer interference. */
  fd_block_map_query_t query[1] = { 0 };
  ulong parent_slot = FD_SLOT_NULL;
  ulong received_idx = 0;
  int err = FD_MAP_ERR_AGAIN;
  while( err == FD_MAP_ERR_AGAIN ) {
    err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
    if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
    if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return; /* slot not found */
    fd_block_info_t * block_info = fd_block_map_query_ele( query );
    if( FD_UNLIKELY( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING ) ) ) {
      FD_LOG_WARNING(( "[%s] slot %lu has replay in progress. not removing.", __func__, slot ));
      return;
    }
    parent_slot = block_info->parent_slot;
    received_idx = block_info->received_idx;
    err = fd_block_map_query_test( query );
  }

  err = fd_block_map_remove( blockstore->block_map, &slot, query, FD_MAP_FLAG_BLOCKING );
  /* not possible to fail */
  FD_TEST( !fd_blockstore_block_info_test( blockstore, slot ) );

  /* Unlink slot from its parent only if it is not published. */
  /* NOTE(review): publish below runs even when prepare returned an
     error / parent_block_info is NULL — confirm fd_block_map_publish
     is safe on a failed prepare. */
  err = fd_block_map_prepare( blockstore->block_map, &parent_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * parent_block_info = fd_block_map_query_ele( query );
  if( FD_LIKELY( parent_block_info ) ) {
    for( ulong i = 0; i < parent_block_info->child_slot_cnt; i++ ) {
      if( FD_LIKELY( parent_block_info->child_slots[i] == slot ) ) {
        /* Swap-remove: overwrite with the last child and shrink. */
        parent_block_info->child_slots[i] =
            parent_block_info->child_slots[--parent_block_info->child_slot_cnt];
      }
    }
  }
  fd_block_map_publish( query );

  /* Remove buf_shreds. */
  for( uint idx = 0; idx < received_idx; idx++ ) {
    fd_blockstore_shred_remove( blockstore, slot, idx );
  }

  return;
}
387 :
/* fd_blockstore_publish advances the blockstore watermark to wmk (the
   new SMR): BFS-walks the fork tree rooted at the current watermark,
   removing every slot that is not on the path to wmk, then sweeps any
   orphaned slots in [old wmk, wmk).  fd is the archive file (unused
   while the archiving path below is commented out). */
void
fd_blockstore_publish( fd_blockstore_t * blockstore,
                       int fd FD_PARAM_UNUSED,
                       ulong wmk ) {
  FD_LOG_NOTICE(( "[%s] wmk %lu => smr %lu", __func__, blockstore->shmem->wmk, wmk ));

  /* Caller is incorrectly calling publish. */

  if( FD_UNLIKELY( blockstore->shmem->wmk == wmk ) ) {
    FD_LOG_WARNING(( "[%s] attempting to re-publish when wmk %lu already at smr %lu", __func__, blockstore->shmem->wmk, wmk ));
    return;
  }

  /* q uses the slot_deque as the BFS queue */

  ulong * q = fd_blockstore_slot_deque( blockstore );

  /* Clear the deque, preparing it to be reused. */

  fd_slot_deque_remove_all( q );

  /* Push the watermark onto the queue. */

  fd_slot_deque_push_tail( q, blockstore->shmem->wmk );

  /* Conduct a BFS to find slots to prune or archive. */

  while( !fd_slot_deque_empty( q ) ) {
    ulong slot = fd_slot_deque_pop_head( q );
    fd_block_map_query_t query[1];
    /* Blocking read -- we need the block_info ptr to be valid for the
       whole time that we are writing stuff to the archiver file. */
    int err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    if( FD_UNLIKELY( err ) ) {
      FD_LOG_WARNING(( "[%s] failed to prepare block map for blockstore publishing %lu", __func__, slot ));
      continue;
    }
    fd_block_info_t * block_info = fd_block_map_query_ele( query );

    /* Add slot's children to the queue. */

    for( ulong i = 0; i < block_info->child_slot_cnt; i++ ) {

      /* Stop upon reaching the SMR. */

      if( FD_LIKELY( block_info->child_slots[i] != wmk ) ) {
        fd_slot_deque_push_tail( q, block_info->child_slots[i] );
      }
    }

    /* Archive the block into a file if it is finalized. */

    /* if( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_FINALIZED ) ) {
      fd_block_t * block = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block_info->block_gaddr );
      uchar * data = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block->data_gaddr );

      fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );

      if( FD_UNLIKELY( fd_block_idx_query( block_idx, slot, NULL ) ) ) {
        FD_LOG_ERR(( "[%s] invariant violation. attempted to re-archive finalized block: %lu", __func__, slot ));
      } else {
        fd_blockstore_ser_t ser = {
          .block_map = block_info,
          .block     = block,
          .data      = data
        };
        fd_blockstore_block_checkpt( blockstore, &ser, fd, slot );
      }
    } */
    /* Release the prepare before removal (slot_remove re-queries). */
    fd_block_map_cancel( query ); // TODO: maybe we should not make prepare so large and instead call prepare again in helpers
    fd_blockstore_slot_remove( blockstore, slot );
  }

  /* Scan to clean up any orphaned blocks or shreds < new SMR. */

  for (ulong slot = blockstore->shmem->wmk; slot < wmk; slot++) {
    fd_blockstore_slot_remove( blockstore, slot );
  }

  blockstore->shmem->wmk = wmk;

  return;
}
471 :
472 : void
473 0 : fd_blockstore_shred_remove( fd_blockstore_t * blockstore, ulong slot, uint idx ) {
474 0 : fd_shred_key_t key = { slot, idx };
475 :
476 0 : fd_buf_shred_map_query_t query[1] = { 0 };
477 0 : int err = fd_buf_shred_map_remove( blockstore->shred_map, &key, NULL, query, FD_MAP_FLAG_BLOCKING );
478 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt: shred %lu %u", __func__, slot, idx ));
479 :
480 0 : if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) {
481 0 : fd_buf_shred_t * shred = fd_buf_shred_map_query_ele( query );
482 0 : int err = fd_buf_shred_pool_release( blockstore->shred_pool, shred, 1 );
483 0 : if( FD_UNLIKELY( err == FD_POOL_ERR_INVAL ) ) FD_LOG_ERR(( "[%s] pool error: shred %lu %u not in pool", __func__, slot, idx ));
484 0 : if( FD_UNLIKELY( err == FD_POOL_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] pool corrupt: shred %lu %u", __func__, slot, idx ));
485 0 : FD_TEST( !err );
486 0 : }
487 : // FD_TEST( fd_buf_shred_pool_verify( blockstore->shred_pool ) == FD_POOL_SUCCESS );
488 : // FD_TEST( fd_buf_shred_map_verify ( blockstore->shred_map ) == FD_MAP_SUCCESS );
489 0 : }
490 :
/* fd_blockstore_shred_insert inserts a data shred into the
   blockstore: buffers the shred payload, creates the slot's block map
   entry on first shred, advances the buffered / data_complete /
   received / slot_complete watermarks, and links the slot into its
   parent's child list.  Duplicate keys are kept-first; a payload
   mismatch only marks the existing shred equivocating.  Shreds below
   the watermark are dropped.  Aborts (FD_LOG_ERR) on non-data shreds,
   map/pool corruption, or block map exhaustion. */
void
fd_blockstore_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shred ) {
  // FD_LOG_NOTICE(( "[%s] slot %lu idx %u", __func__, shred->slot, shred->idx ));

  ulong slot = shred->slot;

  if( FD_UNLIKELY( !fd_shred_is_data( shred->variant ) ) ) FD_LOG_ERR(( "Expected data shred" ));

  /* Shreds at slots below the watermark have already been pruned;
     drop them. */

  if( FD_UNLIKELY( slot < blockstore->shmem->wmk ) ) {
    FD_LOG_DEBUG(( "[%s] slot %lu < wmk %lu. not inserting shred", __func__, slot, blockstore->shmem->wmk ));
    return;
  }

  fd_shred_key_t key = { slot, .idx = shred->idx };

  /* Test if the blockstore already contains this shred key. */

  if( FD_UNLIKELY( fd_blockstore_shred_test( blockstore, slot, shred->idx ) ) ) {

    /* If we receive a shred with the same key (slot and shred idx) but
       different payload as one we already have, we'll only keep the
       first. Once we receive the full block, we'll use merkle chaining
       from the last FEC set to determine whether we have the correct
       shred at every index.

       Later, if the block fails to replay (dead block) or the block
       hash doesn't match the one we observe from votes, we'll dump the
       entire block and use repair to recover the one a majority (52%)
       of the cluster has voted on. */

    /* Speculative read-modify loop: retried until query_test confirms
       no writer interfered. */
    for(;;) {
      fd_buf_shred_map_query_t query[1] = { 0 };
      int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
      if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] %s. shred: (%lu, %u)", __func__, fd_buf_shred_map_strerror( err ), slot, shred->idx ));
      if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
      fd_buf_shred_t * buf_shred = fd_buf_shred_map_query_ele( query );
      /* An existing shred has the same key. Eqvoc iff the payload is different */
      buf_shred->eqvoc = fd_shred_payload_sz( &buf_shred->hdr ) != fd_shred_payload_sz( shred ) ||
          0!=memcmp( fd_shred_data_payload( &buf_shred->hdr ), fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) );
      err = fd_buf_shred_map_query_test( query );
      if( FD_LIKELY( err == FD_MAP_SUCCESS) ) break;
    }
    return;
  }

  /* Insert the new shred. */

  int err;
  fd_buf_shred_t * ele = fd_buf_shred_pool_acquire( blockstore->shred_pool, NULL, 1, &err );
  if( FD_UNLIKELY( err == FD_POOL_ERR_EMPTY ) ) FD_LOG_ERR(( "[%s] %s. increase blockstore shred_max.", __func__, fd_buf_shred_pool_strerror( err ) ));
  if( FD_UNLIKELY( err == FD_POOL_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] %s.", __func__, fd_buf_shred_pool_strerror( err ) ));

  /* Copy both the parsed header and the raw wire bytes. */

  ele->key = key;
  ele->hdr = *shred;
  fd_memcpy( &ele->buf, shred, fd_shred_sz( shred ) );
  err = fd_buf_shred_map_insert( blockstore->shred_map, ele, FD_MAP_FLAG_BLOCKING );
  if( FD_UNLIKELY( err == FD_MAP_ERR_INVAL ) ) FD_LOG_ERR(( "[%s] map error. ele not in pool.", __func__ ));

  /* Update shred's associated slot meta */

  if( FD_UNLIKELY( !fd_blockstore_block_info_test( blockstore, slot ) ) ) {
    fd_block_map_query_t query[1] = { 0 };
    /* Prepare will succeed regardless of if the key is in the map or not. It either returns
       the element at that idx, or it will return a spot to insert new stuff. So we need to check
       if that space is actually unused, to signify that we are adding a new entry. */

    /* Try to insert slot into block_map TODO make non blocking? */

    err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    fd_block_info_t * block_info = fd_block_map_query_ele( query );

    if( FD_UNLIKELY( err == FD_MAP_ERR_FULL ) ){
      FD_LOG_ERR(( "[%s] OOM: failed to insert new block map entry. blockstore needs to save metadata for all slots >= SMR, so increase memory or check for issues with publishing new SMRs.", __func__ ));
    }

    /* Initialize the block_info. Note some fields are initialized
       to dummy values because we do not have all the necessary metadata
       yet. */

    block_info->slot = slot;

    /* parent_off is the distance back to the parent slot. */
    block_info->parent_slot = slot - shred->data.parent_off;
    memset( block_info->child_slots, UCHAR_MAX, FD_BLOCKSTORE_CHILD_SLOT_MAX * sizeof(ulong) );
    block_info->child_slot_cnt = 0;

    block_info->block_height = 0;
    block_info->block_hash = ( fd_hash_t ){ 0 };
    block_info->bank_hash = ( fd_hash_t ){ 0 };
    block_info->flags = fd_uchar_set_bit( 0, FD_BLOCK_FLAG_RECEIVING );
    block_info->ts = 0;
    // block_info->ref_tick = (uchar)( (int)shred->data.flags &
    //                                 (int)FD_SHRED_DATA_REF_TICK_MASK );
    block_info->buffered_idx = UINT_MAX;
    block_info->received_idx = 0;
    block_info->consumed_idx = UINT_MAX;

    block_info->data_complete_idx = UINT_MAX;
    block_info->slot_complete_idx = UINT_MAX;

    block_info->ticks_consumed = 0;
    block_info->tick_hash_count_accum = 0;

    fd_block_set_null( block_info->data_complete_idxs );

    block_info->block_gaddr = 0;

    fd_block_map_publish( query );

    FD_TEST( fd_blockstore_block_info_test( blockstore, slot ) );
  }
  fd_block_map_query_t query[1] = { 0 };
  err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * block_info = fd_block_map_query_ele( query ); /* should be impossible for this to fail */

  /* Advance the buffered_idx watermark. */

  uint prev_buffered_idx = block_info->buffered_idx;
  while( FD_LIKELY( fd_blockstore_shred_test( blockstore, slot, block_info->buffered_idx + 1 ) ) ) {
    block_info->buffered_idx++;
  }

  /* Mark the ending shred idxs of entry batches. */

  fd_block_set_insert_if( block_info->data_complete_idxs, shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE, shred->idx );

  /* Advance the data_complete_idx watermark using the shreds in between
     the previous consumed_idx and current consumed_idx. */

  for (uint idx = prev_buffered_idx + 1; block_info->buffered_idx != FD_SHRED_IDX_NULL && idx <= block_info->buffered_idx; idx++) {
    if( FD_UNLIKELY( fd_block_set_test( block_info->data_complete_idxs, idx ) ) ) {
      block_info->data_complete_idx = idx;
    }
  }

  /* Update received_idx and slot_complete_idx. */

  block_info->received_idx = fd_uint_max( block_info->received_idx, shred->idx + 1 );
  if( FD_UNLIKELY( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ) {
    // FD_LOG_NOTICE(( "slot %lu %u complete", slot, shred->idx ));
    block_info->slot_complete_idx = shred->idx;
  }

  /* Snapshot parent_slot before publishing so we can link ancestry
     without holding this prepare. */

  ulong parent_slot = block_info->parent_slot;

  FD_LOG_DEBUG(( "shred: (%lu, %u). consumed: %u, received: %u, complete: %u",
                 slot,
                 shred->idx,
                 block_info->buffered_idx,
                 block_info->received_idx,
                 block_info->slot_complete_idx ));
  fd_block_map_publish( query );

  /* Update ancestry metadata: parent_slot, is_connected, next_slot.

     If the parent_slot happens to be very old, there's a chance that
     it's hash probe could collide with an existing slot in the block
     map, and cause what looks like an OOM. Instead of using map_prepare
     and hitting this collision, we can either check that the
     parent_slot lives in the map with a block_info_test, or use the
     shmem wmk value as a more general guard against querying for
     parents that are too old. */

  if( FD_LIKELY( parent_slot < blockstore->shmem->wmk ) ) return;

  err = fd_block_map_prepare( blockstore->block_map, &parent_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * parent_block_info = fd_block_map_query_ele( query );

  /* Add this slot to its parent's child slots if not already there. */

  if( FD_LIKELY( parent_block_info && parent_block_info->slot == parent_slot ) ) {
    int found = 0;
    for( ulong i = 0; i < parent_block_info->child_slot_cnt; i++ ) {
      if( FD_LIKELY( parent_block_info->child_slots[i] == slot ) ) {
        found = 1;
        break;
      }
    }
    if( FD_UNLIKELY( !found ) ) { /* add to parent's child slots if not already there */
      if( FD_UNLIKELY( parent_block_info->child_slot_cnt == FD_BLOCKSTORE_CHILD_SLOT_MAX ) ) {
        FD_LOG_ERR(( "failed to add slot %lu to parent %lu's children. exceeding child slot max",
                     slot,
                     parent_block_info->slot ));
      }
      parent_block_info->child_slots[parent_block_info->child_slot_cnt++] = slot;
    }
  }
  if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) {
    fd_block_map_publish( query );
  } else {
    /* err is FD_MAP_ERR_FULL. Not in a valid prepare. Can happen if we
       are about to OOM, or if the parents are so far away that it just
       happens to chain longer than the probe_max. Somewhat covered by
       the early return, but there are some edge cases where we reach
       here, and it shouldn't be a LOG_ERR */
    FD_LOG_WARNING(( "block info not found for parent slot %lu. Have we seen it before?", parent_slot ));
  }

  //FD_TEST( fd_block_map_verify( blockstore->block_map ) == FD_MAP_SUCCESS );
}
690 :
/* fd_blockstore_shred_test returns 1 if the blockstore currently
   holds a shred at (slot, idx), 0 otherwise.  Uses a speculative
   query that retries until fd_buf_shred_map_query_test confirms no
   concurrent writer interfered with the read.  Aborts on map
   corruption. */
int
fd_blockstore_shred_test( fd_blockstore_t * blockstore, ulong slot, uint idx ) {
  fd_shred_key_t key = { slot, idx };
  fd_buf_shred_map_query_t query[1] = { 0 };

  for(;;) {
    int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
    if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] slot: %lu idx: %u. %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
    /* Only trust the result once query_test confirms the read was not
       raced; key-miss (FD_MAP_ERR_KEY) means "not present". */
    if( FD_LIKELY( !fd_buf_shred_map_query_test( query ) ) ) return err != FD_MAP_ERR_KEY;
  }
}
702 :
703 : int
704 0 : fd_blockstore_block_info_test( fd_blockstore_t * blockstore, ulong slot ) {
705 0 : int err = FD_MAP_ERR_AGAIN;
706 0 : while( err == FD_MAP_ERR_AGAIN ){
707 0 : fd_block_map_query_t query[1] = { 0 };
708 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
709 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
710 0 : if( err == FD_MAP_ERR_KEY ) return 0;
711 0 : err = fd_block_map_query_test( query );
712 0 : }
713 0 : return 1;
714 0 : }
715 :
716 : fd_block_info_t *
717 0 : fd_blockstore_block_map_query( fd_blockstore_t * blockstore, ulong slot ){
718 0 : fd_block_map_query_t quer[1] = { 0 };
719 0 : int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, quer, FD_MAP_FLAG_BLOCKING );
720 0 : fd_block_info_t * meta = fd_block_map_query_ele( quer );
721 0 : if( err ) return NULL;
722 0 : return meta;
723 0 : }
724 :
725 : int
726 0 : fd_blockstore_block_info_remove( fd_blockstore_t * blockstore, ulong slot ){
727 0 : int err = FD_MAP_ERR_AGAIN;
728 0 : while( err == FD_MAP_ERR_AGAIN ){
729 0 : err = fd_block_map_remove( blockstore->block_map, &slot, NULL, 0 );
730 0 : if( err == FD_MAP_ERR_KEY ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
731 0 : }
732 0 : return FD_BLOCKSTORE_SUCCESS;
733 0 : }
734 :
735 : long
736 0 : fd_buf_shred_query_copy_data( fd_blockstore_t * blockstore, ulong slot, uint idx, void * buf, ulong buf_sz ) {
737 0 : if( buf_sz < FD_SHRED_MAX_SZ ) return -1;
738 0 : fd_shred_key_t key = { slot, idx };
739 0 : ulong sz = 0;
740 0 : int err = FD_MAP_ERR_AGAIN;
741 0 : while( err == FD_MAP_ERR_AGAIN ) {
742 0 : fd_buf_shred_map_query_t query[1] = { 0 };
743 0 : err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
744 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return -1;
745 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
746 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
747 0 : fd_buf_shred_t const * shred = fd_buf_shred_map_query_ele_const( query );
748 0 : sz = fd_shred_sz( &shred->hdr );
749 0 : memcpy( buf, shred->buf, sz );
750 0 : err = fd_buf_shred_map_query_test( query );
751 0 : }
752 0 : FD_TEST( !err );
753 0 : return (long)sz;
754 0 : }
755 :
756 : int
757 0 : fd_blockstore_block_hash_query( fd_blockstore_t * blockstore, ulong slot, fd_hash_t * hash_out ) {
758 0 : for(;;) { /* Speculate */
759 0 : fd_block_map_query_t query[1] = { 0 };
760 0 : int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
761 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_BLOCKSTORE_ERR_KEY;
762 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
763 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
764 0 : *hash_out = block_info->block_hash;
765 0 : if( FD_LIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) return FD_BLOCKSTORE_SUCCESS;
766 0 : }
767 0 : }
768 :
769 : int
770 0 : fd_blockstore_bank_hash_query( fd_blockstore_t * blockstore, ulong slot, fd_hash_t * hash_out ) {
771 0 : for(;;) { /* Speculate */
772 0 : fd_block_map_query_t query[1] = { 0 };
773 0 : int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
774 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_BLOCKSTORE_ERR_KEY;
775 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
776 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
777 0 : *hash_out = block_info->bank_hash;
778 0 : if( FD_LIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) return FD_BLOCKSTORE_SUCCESS;
779 0 : }
780 0 : }
781 :
782 : ulong
783 0 : fd_blockstore_parent_slot_query( fd_blockstore_t * blockstore, ulong slot ) {
784 0 : int err = FD_MAP_ERR_AGAIN;
785 0 : ulong parent_slot = FD_SLOT_NULL;
786 0 : while( err == FD_MAP_ERR_AGAIN ){
787 0 : fd_block_map_query_t query[1] = { 0 };
788 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
789 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
790 :
791 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_SLOT_NULL;
792 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
793 :
794 0 : parent_slot = block_info->parent_slot;
795 0 : err = fd_block_map_query_test( query );
796 0 : }
797 0 : return parent_slot;
798 0 : }
799 :
800 : int
801 : fd_blockstore_slice_query( fd_blockstore_t * blockstore,
802 : ulong slot,
803 : uint start_idx,
804 : uint end_idx /* inclusive */,
805 : ulong max,
806 : uchar * buf,
807 0 : ulong * buf_sz ) {
808 : /* verify that the batch idxs provided is at batch boundaries*/
809 :
810 : // FD_LOG_NOTICE(( "querying for %lu %u %u", slot, start_idx, end_idx ));
811 :
812 0 : ulong off = 0;
813 0 : for(uint idx = start_idx; idx <= end_idx; idx++) {
814 0 : ulong payload_sz = 0;
815 :
816 0 : for(;;) { /* speculative copy one shred */
817 0 : fd_shred_key_t key = { slot, idx };
818 0 : fd_buf_shred_map_query_t query[1] = { 0 };
819 0 : int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
820 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ){
821 0 : FD_LOG_WARNING(( "[%s] key: (%lu, %u) %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
822 0 : return FD_BLOCKSTORE_ERR_CORRUPT;
823 0 : }
824 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ){
825 0 : FD_LOG_WARNING(( "[%s] key: (%lu, %u) %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
826 0 : return FD_BLOCKSTORE_ERR_KEY;
827 0 : }
828 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
829 :
830 0 : fd_buf_shred_t const * shred = fd_buf_shred_map_query_ele_const( query );
831 0 : uchar const * payload = fd_shred_data_payload( &shred->hdr );
832 0 : payload_sz = fd_shred_payload_sz( &shred->hdr );
833 0 : if( FD_UNLIKELY( off + payload_sz > max ) ) {
834 0 : FD_LOG_WARNING(( "[%s] increase `max`", __func__ )); /* caller needs to increase max */
835 0 : return FD_BLOCKSTORE_ERR_INVAL;
836 0 : }
837 :
838 0 : if( FD_UNLIKELY( payload_sz > FD_SHRED_DATA_PAYLOAD_MAX ) ) return FD_BLOCKSTORE_ERR_SHRED_INVALID;
839 0 : if( FD_UNLIKELY( off + payload_sz > max ) ) return FD_BLOCKSTORE_ERR_NO_MEM;
840 0 : fd_memcpy( buf + off, payload, payload_sz );
841 0 : err = fd_buf_shred_map_query_test( query );
842 0 : if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) break;
843 0 : }; /* successful speculative copy */
844 :
845 0 : off += payload_sz;
846 0 : }
847 0 : *buf_sz = off;
848 0 : return FD_BLOCKSTORE_SUCCESS;
849 0 : }
850 :
851 : int
852 0 : fd_blockstore_shreds_complete( fd_blockstore_t * blockstore, ulong slot ){
853 : //fd_block_t * block_exists = fd_blockstore_block_query( blockstore, slot );
854 0 : fd_block_map_query_t query[1];
855 0 : int complete = 0;
856 0 : int err = FD_MAP_ERR_AGAIN;
857 0 : while( err == FD_MAP_ERR_AGAIN ){
858 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
859 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
860 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return 0;
861 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
862 0 : complete = ( block_info->buffered_idx != FD_SHRED_IDX_NULL ) &&
863 0 : ( block_info->slot_complete_idx == block_info->buffered_idx );
864 0 : err = fd_block_map_query_test( query );
865 0 : }
866 0 : return complete;
867 :
868 : /* When replacing block_query( slot ) != NULL with this function:
869 : There are other things verified in a successful deshred & scan block that are not verified here.
870 : scan_block does a round of well-formedness checks like parsing txns, and no premature end of batch
871 : like needing cnt, microblock, microblock format.
872 :
873 : This maybe should be fine in places where we check both
874 : shreds_complete and flag PROCESSED/REPLAYING is set, because validation has been for sure done
875 : if the block has been replayed
876 :
877 : Should be careful in places that call this now that happen before the block is replayed, if we want
878 : to assume the shreds are well-formed we can't. */
879 :
880 0 : }
881 :
882 : int
883 : fd_blockstore_block_map_query_volatile( fd_blockstore_t * blockstore,
884 : int fd,
885 : ulong slot,
886 0 : fd_block_info_t * block_info_out ) {
887 :
888 : /* WARNING: this code is extremely delicate. Do NOT modify without
889 : understanding all the invariants. In particular, we must never
890 : dereference through a corrupt pointer. It's OK for the destination
891 : data to be overwritten/invalid as long as the memory location is
892 : valid. As long as we don't crash, we can validate the data after it
893 : is read. */
894 :
895 0 : fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );
896 :
897 0 : ulong off = ULONG_MAX;
898 0 : for( ;; ) {
899 0 : fd_block_idx_t * idx_entry = fd_block_idx_query( block_idx, slot, NULL );
900 0 : if( FD_LIKELY( idx_entry ) ) off = idx_entry->off;
901 0 : break;
902 0 : }
903 :
904 0 : if( FD_UNLIKELY( off < ULONG_MAX ) ) { /* optimize for non-archival queries */
905 0 : if( FD_UNLIKELY( lseek( fd, (long)off, SEEK_SET ) == -1 ) ) {
906 0 : FD_LOG_WARNING(( "failed to seek" ));
907 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
908 0 : }
909 0 : ulong rsz;
910 0 : int err = fd_io_read( fd, block_info_out, sizeof( fd_block_info_t ), sizeof( fd_block_info_t ), &rsz );
911 0 : if( FD_UNLIKELY( err ) ) {
912 0 : FD_LOG_WARNING(( "failed to read block map entry" ));
913 0 : return FD_BLOCKSTORE_ERR_SLOT_MISSING;
914 0 : }
915 0 : return FD_BLOCKSTORE_SUCCESS;
916 0 : }
917 :
918 0 : int err = FD_MAP_ERR_AGAIN;
919 0 : while( err == FD_MAP_ERR_AGAIN ) {
920 0 : fd_block_map_query_t quer[1] = { 0 };
921 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, quer, 0 );
922 0 : fd_block_info_t const * query = fd_block_map_query_ele_const( quer );
923 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
924 :
925 0 : *block_info_out = *query;
926 :
927 0 : err = fd_block_map_query_test( quer );
928 0 : }
929 0 : return FD_BLOCKSTORE_SUCCESS;
930 0 : }
931 :
932 : fd_txn_map_t *
933 0 : fd_blockstore_txn_query( fd_blockstore_t * blockstore, uchar const sig[FD_ED25519_SIG_SZ] ) {
934 0 : fd_txn_key_t key;
935 0 : fd_memcpy( &key, sig, sizeof( key ) );
936 0 : return fd_txn_map_query( fd_blockstore_txn_map( blockstore ), &key, NULL );
937 0 : }
938 :
/* fd_blockstore_txn_query_volatile looks up a transaction by signature
   and copies out its map entry, block timestamp/flags, and raw txn
   bytes.  Currently stubbed: block archiving is disabled
   (BLOCK_ARCHIVING is not set), so this always returns
   FD_BLOCKSTORE_ERR_SLOT_MISSING.  The archival implementation is
   preserved below under #if BLOCK_ARCHIVING. */
int
fd_blockstore_txn_query_volatile( fd_blockstore_t * blockstore,
                                  int fd,
                                  uchar const sig[FD_ED25519_SIG_SZ],
                                  fd_txn_map_t * txn_out,
                                  long * blk_ts,
                                  uchar * blk_flags,
                                  uchar txn_data_out[FD_TXN_MTU] ) {
  /* WARNING: this code is extremely delicate. Do NOT modify without
     understanding all the invariants. In particular, we must never
     dereference through a corrupt pointer. It's OK for the
     destination data to be overwritten/invalid as long as the memory
     location is valid. As long as we don't crash, we can validate the
     data after it is read. */
  /* Stub: silence unused-parameter warnings while archiving is off. */
  (void)blockstore;
  (void)fd;
  (void)sig;
  (void)txn_out;
  (void)blk_ts;
  (void)blk_flags;
  (void)txn_data_out;
  return FD_BLOCKSTORE_ERR_SLOT_MISSING;
#if BLOCK_ARCHIVING
  fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
  fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );

  /* Copy the txn map entry for sig (loop always breaks after one
     successful query). */
  for(;;) {
    fd_txn_key_t key;
    memcpy( &key, sig, sizeof(key) );
    fd_txn_map_t const * txn_map_entry = fd_txn_map_query_safe( txn_map, &key, NULL );
    if( FD_UNLIKELY( txn_map_entry == NULL ) ) return FD_BLOCKSTORE_ERR_TXN_MISSING;
    memcpy( txn_out, txn_map_entry, sizeof(fd_txn_map_t) );
    break;
  }

  fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );

  /* Look up the archive file offset of the block containing the txn. */
  ulong off = ULONG_MAX;
  for(;;) {
    fd_block_idx_t * idx_entry = fd_block_idx_query( block_idx, txn_out->slot, NULL );
    if( FD_LIKELY( idx_entry ) ) off = idx_entry->off;
    break;
  }

  if ( FD_UNLIKELY( off < ULONG_MAX ) ) { /* optimize for non-archival */
    if( FD_UNLIKELY( lseek( fd, (long)off, SEEK_SET ) == -1 ) ) {
      FD_LOG_WARNING(( "failed to seek" ));
      return FD_BLOCKSTORE_ERR_SLOT_MISSING;
    }
    fd_block_info_t block_info;
    ulong rsz; int err;
    err = fd_io_read( fd, &block_info, sizeof(fd_block_info_t), sizeof(fd_block_info_t), &rsz );
    check_read_write_err( err );
    err = fd_io_read( fd, txn_data_out, txn_out->sz, txn_out->sz, &rsz );
    check_read_write_err( err );
    err = (int)lseek( fd, (long)off + (long)txn_out->offset, SEEK_SET );
    check_read_write_err( err );
    err = fd_io_read( fd, txn_data_out, txn_out->sz, txn_out->sz, &rsz );
    check_read_write_err( err);
    return FD_BLOCKSTORE_SUCCESS;
  }

  /* Not archived: speculatively copy out of the live block, re-testing
     the query after every dereference of speculatively-read state. */
  for(;;) {
    fd_block_map_query_t quer[1] = { 0 };
    fd_block_map_query_try( blockstore->block_map, &txn_out->slot, NULL, quer, 0 );
    fd_block_info_t const * query = fd_block_map_query_ele_const( quer );

    if( FD_UNLIKELY( !query ) ) return FD_BLOCKSTORE_ERR_TXN_MISSING;
    ulong blk_gaddr = query->block_gaddr;
    if( FD_UNLIKELY( !blk_gaddr ) ) return FD_BLOCKSTORE_ERR_TXN_MISSING;

    if( fd_block_map_query_test( quer ) ) continue;

    fd_block_t * blk = fd_wksp_laddr_fast( wksp, blk_gaddr );
    if( blk_ts ) *blk_ts = query->ts;
    if( blk_flags ) *blk_flags = query->flags;
    ulong ptr = blk->data_gaddr;
    ulong sz = blk->data_sz;
    if( txn_out->offset + txn_out->sz > sz || txn_out->sz > FD_TXN_MTU ) continue;

    if( FD_UNLIKELY( fd_block_map_query_test( quer ) ) ) continue;

    if( txn_data_out == NULL ) return FD_BLOCKSTORE_SUCCESS;
    uchar const * data = fd_wksp_laddr_fast( wksp, ptr );
    fd_memcpy( txn_data_out, data + txn_out->offset, txn_out->sz );

    if( FD_UNLIKELY( fd_block_map_query_test( quer ) ) ) continue;

    return FD_BLOCKSTORE_SUCCESS;
  }
#endif
}
1031 :
1032 : void
1033 0 : fd_blockstore_block_height_update( fd_blockstore_t * blockstore, ulong slot, ulong height ) {
1034 0 : fd_block_map_query_t query[1] = { 0 };
1035 : // TODO make nonblocking
1036 0 : int err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
1037 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1038 0 : if( FD_UNLIKELY( err || block_info->slot != slot ) ) {
1039 0 : fd_block_map_cancel( query );
1040 0 : return;
1041 0 : };
1042 0 : block_info->block_height = height;
1043 0 : fd_block_map_publish( query );
1044 0 : }
1045 :
1046 : ulong
1047 0 : fd_blockstore_block_height_query( fd_blockstore_t * blockstore, ulong slot ) {
1048 0 : ulong block_entry_height = 0;
1049 0 : for(;;){
1050 0 : fd_block_map_query_t query[1] = { 0 };
1051 0 : int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
1052 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1053 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(( "Failed to query blockstore for slot %lu", slot ));
1054 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
1055 0 : block_entry_height = block_info->block_height;
1056 0 : if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
1057 0 : }
1058 0 : return block_entry_height;
1059 0 : }
1060 :
/* fd_blockstore_log_block_status logs received/buffered/completed shred
   indices for slots in [around_slot-5, around_slot+20), marking
   around_slot with a '*'.  Slots absent from the block map are skipped.
   NOTE(review): around_slot < 5 would wrap the unsigned start of the
   range — presumably callers pass large slot numbers; confirm. */
void
fd_blockstore_log_block_status( fd_blockstore_t * blockstore, ulong around_slot ) {
  fd_block_map_query_t query[1] = { 0 };
  uint received_idx = 0;
  uint buffered_idx = 0;
  uint slot_complete_idx = 0;

  for( ulong i = around_slot - 5; i < around_slot + 20; ++i ) {
    int err = FD_MAP_ERR_AGAIN;
    /* Speculatively read the three indices, retrying on contention.
       err carries FD_MAP_ERR_KEY out of the loop to skip absent slots. */
    while( err == FD_MAP_ERR_AGAIN ){
      err = fd_block_map_query_try( blockstore->block_map, &i, NULL, query, 0 );
      fd_block_info_t * slot_entry = fd_block_map_query_ele( query );
      if( err == FD_MAP_ERR_KEY ) break;
      if( err == FD_MAP_ERR_AGAIN ) continue;
      received_idx = slot_entry->received_idx;
      buffered_idx = slot_entry->buffered_idx;
      slot_complete_idx = slot_entry->slot_complete_idx;
      err = fd_block_map_query_test( query );
      if( err == FD_MAP_ERR_KEY ) break;
    }

    if( err == FD_MAP_ERR_KEY ) continue; /* slot not in map: skip */

    FD_LOG_NOTICE(( "%sslot=%lu received=%u consumed=%u finished=%u",
                    ( i == around_slot ? "*" : " " ),
                    i,
                    received_idx,
                    buffered_idx,
                    slot_complete_idx ));
  }
}
1092 :
1093 : static char *
1094 0 : fd_smart_size( ulong sz, char * tmp, size_t tmpsz ) {
1095 0 : if( sz <= (1UL<<7) )
1096 0 : snprintf( tmp, tmpsz, "%lu B", sz );
1097 0 : else if( sz <= (1UL<<17) )
1098 0 : snprintf( tmp, tmpsz, "%.3f KB", ((double)sz/((double)(1UL<<10))) );
1099 0 : else if( sz <= (1UL<<27) )
1100 0 : snprintf( tmp, tmpsz, "%.3f MB", ((double)sz/((double)(1UL<<20))) );
1101 0 : else
1102 0 : snprintf( tmp, tmpsz, "%.3f GB", ((double)sz/((double)(1UL<<30))) );
1103 0 : return tmp;
1104 0 : }
1105 :
1106 : void
1107 0 : fd_blockstore_log_mem_usage( fd_blockstore_t * blockstore ) {
1108 0 : char tmp1[100];
1109 :
1110 0 : FD_LOG_NOTICE(( "blockstore base footprint: %s",
1111 0 : fd_smart_size( sizeof(fd_blockstore_t), tmp1, sizeof(tmp1) ) ));
1112 0 : ulong shred_max = fd_buf_shred_pool_ele_max( blockstore->shred_pool );
1113 0 : FD_LOG_NOTICE(( "shred pool footprint: %s %lu entries)",
1114 0 : fd_smart_size( fd_buf_shred_pool_footprint(), tmp1, sizeof(tmp1) ),
1115 0 : shred_max ));
1116 0 : ulong shred_map_cnt = fd_buf_shred_map_chain_cnt( blockstore->shred_map );
1117 0 : FD_LOG_NOTICE(( "shred map footprint: %s (%lu chains, load is %.3f)",
1118 0 : fd_smart_size( fd_buf_shred_map_footprint( shred_map_cnt ), tmp1, sizeof(tmp1) ),
1119 0 : shred_map_cnt,
1120 0 : (double)shred_map_cnt) );
1121 :
1122 : /*fd_block_info_t * slot_map = fd_blockstore_block_map( blockstore );
1123 : ulong slot_map_cnt = fd_block_map_key_cnt( slot_map );
1124 : ulong slot_map_max = fd_block_map_key_max( slot_map );
1125 : FD_LOG_NOTICE(( "slot map footprint: %s (%lu entries used out of %lu, %lu%%)",
1126 : fd_smart_size( fd_block_map_footprint( slot_map_max ), tmp1, sizeof(tmp1) ),
1127 : slot_map_cnt,
1128 : slot_map_max,
1129 : (100U*slot_map_cnt)/slot_map_max )); */
1130 :
1131 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
1132 0 : ulong txn_map_cnt = fd_txn_map_key_cnt( txn_map );
1133 0 : ulong txn_map_max = fd_txn_map_key_max( txn_map );
1134 0 : FD_LOG_NOTICE(( "txn map footprint: %s (%lu entries used out of %lu, %lu%%)",
1135 0 : fd_smart_size( fd_txn_map_footprint( txn_map_max ), tmp1, sizeof(tmp1) ),
1136 0 : txn_map_cnt,
1137 0 : txn_map_max,
1138 0 : (100U*txn_map_cnt)/txn_map_max ));
1139 0 : ulong block_cnt = 0;
1140 :
1141 0 : ulong * q = fd_blockstore_slot_deque( blockstore );
1142 0 : fd_slot_deque_remove_all( q );
1143 0 : fd_slot_deque_push_tail( q, blockstore->shmem->wmk );
1144 0 : while( !fd_slot_deque_empty( q ) ) {
1145 0 : ulong curr = fd_slot_deque_pop_head( q );
1146 :
1147 0 : fd_block_map_query_t query[1] = { 0 };
1148 0 : int err = fd_block_map_query_try( blockstore->block_map, &curr, NULL, query, FD_MAP_FLAG_BLOCKING );
1149 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1150 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY || !block_info ) ) continue;
1151 :
1152 0 : for( ulong i = 0; i < block_info->child_slot_cnt; i++ ) {
1153 0 : fd_slot_deque_push_tail( q, block_info->child_slots[i] );
1154 0 : }
1155 0 : }
1156 :
1157 0 : if( block_cnt )
1158 0 : FD_LOG_NOTICE(( "block cnt: %lu",
1159 0 : block_cnt ));
1160 0 : }
|