Line data Source code
1 : /* Store tile manages a blockstore and serves requests to repair and replay. */
2 : #include "fd_store.h"
3 : #define _GNU_SOURCE
4 :
5 : #include "generated/fd_storei_tile_seccomp.h"
6 :
7 : #include "fd_trusted_slots.h"
8 : #include "../shred/fd_shred_cap.h"
9 :
10 : #include "../../disco/tiles.h"
11 : #include "../../disco/metrics/fd_metrics.h"
12 : #include "../../flamenco/runtime/fd_blockstore.h"
13 : #include "../../disco/shred/fd_stake_ci.h"
14 : #include "../../disco/keyguard/fd_keyload.h"
15 : #include "../../disco/topo/fd_pod_format.h"
16 : #include "../../flamenco/runtime/fd_runtime.h"
17 : #include "../../disco/metrics/fd_metrics.h"
18 :
19 : #include <fcntl.h>
20 : #include <unistd.h>
21 : #include <arpa/inet.h>
22 : #include <linux/unistd.h>
23 : #include <sys/random.h>
24 : #include <netdb.h>
25 : #include <netinet/in.h>
26 : #include <sys/socket.h>
27 :
/* Fixed positions of the non-shred input links.  The topology wires the
   stake, repair, and restart links first; every in-link at index
   >= NON_SHRED_LINKS is assumed to come from a shred tile. */
#define STAKE_IN_IDX 0
#define REPAIR_IN_IDX 1
#define RESTART_IN_IDX 2
#define NON_SHRED_LINKS 3 /* stake, repair, and replay are the 3 links not from shred tile */

/* Fixed positions of the output links (see unprivileged_init link checks). */
#define REPLAY_OUT_IDX 0
#define REPAIR_OUT_IDX 1
#define RESTART_OUT_IDX 2

/* TODO: Determine/justify optimal number of repair requests */
/* Sized so one batch of requests fits in a USHORT_MAX-byte frag payload
   (see the FD_TEST on repair_req_sz in fd_store_tile_slot_prepare). */
#define MAX_REPAIR_REQS ( (ulong)USHORT_MAX / sizeof(fd_repair_request_t) )

/* fd_scratch region sizing: max total bytes and max frame depth. */
#define SCRATCH_SMAX (512UL << 21UL)
#define SCRATCH_SDEPTH (128UL)
42 :
/* fd_txn_iter_t pairs a slot number with an in-progress raw-block
   transaction iterator.  It is the element type of fd_txn_iter_map
   below; a slot of FD_SLOT_NULL marks an unused map entry. */
struct fd_txn_iter {
  ulong                   slot; /* slot being iterated (map key) */
  fd_raw_block_txn_iter_t iter; /* current iteration position within the slot's block */
};

typedef struct fd_txn_iter fd_txn_iter_t;
49 :
/* Instantiate fd_txn_iter_map: a small (2^5 = 32 slot) open-addressed
   map from slot number to fd_txn_iter_t.  Keys are hashed with
   fd_ulong_hash and FD_SLOT_NULL is the sentinel empty key. */
#define MAP_NAME fd_txn_iter_map
#define MAP_T fd_txn_iter_t
#define MAP_KEY_T ulong
#define MAP_KEY slot
#define MAP_KEY_NULL FD_SLOT_NULL
#define MAP_KEY_INVAL(k) MAP_KEY_EQUAL(k, FD_SLOT_NULL)
#define MAP_KEY_EQUAL(k0,k1) (k0==k1)
#define MAP_KEY_EQUAL_IS_SLOW 0
#define MAP_MEMOIZE 0
#define MAP_KEY_HASH(key) ((uint)fd_ulong_hash( key ))
#define MAP_LG_SLOT_CNT 5
#include "../../util/tmpl/fd_map.c"
62 :
/* Per-input-link dcache addressing state, used for the shred-tile
   inputs (the fixed links keep equivalent fields directly in the tile
   context). */
struct fd_store_in_ctx {
  fd_wksp_t * mem;    /* workspace backing this link's dcache */
  ulong       chunk0; /* first valid chunk index */
  ulong       wmark;  /* last valid chunk index (watermark) */
};
typedef struct fd_store_in_ctx fd_store_in_ctx_t;
69 :
/* Metrics snapshot exported by the store tile; updated from
   ctx->store in fd_store_tile_slot_prepare. */
struct fd_store_tile_metrics {
  ulong first_turbine_slot;   /* copied from store->first_turbine_slot */
  ulong current_turbine_slot; /* copied from store->curr_turbine_slot */
};
typedef struct fd_store_tile_metrics fd_store_tile_metrics_t;
#define FD_STORE_TILE_METRICS_FOOTPRINT ( sizeof( fd_store_tile_metrics_t ) )
76 :
/* fd_store_tile_ctx_t is the complete run-time state of the store
   tile: joined blockstore/store objects, per-link dcache bounds for
   all inputs, mcache/dcache publishing state for all outputs, staging
   buffers for incoming frags, and wen-restart bookkeeping. */
struct fd_store_tile_ctx {
  fd_wksp_t * wksp;            /* this tile's own workspace */
  fd_wksp_t * blockstore_wksp; /* workspace holding the shared blockstore */

  fd_pubkey_t identity_key[1]; /* Just the public key */

  fd_store_t * store;                 /* store object managing pending slots / repair */
  fd_blockstore_t blockstore_ljoin;   /* local join handle backing ctx->blockstore */
  int blockstore_fd;                  /* file descriptor for archival file */
  fd_blockstore_t * blockstore;       /* joined blockstore (points at blockstore_ljoin) */

  /* stake_out link dcache bounds (STAKE_IN_IDX) */
  fd_wksp_t * stake_in_mem;
  ulong       stake_in_chunk0;
  ulong       stake_in_wmark;

  /* repair_store link dcache bounds (REPAIR_IN_IDX) */
  fd_wksp_t * repair_in_mem;
  ulong       repair_in_chunk0;
  ulong       repair_in_wmark;

  /* rstart_store link dcache bounds (RESTART_IN_IDX) */
  fd_wksp_t * restart_in_mem;
  ulong       restart_in_chunk0;
  ulong       restart_in_wmark;

  /* shred-tile inputs; indices in_idx-NON_SHRED_LINKS map into shred_in */
  ulong              shred_in_cnt;
  fd_store_in_ctx_t  shred_in[ 32 ];

  /* repair request output (store_repair link), published directly via
     fd_mcache_publish rather than through stem */
  fd_frag_meta_t * repair_req_out_mcache;
  ulong *          repair_req_out_sync;
  ulong            repair_req_out_depth;
  ulong            repair_req_out_seq;

  fd_wksp_t * repair_req_out_mem;
  ulong       repair_req_out_chunk0;
  ulong       repair_req_out_wmark;
  ulong       repair_req_out_chunk;

  /* replay output (store_replay link) */
  fd_frag_meta_t * replay_out_mcache;
  ulong *          replay_out_sync;
  ulong            replay_out_depth;
  ulong            replay_out_seq;

  fd_wksp_t * replay_out_mem;
  ulong       replay_out_chunk0;
  ulong       replay_out_wmark;
  ulong       replay_out_chunk;

  /* wen-restart output (store_rstart link) */
  fd_frag_meta_t * restart_out_mcache;
  ulong *          restart_out_sync;
  ulong            restart_out_depth;
  ulong            restart_out_seq;

  fd_wksp_t * restart_out_mem;
  ulong       restart_out_chunk0;
  ulong       restart_out_wmark;
  ulong       restart_out_chunk;

  /* staging buffers filled in during_frag, consumed in after_frag */
  fd_shred34_t s34_buffer[1];                     /* latest shred34 batch from a shred tile */
  uchar        shred_buffer[FD_SHRED_MAX_SZ];     /* latest repaired shred */
  fd_txn_p_t   pack_buffer[MAX_TXN_PER_MICROBLOCK];

  fd_repair_request_t * repair_req_buffer;

  fd_stake_ci_t * stake_ci; /* stake weight / leader schedule tracking */

  ulong * root_slot_fseq; /* fseq published by replay with the current root slot */

  int   sim;          /* nonzero when replaying a recorded pending-slots file */
  ulong sim_end_slot; /* slot at which simulation terminates */

  fd_shred_cap_ctx_t shred_cap_ctx; /* optional shred archival to file */

  fd_trusted_slots_t * trusted_slots; /* slots produced by our own leader pipeline */
  int                  is_trusted;    /* sig==1 on the current shred frag */

  fd_txn_iter_map_t equivalent; /* NOTE(review): see txn_iter_map below */
  fd_txn_iter_t * txn_iter_map; /* slots with in-progress txn iteration */

  ulong restart_funk_root;          /* wen-restart: funk root slot received from replay */
  ulong restart_heaviest_fork_slot; /* wen-restart: heaviest fork slot; 0 = not received */

  /* Metrics */
  fd_store_tile_metrics_t metrics;

  ulong turbine_cnt; /* count of shreds inserted from turbine */
  ulong repair_cnt;  /* count of shreds inserted from repair */
};
typedef struct fd_store_tile_ctx fd_store_tile_ctx_t;
163 :
164 :
165 : FD_FN_CONST static inline ulong
166 0 : scratch_align( void ) {
167 0 : return 128UL;
168 0 : }
169 :
/* Returns the amount of loose (unstructured) workspace the tile
   reserves beyond its laid-out scratch: a fixed 4 gigantic pages.
   The tile argument is accepted for interface uniformity but unused. */
FD_FN_PURE static inline ulong
loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  return 4UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
}
174 :
/* Returns the total scratch footprint of the tile.  The append order
   here must match the FD_SCRATCH_ALLOC_APPEND order in
   privileged_init/unprivileged_init exactly, otherwise the final
   scratch-overflow check there will fire. */
FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
  l = FD_LAYOUT_APPEND( l, fd_store_align(), fd_store_footprint() );
  l = FD_LAYOUT_APPEND( l, alignof(fd_repair_request_t), MAX_REPAIR_REQS * sizeof(fd_repair_request_t) );
  l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
  l = FD_LAYOUT_APPEND( l, fd_trusted_slots_align(), fd_trusted_slots_footprint( MAX_SLOTS_PER_EPOCH ) );
  l = FD_LAYOUT_APPEND( l, fd_txn_iter_map_align(), fd_txn_iter_map_footprint() );
  l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_SMAX ) );
  l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_SDEPTH ) );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
188 :
/* during_frag: speculative copy phase of stem processing.  Validates
   the frag's chunk range for the producing link and stages the payload
   into a tile-local buffer so after_frag can act on it even if the
   producer overruns.  Dispatch is by in_idx: stake, repair, restart,
   then everything else is a shred tile. */
static void
during_frag( fd_store_tile_ctx_t * ctx,
             ulong in_idx,
             ulong seq FD_PARAM_UNUSED,
             ulong sig,
             ulong chunk,
             ulong sz,
             ulong ctl FD_PARAM_UNUSED ) {

  if( FD_UNLIKELY( in_idx==STAKE_IN_IDX ) ) {
    if( FD_UNLIKELY( chunk<ctx->stake_in_chunk0 || chunk>ctx->stake_in_wmark ) )
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                   ctx->stake_in_chunk0, ctx->stake_in_wmark ));
    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->stake_in_mem, chunk );
    /* Begin staging the stake message; finalized in after_frag. */
    fd_stake_ci_stake_msg_init( ctx->stake_ci, dcache_entry );
    return;
  }

  if( FD_UNLIKELY( in_idx==REPAIR_IN_IDX ) ) {
    /* sz is bounded by FD_SHRED_MAX_SZ so the memcpy below cannot
       overflow shred_buffer. */
    if( FD_UNLIKELY( chunk<ctx->repair_in_chunk0 || chunk>ctx->repair_in_wmark || sz > FD_SHRED_MAX_SZ ) ) {
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->repair_in_chunk0, ctx->repair_in_wmark ));
    }

    uchar const * shred = fd_chunk_to_laddr_const( ctx->repair_in_mem, chunk );

    memcpy( ctx->shred_buffer, shred, sz );
    return;
  }

  if( FD_UNLIKELY( in_idx==RESTART_IN_IDX ) ) {
    /* Payload is exactly two ulongs: heaviest fork slot then funk root. */
    if( FD_UNLIKELY( chunk<ctx->restart_in_chunk0 || chunk>ctx->restart_in_wmark || sz>sizeof(ulong)*2 ) ) {
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->restart_in_chunk0, ctx->restart_in_wmark ));
    }

    FD_TEST( sz==sizeof(ulong)*2 );
    /* This message is one-shot; a second receipt indicates corruption. */
    if( FD_UNLIKELY( ctx->restart_heaviest_fork_slot!=0 ) ) {
      FD_LOG_ERR(( "Store tile should only receive heaviest_fork_slot once during wen-restart. Something may have corrupted." ));
    }
    const uchar * buf = fd_chunk_to_laddr_const( ctx->restart_in_mem, chunk );
    ctx->restart_heaviest_fork_slot = FD_LOAD( ulong, buf );
    ctx->restart_funk_root = FD_LOAD( ulong, buf+sizeof(ulong) );

    return;
  }

  /* everything else is shred tiles */
  /* NOTE(review): in_idx-NON_SHRED_LINKS is not bounds-checked against
     the 32-entry shred_in array here; presumably the topology limits
     the number of shred in-links — confirm at topo construction. */
  fd_store_in_ctx_t * shred_in = &ctx->shred_in[ in_idx-NON_SHRED_LINKS ];
  if( FD_UNLIKELY( chunk<shred_in->chunk0 || chunk>shred_in->wmark || sz > sizeof(fd_shred34_t) ) ) {
    FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, shred_in->chunk0 , shred_in->wmark ));
  }

  /* sig==1 marks shreds originating from our own leader pipeline. */
  ctx->is_trusted = sig==1;
  fd_shred34_t const * s34 = fd_chunk_to_laddr_const( shred_in->mem, chunk );

  memcpy( ctx->s34_buffer, s34, sz );
}
245 :
/* after_frag: commit phase.  Acts on the payload staged by during_frag
   once the frag is known not to have been overrun.  Stake messages are
   finalized, repaired shreds and turbine shred34 batches are inserted
   into the blockstore (and optionally archived to the shred-cap file),
   and the wen-restart message kicks off backward repair. */
static void
after_frag( fd_store_tile_ctx_t * ctx,
            ulong in_idx,
            ulong seq FD_PARAM_UNUSED,
            ulong sig FD_PARAM_UNUSED,
            ulong sz FD_PARAM_UNUSED,
            ulong tsorig FD_PARAM_UNUSED,
            ulong tspub FD_PARAM_UNUSED,
            fd_stem_context_t * stem FD_PARAM_UNUSED ) {
  if( FD_UNLIKELY( in_idx==STAKE_IN_IDX ) ) {
    fd_stake_ci_stake_msg_fini( ctx->stake_ci );
    return;
  }

  if( FD_UNLIKELY( in_idx==REPAIR_IN_IDX ) ) {
    fd_shred_t const * shred = (fd_shred_t const *)fd_type_pun_const( ctx->shred_buffer );
    /* Drop shreds whose slot cannot fit in the pending-slots window. */
    if( !fd_pending_slots_check( ctx->store->pending_slots, shred->slot ) ) {
      FD_LOG_WARNING(("received repair shred %lu that would overrun pending queue. skipping.", shred->slot));
      return;
    }

    /* Drop shreds more than 8192 slots behind the current turbine slot. */
    if( FD_UNLIKELY( (long)(ctx->store->curr_turbine_slot - shred->slot) > (long)8192 ) ) {
      FD_LOG_WARNING(("received repair shred with slot %lu that would overrun pending queue. skipping.", shred->slot));
      return;
    }

    if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_SUCCESS ) {
      FD_LOG_ERR(( "failed inserting to blockstore" ));
    } else if ( ctx->shred_cap_ctx.is_archive ) {
      /* Archive successful inserts when shred capture is enabled. */
      uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_REPAIR( 0 );
      if( fd_shred_cap_archive( &ctx->shred_cap_ctx, shred, shred_cap_flag ) < FD_SHRED_CAP_OK ) {
        FD_LOG_ERR( ( "failed at archiving repair shred to file" ) );
      }
    }
    ctx->repair_cnt++;
    return;
  }

  if( FD_UNLIKELY( in_idx==RESTART_IN_IDX ) ) {
    /* Wen-restart: schedule repair of the heaviest fork slot received
       in during_frag (5 ms delay). */
    FD_LOG_NOTICE(( "Store tile starts to repair backwards from slot%lu, which should be on the same fork as slot%lu",
                    ctx->restart_heaviest_fork_slot, ctx->restart_funk_root ));
    fd_store_add_pending( ctx->store, ctx->restart_heaviest_fork_slot, (long)5e6, 0, 0 );
    return;
  }

  /* everything else is shred */
  FD_TEST( ctx->s34_buffer->shred_cnt>0UL );

  if( FD_UNLIKELY( ctx->is_trusted ) ) {
    /* this slot is coming from our leader pipeline */
    fd_trusted_slots_add( ctx->trusted_slots, ctx->s34_buffer->pkts[ 0 ].shred.slot );
  }
  for( ulong i = 0; i < ctx->s34_buffer->shred_cnt; i++ ) {
    fd_shred_t * shred = &ctx->s34_buffer->pkts[i].shred;
    // TODO: these checks are not great as they assume a lot about the distance of shreds.
    if( !fd_pending_slots_check( ctx->store->pending_slots, shred->slot ) ) {
      FD_LOG_WARNING(("received shred %lu that would overrun pending queue. skipping.", shred->slot));
      continue;
    }

    if( FD_UNLIKELY( (long)(ctx->store->curr_turbine_slot - shred->slot) > (long)8192 ) ) {
      FD_LOG_WARNING(("received shred with slot %lu that would overrun pending queue. skipping.", shred->slot));
      continue;
    }
    // TODO: improve return value of api to not use < OK

    if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_SUCCESS ) {
      FD_LOG_ERR(( "failed inserting to blockstore" ));
    } else if ( ctx->shred_cap_ctx.is_archive ) {
      uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_TURBINE(0);
      if ( fd_shred_cap_archive(&ctx->shred_cap_ctx, shred, shred_cap_flag) < FD_SHRED_CAP_OK ) {
        FD_LOG_ERR(( "failed at archiving turbine shred to file" ));
      }
    }
    ctx->turbine_cnt++;

    /* Advance the store's turbine tracking with this shred's slot. */
    fd_store_shred_update_with_shred_from_turbine( ctx->store, shred );
  }
}
325 :
/* fd_store_tile_slot_prepare: given a slot and the mode computed by
   fd_store_slot_prepare, either (a) accumulate repair requests into
   the repair_req out dcache (NEED_REPAIR / NEED_ORPHAN), and/or (b) on
   CONTINUE, publish a replay message (parent slot + block hash) on the
   replay out link.  Repair requests, if any, are published at the end
   regardless of mode. */
static void
fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,
                            fd_stem_context_t * stem,
                            int store_slot_prepare_mode,
                            ulong slot ) {
  ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
  /* Repair requests are built directly in the out dcache chunk. */
  fd_repair_request_t * repair_reqs = fd_chunk_to_laddr( ctx->repair_req_out_mem, ctx->repair_req_out_chunk );
  /* We are leader at this slot and the slot is newer than turbine! */
  // FIXME: I dont think that this `ctx->store->curr_turbine_slot >= slot`
  // check works on fork switches to lower slot numbers. Use a given fork height
  // instead
  // if( ctx->store->curr_turbine_slot >= slot
  //     && memcmp( ctx->identity_key, slot_leader, sizeof(fd_pubkey_t) ) == 0 ) {
  //   if( store_slot_prepare_mode == FD_STORE_SLOT_PREPARE_CONTINUE ) {
  //     fd_block_t * block = fd_blockstore_block_query( ctx->blockstore, slot );
  //     if( FD_LIKELY( block ) ) {
  //       block->flags = fd_uchar_set_bit( block->flags, FD_BLOCK_FLAG_PROCESSED );
  //     }
  //   } else {
  //     return;
  //   }
  // }

  ulong repair_req_cnt = 0;
  switch( store_slot_prepare_mode ) {
    case FD_STORE_SLOT_PREPARE_CONTINUE: {
      /* Propagate the latest root published by replay into the store. */
      ulong root = fd_fseq_query( ctx->root_slot_fseq );
      if( root!=ULONG_MAX ) {
        // FD_LOG_WARNING(("CONTINUE: %lu", root));
        fd_store_set_root( ctx->store, root );
      }
      break;
    }
    case FD_STORE_SLOT_PREPARE_NEED_PARENT_EXEC: {
      /* Nothing to do here; the parent must execute first. */
      break;
    }
    case FD_STORE_SLOT_PREPARE_NEED_REPAIR: {
      /* Ask the store which shreds of this slot need repairing. */
      repair_req_cnt = fd_store_slot_repair( ctx->store, slot, repair_reqs, MAX_REPAIR_REQS );
      break;
    }
    case FD_STORE_SLOT_PREPARE_NEED_ORPHAN: {
      /* Unknown ancestry: issue a single orphan request for the slot. */
      fd_repair_request_t * repair_req = &repair_reqs[0];
      repair_req->slot = slot;
      repair_req->shred_index = UINT_MAX;
      repair_req->type = FD_REPAIR_REQ_TYPE_NEED_ORPHAN;
      repair_req_cnt = 1;
      break;
    }
    case FD_STORE_SLOT_PREPARE_ALREADY_EXECUTED: {
      return;
    }
    default: {
      FD_LOG_ERR(( "unrecognized store slot prepare mode" ));
      return;
    }
  }

  if( store_slot_prepare_mode == FD_STORE_SLOT_PREPARE_CONTINUE ) {

    if ( FD_UNLIKELY( ctx->sim && slot>=ctx->sim_end_slot ) ) {
      FD_LOG_ERR(( "Finished simulation to slot %lu", ctx->sim_end_slot ));
    }

    FD_LOG_NOTICE( ( "\n\n[Store]\n"
                     "slot: %lu\n"
                     "current turbine: %lu\n"
                     "first turbine: %lu\n"
                     "slots behind: %lu\n"
                     "live: %d\n",
                     slot,
                     ctx->store->curr_turbine_slot,
                     ctx->store->first_turbine_slot,
                     ctx->store->curr_turbine_slot - slot,
                     ( ctx->store->curr_turbine_slot - slot ) < 5 ) );

    /* Replay message layout: parent_slot (ulong) then block hash. */
    uchar * out_buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );

    if( !fd_blockstore_shreds_complete( ctx->blockstore, slot ) ) {
      FD_LOG_ERR(( "could not find block - slot: %lu", slot ));
    }

    ulong parent_slot = fd_blockstore_parent_slot_query( ctx->blockstore, slot );
    if ( FD_UNLIKELY( parent_slot == FD_SLOT_NULL ) ) FD_LOG_ERR(( "could not find slot %lu meta", slot ));

    FD_STORE( ulong, out_buf, parent_slot );
    out_buf += sizeof(ulong);
    /* Block hash is written in place into the out dcache. */
    int err = fd_blockstore_block_hash_query( ctx->blockstore, slot, (fd_hash_t *)fd_type_pun( out_buf ) );
    if( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "could not find slot meta" ));
    out_buf += sizeof(fd_hash_t);

    FD_SCRATCH_SCOPE_BEGIN {
      ctx->metrics.first_turbine_slot = ctx->store->first_turbine_slot;
      ctx->metrics.current_turbine_slot = ctx->store->curr_turbine_slot;

      ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
      /* Within 4 slots of turbine head counts as caught up. */
      ulong caught_up_flag = (ctx->store->curr_turbine_slot - slot)<4 ? 0UL : REPLAY_FLAG_CATCHING_UP;
      ulong replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_MICROBLOCK | caught_up_flag );

      /* NOTE(review): txn_cnt is always 0 here, so no fd_txn_p_t
         payload is ever appended and the frag sz passed to
         fd_stem_publish is 0 — presumably a remnant of an older
         format; confirm against the replay-side consumer. */
      ulong txn_cnt = 0;
      if( FD_UNLIKELY( fd_trusted_slots_find( ctx->trusted_slots, slot ) ) ) {
        /* if is caught up and is leader */
        replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_FINISHED_BLOCK );
        FD_LOG_INFO(( "packed block prepared - slot: %lu", slot ));
      } else {
        replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_FINISHED_BLOCK | REPLAY_FLAG_MICROBLOCK | caught_up_flag );
      }

      /* NOTE(review): this extra advance past a third ulong is not
         matched by a corresponding FD_STORE above — verify intended. */
      out_buf += sizeof(ulong);

      ulong out_sz = sizeof(ulong) + sizeof(fd_hash_t) + ( txn_cnt * sizeof(fd_txn_p_t) );
      fd_stem_publish( stem, 0UL, replay_sig, ctx->replay_out_chunk, txn_cnt, 0UL, tsorig, tspub );
      ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, out_sz, ctx->replay_out_chunk0, ctx->replay_out_wmark );
    } FD_SCRATCH_SCOPE_END;
  }

  if( repair_req_cnt != 0 ) {
    /* Publish the accumulated repair requests (built in the out dcache
       chunk above) on the store_repair link. */
    ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    ulong repair_req_sig = 50UL;
    ulong repair_req_sz = repair_req_cnt * sizeof(fd_repair_request_t);
    FD_TEST( repair_req_sz<=USHORT_MAX );
    fd_mcache_publish( ctx->repair_req_out_mcache, ctx->repair_req_out_depth, ctx->repair_req_out_seq, repair_req_sig, ctx->repair_req_out_chunk,
                       repair_req_sz, 0UL, tsorig, tspub );
    ctx->repair_req_out_seq = fd_seq_inc( ctx->repair_req_out_seq, 1UL );
    ctx->repair_req_out_chunk = fd_dcache_compact_next( ctx->repair_req_out_chunk, repair_req_sz, ctx->repair_req_out_chunk0, ctx->repair_req_out_wmark );
  }

  return;
}
454 :
/* after_credit: per-iteration work outside frag handling.  Publishes
   out-link sequence numbers, re-drives any slots with in-progress txn
   iterators, then walks the store's pending-slot queue, preparing each
   due slot.  Also handles wen-restart completion: once the heaviest
   fork slot has been executed, sends its funk txn xid to the restart
   tile. */
static void
after_credit( fd_store_tile_ctx_t * ctx,
              fd_stem_context_t * stem,
              int * opt_poll_in FD_PARAM_UNUSED,
              int * charge_busy ) {
  /* TODO: Don't charge the tile as busy if after_credit isn't actually
     doing any work. */
  *charge_busy = 1;

  /* Expose current out sequence numbers to consumers. */
  fd_mcache_seq_update( ctx->replay_out_sync, ctx->replay_out_seq );
  fd_mcache_seq_update( ctx->repair_req_out_sync, ctx->repair_req_out_seq );

  if( FD_UNLIKELY( ctx->sim &&
                   ctx->store->pending_slots->start == ctx->store->pending_slots->end ) ) {
    // FD_LOG_WARNING(( "Sim is complete." ));
  }

  /* Re-issue CONTINUE preparation for every slot with a live iterator. */
  for( ulong i = 0; i<fd_txn_iter_map_slot_cnt(); i++ ) {
    if( ctx->txn_iter_map[i].slot != FD_SLOT_NULL ) {
      fd_store_tile_slot_prepare( ctx, stem, FD_STORE_SLOT_PREPARE_CONTINUE, ctx->txn_iter_map[i].slot );
    }
  }

  /* Walk pending slots that are due as of store->now (set in
     during_housekeeping); iterator yields slot numbers in i. */
  for( ulong i = fd_pending_slots_iter_init( ctx->store->pending_slots );
       (i = fd_pending_slots_iter_next( ctx->store->pending_slots, ctx->store->now, i )) != ULONG_MAX; ) {
    ulong repair_slot = FD_SLOT_NULL;
    int store_slot_prepare_mode = fd_store_slot_prepare( ctx->store, i, &repair_slot );

    /* fd_store_slot_prepare may redirect preparation to an ancestor
       slot that needs repair; 0 means "no redirect". */
    ulong slot = repair_slot == 0 ? i : repair_slot;
    FD_LOG_DEBUG(( "store slot - mode: %d, slot: %lu, repair_slot: %lu", store_slot_prepare_mode, i, repair_slot ));
    fd_store_tile_slot_prepare( ctx, stem, store_slot_prepare_mode, slot );

    if( FD_UNLIKELY( ctx->restart_heaviest_fork_slot &&
                     i==ctx->restart_heaviest_fork_slot ) ) {
      if( FD_LIKELY( store_slot_prepare_mode!=FD_STORE_SLOT_PREPARE_ALREADY_EXECUTED ) ) {
        /* Not executed yet: re-poll the heaviest fork slot in 5 ms. */
        fd_store_add_pending( ctx->store, ctx->restart_heaviest_fork_slot, (long)5e6, 0, 0 );
      } else {
        fd_hash_t blk_hash;
        int err = fd_blockstore_block_hash_query( ctx->blockstore,
                                                  ctx->restart_heaviest_fork_slot,
                                                  &blk_hash );
        if( FD_UNLIKELY( err ) ){
          FD_LOG_ERR(( "Wen-restart cannot get the block hash of HeaviestForkSlot %lu", ctx->restart_heaviest_fork_slot ));
        }
        /* Build the funk txn xid: block hash with the first ulong
           overwritten by the slot number. */
        fd_funk_txn_xid_t xid;
        fd_memcpy( &xid, &blk_hash, sizeof(fd_funk_txn_xid_t) );
        xid.ul[0] = ctx->restart_heaviest_fork_slot;

        /* Send xid to restart tile */
        uchar * buf = fd_chunk_to_laddr( ctx->restart_out_mem, ctx->restart_out_chunk );
        ulong buf_len = sizeof(fd_funk_txn_xid_t);
        fd_memcpy( buf, &xid, sizeof(fd_funk_txn_xid_t) );
        fd_mcache_publish( ctx->restart_out_mcache, ctx->restart_out_depth, ctx->restart_out_seq, 1UL, ctx->restart_out_chunk,
                           buf_len, 0UL, 0, 0 );
        ctx->restart_out_seq = fd_seq_inc( ctx->restart_out_seq, 1UL );
        ctx->restart_out_chunk = fd_dcache_compact_next( ctx->restart_out_chunk, buf_len, ctx->restart_out_chunk0, ctx->restart_out_wmark );
      }
    }
  }
}
515 :
/* during_housekeeping: refresh the store's wallclock snapshot.  This
   timestamp drives which pending slots are considered "due" in the
   fd_pending_slots_iter_next walk in after_credit. */
static inline void
during_housekeeping( fd_store_tile_ctx_t * ctx ) {
  ctx->store->now = fd_log_wallclock();
}
520 :
/* privileged_init: runs before privilege drop.  Loads the identity
   public key from disk and opens the blockstore archival file, both of
   which may require filesystem access unavailable after sandboxing.
   Only the tile ctx region of scratch is claimed here; the remaining
   allocations happen in unprivileged_init with the same layout. */
static void
privileged_init( fd_topo_t * topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
  FD_SCRATCH_ALLOC_FINI( l, scratch_align() );

  if( FD_UNLIKELY( !strcmp( tile->store_int.identity_key_path, "" ) ) )
    FD_LOG_ERR(( "identity_key_path not set" ));

  /* Load only the public half of the identity keypair. */
  ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->store_int.identity_key_path, /* pubkey only: */ 1 ) );
  /* NOTE(review): open() result is not checked; blockstore_fd may be
     -1 if the archival file is missing — confirm downstream users of
     blockstore_fd tolerate that (or add an FD_LOG_ERR here). */
  ctx->blockstore_fd = open( tile->store_int.blockstore_file, O_RDONLY );
}
536 :
537 : static void
538 : unprivileged_init( fd_topo_t * topo,
539 0 : fd_topo_tile_t * tile ) {
540 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
541 :
542 0 : if( FD_UNLIKELY( tile->in_cnt < 3 ||
543 0 : strcmp( topo->links[ tile->in_link_id[ STAKE_IN_IDX ] ].name, "stake_out" ) ||
544 0 : strcmp( topo->links[ tile->in_link_id[ REPAIR_IN_IDX ] ].name, "repair_store") ||
545 0 : strcmp( topo->links[ tile->in_link_id[ RESTART_IN_IDX ] ].name,"rstart_store") ) )
546 0 : FD_LOG_ERR(( "store tile has none or unexpected input links %lu %s %s",
547 0 : tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name ));
548 :
549 0 : if( FD_UNLIKELY( tile->out_cnt != 3 ||
550 0 : strcmp( topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name, "store_replay" ) ||
551 0 : strcmp( topo->links[ tile->out_link_id[ REPAIR_OUT_IDX ] ].name, "store_repair" ) ||
552 0 : strcmp( topo->links[ tile->out_link_id[ RESTART_OUT_IDX ] ].name, "store_rstart" )) )
553 0 : FD_LOG_ERR(( "store tile has none or unexpected output links %lu %s",
554 0 : tile->out_cnt, topo->links[ tile->out_link_id[ 0 ] ].name ));
555 :
556 : /* Scratch mem setup */
557 :
558 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
559 0 : fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
560 0 : ctx->blockstore = &ctx->blockstore_ljoin;
561 : // TODO: set the lo_mark_slot to the actual snapshot slot!
562 0 : ctx->store = fd_store_join( fd_store_new( FD_SCRATCH_ALLOC_APPEND( l, fd_store_align(), fd_store_footprint() ), 1 ) );
563 0 : ctx->repair_req_buffer = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_request_t), MAX_REPAIR_REQS * sizeof(fd_repair_request_t) );
564 0 : ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() ), ctx->identity_key ) );
565 :
566 0 : void * trusted_slots_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_trusted_slots_align(), fd_trusted_slots_footprint( MAX_SLOTS_PER_EPOCH ) );
567 0 : ctx->trusted_slots = fd_trusted_slots_join( fd_trusted_slots_new( trusted_slots_mem, MAX_SLOTS_PER_EPOCH ) );
568 0 : FD_TEST( ctx->trusted_slots!=NULL );
569 :
570 0 : void * iter_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_txn_iter_map_align(), fd_txn_iter_map_footprint() );
571 0 : ctx->txn_iter_map = fd_txn_iter_map_join( fd_txn_iter_map_new( iter_map_mem ) );
572 :
573 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
574 :
575 0 : ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
576 0 : FD_TEST( blockstore_obj_id!=ULONG_MAX );
577 0 : ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
578 :
579 0 : if( ctx->blockstore_wksp == NULL ) {
580 0 : FD_LOG_ERR(( "blockstore_wksp must be defined in topo." ));
581 0 : }
582 :
583 : /**********************************************************************/
584 : /* root_slot fseq */
585 : /**********************************************************************/
586 :
587 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
588 0 : FD_TEST( root_slot_obj_id!=ULONG_MAX );
589 0 : ctx->root_slot_fseq = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
590 0 : if( FD_UNLIKELY( !ctx->root_slot_fseq ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
591 0 : FD_TEST( ULONG_MAX==fd_fseq_query( ctx->root_slot_fseq ) );
592 :
593 : /* Prevent blockstore from being created until we know the shred version */
594 0 : ulong expected_shred_version = tile->store_int.expected_shred_version;
595 0 : if( FD_LIKELY( !expected_shred_version ) ) {
596 0 : ulong busy_obj_id = fd_pod_query_ulong( topo->props, "poh_shred", ULONG_MAX );
597 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
598 0 : ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
599 0 : FD_LOG_INFO(( "waiting for shred version to be determined via gossip." ));
600 0 : do {
601 0 : expected_shred_version = fd_fseq_query( gossip_shred_version );
602 0 : } while( expected_shred_version==ULONG_MAX );
603 0 : FD_LOG_NOTICE(( "using shred version %lu", expected_shred_version ));
604 0 : }
605 0 : if( FD_UNLIKELY( expected_shred_version>USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", expected_shred_version ));
606 0 : FD_TEST( expected_shred_version );
607 0 : fd_store_expected_shred_version( ctx->store, expected_shred_version );
608 :
609 0 : if( FD_UNLIKELY( strlen( tile->store_int.blockstore_restore ) > 0 ) ) {
610 0 : FD_LOG_NOTICE(( "starting blockstore_wksp restore %s", tile->store_int.blockstore_restore ));
611 0 : int rc = fd_wksp_restore( ctx->blockstore_wksp, tile->store_int.blockstore_restore, (uint)FD_BLOCKSTORE_MAGIC );
612 0 : if( rc ) {
613 0 : FD_LOG_ERR(( "failed to restore %s: error %d.", tile->store_int.blockstore_restore, rc ));
614 0 : }
615 0 : FD_LOG_NOTICE(( "finished blockstore_wksp restore %s", tile->store_int.blockstore_restore ));
616 0 : fd_wksp_tag_query_info_t info;
617 0 : ulong tag = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.wksp_tag", blockstore_obj_id );
618 0 : if( FD_LIKELY( fd_wksp_tag_query( ctx->blockstore_wksp, &tag, 1, &info, 1 ) > 0 ) ) {
619 0 : void * blockstore_mem = fd_wksp_laddr_fast( ctx->blockstore_wksp, info.gaddr_lo );
620 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, blockstore_mem );
621 0 : } else {
622 0 : FD_LOG_WARNING(( "failed to find blockstore in workspace. making new blockstore." ));
623 0 : }
624 0 : } else {
625 0 : void * blockstore_shmem = fd_topo_obj_laddr( topo, blockstore_obj_id );
626 0 : if( blockstore_shmem == NULL ) {
627 0 : FD_LOG_ERR(( "failed to find blockstore" ));
628 0 : }
629 :
630 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, blockstore_shmem );
631 0 : }
632 :
633 0 : FD_LOG_NOTICE(( "blockstore: %s", tile->store_int.blockstore_file ));
634 :
635 0 : FD_TEST( ctx->blockstore );
636 0 : ctx->store->blockstore = ctx->blockstore;
637 :
638 0 : void * alloc_shmem = fd_wksp_alloc_laddr( ctx->wksp, fd_alloc_align(), fd_alloc_footprint(), 3UL );
639 0 : if( FD_UNLIKELY( !alloc_shmem ) ) {
640 0 : FD_LOG_ERR( ( "fd_alloc too large for workspace" ) );
641 0 : }
642 :
643 : /* Set up stake tile input */
644 0 : fd_topo_link_t * stake_in_link = &topo->links[ tile->in_link_id[ STAKE_IN_IDX ] ];
645 0 : ctx->stake_in_mem = topo->workspaces[ topo->objs[ stake_in_link->dcache_obj_id ].wksp_id ].wksp;
646 0 : ctx->stake_in_chunk0 = fd_dcache_compact_chunk0( ctx->stake_in_mem, stake_in_link->dcache );
647 0 : ctx->stake_in_wmark = fd_dcache_compact_wmark( ctx->stake_in_mem, stake_in_link->dcache, stake_in_link->mtu );
648 :
649 : /* Set up repair tile input */
650 0 : fd_topo_link_t * repair_in_link = &topo->links[ tile->in_link_id[ REPAIR_IN_IDX ] ];
651 0 : ctx->repair_in_mem = topo->workspaces[ topo->objs[ repair_in_link->dcache_obj_id ].wksp_id ].wksp;
652 0 : ctx->repair_in_chunk0 = fd_dcache_compact_chunk0( ctx->repair_in_mem, repair_in_link->dcache );
653 0 : ctx->repair_in_wmark = fd_dcache_compact_wmark( ctx->repair_in_mem, repair_in_link->dcache, repair_in_link->mtu );
654 :
655 : /* Set up replay tile input (for wen-restart) */
656 0 : fd_topo_link_t * restart_in_link = &topo->links[ tile->in_link_id[ RESTART_IN_IDX ] ];
657 0 : ctx->restart_in_mem = topo->workspaces[ topo->objs[ restart_in_link->dcache_obj_id ].wksp_id ].wksp;
658 0 : ctx->restart_in_chunk0 = fd_dcache_compact_chunk0( ctx->restart_in_mem, restart_in_link->dcache );
659 0 : ctx->restart_in_wmark = fd_dcache_compact_wmark( ctx->restart_in_mem, restart_in_link->dcache, restart_in_link->mtu );
660 :
661 : /* Set up ctx states for wen-restart */
662 0 : ctx->restart_funk_root = 0;
663 0 : ctx->restart_heaviest_fork_slot = 0;
664 :
665 : /* Set up shred tile inputs */
666 0 : ctx->shred_in_cnt = tile->in_cnt-NON_SHRED_LINKS;
667 0 : for( ulong i = 0; i<ctx->shred_in_cnt; i++ ) {
668 0 : fd_topo_link_t * shred_in_link = &topo->links[ tile->in_link_id[ i+NON_SHRED_LINKS ] ];
669 0 : ctx->shred_in[ i ].mem = topo->workspaces[ topo->objs[ shred_in_link->dcache_obj_id ].wksp_id ].wksp;
670 0 : ctx->shred_in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->shred_in[ i ].mem, shred_in_link->dcache );
671 0 : ctx->shred_in[ i ].wmark = fd_dcache_compact_wmark( ctx->shred_in[ i ].mem, shred_in_link->dcache, shred_in_link->mtu );
672 0 : }
673 :
674 : /* Set up repair request output */
675 0 : fd_topo_link_t * repair_req_out = &topo->links[ tile->out_link_id[ REPAIR_OUT_IDX ] ];
676 0 : ctx->repair_req_out_mcache = repair_req_out->mcache;
677 0 : ctx->repair_req_out_sync = fd_mcache_seq_laddr( ctx->repair_req_out_mcache );
678 0 : ctx->repair_req_out_depth = fd_mcache_depth( ctx->repair_req_out_mcache );
679 0 : ctx->repair_req_out_seq = fd_mcache_seq_query( ctx->repair_req_out_sync );
680 0 : ctx->repair_req_out_mem = topo->workspaces[ topo->objs[ repair_req_out->dcache_obj_id ].wksp_id ].wksp;
681 0 : ctx->repair_req_out_chunk0 = fd_dcache_compact_chunk0( ctx->repair_req_out_mem, repair_req_out->dcache );
682 0 : ctx->repair_req_out_wmark = fd_dcache_compact_wmark ( ctx->repair_req_out_mem, repair_req_out->dcache, repair_req_out->mtu );
683 0 : ctx->repair_req_out_chunk = ctx->repair_req_out_chunk0;
684 :
685 : /* Set up replay output */
686 0 : fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ];
687 0 : ctx->replay_out_mcache = replay_out->mcache;
688 0 : ctx->replay_out_sync = fd_mcache_seq_laddr( ctx->replay_out_mcache );
689 0 : ctx->replay_out_depth = fd_mcache_depth( ctx->replay_out_mcache );
690 0 : ctx->replay_out_seq = fd_mcache_seq_query( ctx->replay_out_sync );
691 0 : ctx->replay_out_mem = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp;
692 0 : ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache );
693 0 : ctx->replay_out_wmark = fd_dcache_compact_wmark ( ctx->replay_out_mem, replay_out->dcache, replay_out->mtu );
694 0 : ctx->replay_out_chunk = ctx->replay_out_chunk0;
695 :
696 : /* Set up replay output */
697 0 : fd_topo_link_t * restart_out = &topo->links[ tile->out_link_id[ RESTART_OUT_IDX ] ];
698 0 : ctx->restart_out_mcache = restart_out->mcache;
699 0 : ctx->restart_out_sync = fd_mcache_seq_laddr( ctx->restart_out_mcache );
700 0 : ctx->restart_out_depth = fd_mcache_depth( ctx->restart_out_mcache );
701 0 : ctx->restart_out_seq = fd_mcache_seq_query( ctx->restart_out_sync );
702 0 : ctx->restart_out_mem = topo->workspaces[ topo->objs[ restart_out->dcache_obj_id ].wksp_id ].wksp;
703 0 : ctx->restart_out_chunk0 = fd_dcache_compact_chunk0( ctx->restart_out_mem, restart_out->dcache );
704 0 : ctx->restart_out_wmark = fd_dcache_compact_wmark ( ctx->restart_out_mem, restart_out->dcache, restart_out->mtu );
705 0 : ctx->restart_out_chunk = ctx->restart_out_chunk0;
706 :
707 0 : void * smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_SMAX ) );
708 0 : void * fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_SDEPTH ) );
709 :
710 : /* Create scratch region */
711 0 : FD_TEST( (!!smem) & (!!fmem) );
712 0 : fd_scratch_attach( smem, fmem, SCRATCH_SMAX, SCRATCH_SDEPTH );
713 :
714 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
715 0 : if( FD_UNLIKELY( scratch_top != (ulong)scratch + scratch_footprint( tile ) ) ) {
716 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
717 0 : }
718 :
719 0 : if( FD_UNLIKELY( strlen( tile->store_int.slots_pending ) > 0 ) ) {
720 0 : ctx->sim = 1;
721 :
722 0 : const char * split = strchr( tile->store_int.slots_pending, '-' );
723 0 : FD_TEST( split != NULL && *( split + 1 ) != '\0' );
724 0 : const char * snapshot_slot_str = split + 1;
725 0 : char * endptr;
726 0 : ulong snapshot_slot = strtoul( snapshot_slot_str, &endptr, 10 );
727 :
728 0 : FILE * file = fopen( tile->store_int.slots_pending, "r" );
729 0 : char buf[20]; /* max # of digits for a ulong */
730 :
731 0 : ulong cnt = 1;
732 0 : FD_TEST( fd_blockstore_block_info_remove( ctx->blockstore, snapshot_slot ) );
733 :
734 0 : while( fgets( buf, sizeof( buf ), file ) ) {
735 0 : char * endptr;
736 0 : ulong slot = strtoul( buf, &endptr, 10 );
737 0 : fd_block_map_query_t query[1] = { 0 };
738 0 : int err = fd_block_map_prepare( ctx->blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
739 0 : fd_block_info_t * block_map_entry = fd_block_map_query_ele( query );
740 0 : if( err || block_map_entry->slot != slot ) {
741 0 : FD_LOG_ERR(( "init: slot %lu does not match block_map_entry->slot %lu", slot, block_map_entry->slot ));
742 0 : }
743 0 : block_map_entry->flags = 0;
744 0 : fd_block_map_publish( query );
745 0 : fd_store_add_pending( ctx->store, slot, (long)cnt++, 0, 0 );
746 0 : }
747 0 : fclose( file );
748 0 : }
749 :
750 0 : ctx->shred_cap_ctx.is_archive = 0;
751 0 : ctx->shred_cap_ctx.stable_slot_end = 0;
752 0 : ctx->shred_cap_ctx.stable_slot_start = 0;
753 0 : if( strlen( tile->store_int.shred_cap_archive ) > 0 ) {
754 0 : ctx->shred_cap_ctx.is_archive = 1;
755 0 : ctx->shred_cap_ctx.shred_cap_fileno = open( tile->store_int.shred_cap_archive,
756 0 : O_WRONLY | O_CREAT,
757 0 : S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
758 0 : if( ctx->shred_cap_ctx.shred_cap_fileno==-1 ) FD_LOG_ERR(( "failed at opening the shredcap file" ));
759 0 : } else if( strlen( tile->store_int.shred_cap_replay )>0 ) {
760 0 : ctx->sim = 1;
761 0 : ctx->sim_end_slot = tile->store_int.shred_cap_end_slot;
762 0 : FD_LOG_WARNING(( "simulating to slot %lu", ctx->sim_end_slot ));
763 0 : ctx->store->blockstore->shmem->wmk = 0UL;
764 0 : while( ctx->store->blockstore->shmem->wmk==0UL ) {
765 0 : FD_LOG_DEBUG(( "Waiting for blockstore to be initialized" ));
766 0 : }
767 0 : FD_TEST( fd_shred_cap_replay( tile->store_int.shred_cap_replay, ctx->store ) == FD_SHRED_CAP_OK );
768 0 : }
769 :
770 0 : }
771 :
772 : static ulong
773 : populate_allowed_seccomp( fd_topo_t const * topo,
774 : fd_topo_tile_t const * tile,
775 : ulong out_cnt,
776 0 : struct sock_filter * out ) {
777 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
778 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
779 0 : fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
780 0 : FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
781 :
782 0 : populate_sock_filter_policy_fd_storei_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->blockstore_fd );
783 0 : return sock_filter_policy_fd_storei_tile_instr_cnt;
784 0 : }
785 :
786 : static ulong
787 : populate_allowed_fds( fd_topo_t const * topo,
788 : fd_topo_tile_t const * tile,
789 : ulong out_fds_cnt,
790 0 : int * out_fds ) {
791 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
792 :
793 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
794 0 : fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
795 0 : FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_store_tile_ctx_t) );
796 :
797 0 : if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
798 :
799 0 : ulong out_cnt = 0UL;
800 0 : out_fds[ out_cnt++ ] = STDERR_FILENO;
801 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
802 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
803 0 : out_fds[ out_cnt++ ] = ctx->blockstore_fd;
804 0 : return out_cnt;
805 0 : }
806 :
807 : static inline void
808 0 : metrics_write( fd_store_tile_ctx_t * ctx ) {
809 0 : FD_MGAUGE_SET( STOREI, CURRENT_TURBINE_SLOT, ctx->metrics.current_turbine_slot );
810 0 : FD_MGAUGE_SET( STOREI, FIRST_TURBINE_SLOT, ctx->metrics.first_turbine_slot );
811 0 : }
812 :
/* Stem configuration: the store tile publishes at most one fragment
   per iteration (STEM_BURST) and uses fd_store_tile_ctx_t as the
   context passed to every callback.  The callback macros below are
   consumed by fd_stem.c (included next) to instantiate stem_run. */
#define STEM_BURST (1UL)

#define STEM_CALLBACK_CONTEXT_TYPE fd_store_tile_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_store_tile_ctx_t)

#define STEM_CALLBACK_AFTER_CREDIT after_credit
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE metrics_write

#include "../../disco/stem/fd_stem.c"

/* Tile descriptor registered with the topology runner: wires the
   storei tile's lifecycle hooks (footprint accounting, sandbox policy,
   privileged/unprivileged init, and the stem run loop) into fd_topo. */
fd_topo_run_tile_t fd_tile_store_int = {
  .name                     = "storei",
  .loose_footprint          = loose_footprint,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};
|