Line data Source code
1 : /* Store tile manages a blockstore and serves requests to repair and replay. */
#define _GNU_SOURCE /* must be defined before ANY header is included (feature_test_macros(7)) */

#include "fd_store.h"

#include "generated/fd_storei_tile_seccomp.h"

#include "fd_trusted_slots.h"
#include "../shred/fd_shred_cap.h"

#include "../../disco/tiles.h"
#include "../../disco/metrics/fd_metrics.h"
#include "../../flamenco/runtime/fd_blockstore.h"
#include "../../flamenco/runtime/fd_runtime.h"
#include "../../disco/shred/fd_stake_ci.h"
#include "../../disco/keyguard/fd_keyload.h"
#include "../../disco/topo/fd_pod_format.h"
#include "../../disco/archiver/fd_archiver.h"

#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <linux/unistd.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/random.h>
#include <sys/socket.h>
29 :
30 : #define MAX_IN_LINKS 32
31 0 : #define IN_KIND_STAKE 0
32 0 : #define IN_KIND_REPAIR 1
33 0 : #define IN_KIND_RSTART 2
34 0 : #define IN_KIND_SHRED 3
35 :
36 : #define MAX_OUT_LINKS 4
37 0 : #define OUT_KIND_REPLAY 0
38 0 : #define OUT_KIND_REPAIR 1
39 0 : #define OUT_KIND_RSTART 2
40 0 : #define OUT_KIND_ARCHIVE 3
41 :
42 : /* TODO: Determine/justify optimal number of repair requests */
43 0 : #define MAX_REPAIR_REQS ( (ulong)USHORT_MAX / sizeof(fd_repair_request_t) )
44 :
45 0 : #define SCRATCH_SMAX (512UL << 21UL)
46 0 : #define SCRATCH_SDEPTH (128UL)
47 :
48 : struct fd_txn_iter {
49 : ulong slot;
50 : fd_raw_block_txn_iter_t iter;
51 : };
52 :
53 : typedef struct fd_txn_iter fd_txn_iter_t;
54 :
55 : #define MAP_NAME fd_txn_iter_map
56 0 : #define MAP_T fd_txn_iter_t
57 : #define MAP_KEY_T ulong
58 0 : #define MAP_KEY slot
59 0 : #define MAP_KEY_NULL FD_SLOT_NULL
60 : #define MAP_KEY_INVAL(k) MAP_KEY_EQUAL(k, FD_SLOT_NULL)
61 : #define MAP_KEY_EQUAL(k0,k1) (k0==k1)
62 : #define MAP_KEY_EQUAL_IS_SLOW 0
63 : #define MAP_MEMOIZE 0
64 : #define MAP_KEY_HASH(key) ((uint)fd_ulong_hash( key ))
65 0 : #define MAP_LG_SLOT_CNT 5
66 : #include "../../util/tmpl/fd_map.c"
67 :
68 : struct fd_store_in_ctx {
69 : fd_wksp_t * mem;
70 : ulong chunk0;
71 : ulong wmark;
72 : };
73 : typedef struct fd_store_in_ctx fd_store_in_ctx_t;
74 :
75 : struct fd_store_tile_metrics {
76 : ulong first_turbine_slot;
77 : ulong current_turbine_slot;
78 : };
79 : typedef struct fd_store_tile_metrics fd_store_tile_metrics_t;
80 : #define FD_STORE_TILE_METRICS_FOOTPRINT ( sizeof( fd_store_tile_metrics_t ) )
81 :
82 : struct fd_store_tile_ctx {
83 : fd_wksp_t * wksp;
84 : fd_wksp_t * blockstore_wksp;
85 :
86 : fd_pubkey_t identity_key[1]; /* Just the public key */
87 :
88 : fd_store_in_ctx_t in[ MAX_IN_LINKS ];
89 : uchar in_kind[ MAX_IN_LINKS ];
90 : uchar out_idx[ MAX_OUT_LINKS ];
91 :
92 : fd_store_t * store;
93 : fd_blockstore_t blockstore_ljoin;
94 : int blockstore_fd; /* file descriptor for archival file */
95 : fd_blockstore_t * blockstore;
96 :
97 : fd_frag_meta_t * repair_req_out_mcache;
98 : ulong * repair_req_out_sync;
99 : ulong repair_req_out_depth;
100 : ulong repair_req_out_seq;
101 :
102 : fd_wksp_t * repair_req_out_mem;
103 : ulong repair_req_out_chunk0;
104 : ulong repair_req_out_wmark;
105 : ulong repair_req_out_chunk;
106 :
107 : fd_frag_meta_t * replay_out_mcache;
108 : ulong * replay_out_sync;
109 : ulong replay_out_depth;
110 : ulong replay_out_seq;
111 :
112 : fd_wksp_t * replay_out_mem;
113 : ulong replay_out_chunk0;
114 : ulong replay_out_wmark;
115 : ulong replay_out_chunk;
116 :
117 : fd_frag_meta_t * restart_out_mcache;
118 : ulong restart_out_depth;
119 : ulong restart_out_seq;
120 :
121 : fd_wksp_t * restart_out_mem;
122 : ulong restart_out_chunk0;
123 : ulong restart_out_wmark;
124 : ulong restart_out_chunk;
125 :
126 : fd_wksp_t * archive_out_mem;
127 : ulong archive_out_chunk0;
128 : ulong archive_out_wmark;
129 : ulong archive_out_chunk;
130 :
131 : fd_shred34_t s34_buffer[1];
132 : uchar shred_buffer[FD_SHRED_MAX_SZ];
133 : fd_txn_p_t pack_buffer[MAX_TXN_PER_MICROBLOCK];
134 :
135 : fd_repair_request_t * repair_req_buffer;
136 :
137 : fd_stake_ci_t * stake_ci;
138 :
139 : ulong * root_slot_fseq;
140 :
141 : int sim;
142 : ulong sim_end_slot;
143 :
144 : fd_shred_cap_ctx_t shred_cap_ctx;
145 :
146 : fd_trusted_slots_t * trusted_slots;
147 : int is_trusted;
148 :
149 : fd_txn_iter_t * txn_iter_map;
150 :
151 : ulong restart_funk_root;
152 : ulong restart_heaviest_fork_slot;
153 :
154 : /* Metrics */
155 : fd_store_tile_metrics_t metrics;
156 :
157 : ulong turbine_cnt;
158 : ulong repair_cnt;
159 : };
160 : typedef struct fd_store_tile_ctx fd_store_tile_ctx_t;
161 :
162 :
163 : FD_FN_CONST static inline ulong
164 0 : scratch_align( void ) {
165 0 : return 128UL;
166 0 : }
167 :
168 : FD_FN_PURE static inline ulong
169 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
170 0 : return 4UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
171 0 : }
172 :
173 : FD_FN_PURE static inline ulong
174 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
175 0 : ulong l = FD_LAYOUT_INIT;
176 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
177 0 : l = FD_LAYOUT_APPEND( l, fd_store_align(), fd_store_footprint() );
178 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_repair_request_t), MAX_REPAIR_REQS * sizeof(fd_repair_request_t) );
179 0 : l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
180 0 : l = FD_LAYOUT_APPEND( l, fd_trusted_slots_align(), fd_trusted_slots_footprint( MAX_SLOTS_PER_EPOCH ) );
181 0 : l = FD_LAYOUT_APPEND( l, fd_txn_iter_map_align(), fd_txn_iter_map_footprint() );
182 0 : l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_SMAX ) );
183 0 : l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_SDEPTH ) );
184 0 : return FD_LAYOUT_FINI( l, scratch_align() );
185 0 : }
186 :
187 : static void
188 : during_frag( fd_store_tile_ctx_t * ctx,
189 : ulong in_idx,
190 : ulong seq FD_PARAM_UNUSED,
191 : ulong sig,
192 : ulong chunk,
193 : ulong sz,
194 0 : ulong ctl FD_PARAM_UNUSED ) {
195 0 : return;
196 :
197 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
198 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
199 :
200 0 : uchar const * src = fd_chunk_to_laddr( ctx->in[ in_idx ].mem, chunk );
201 :
202 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
203 0 : fd_stake_ci_stake_msg_init( ctx->stake_ci, src );
204 0 : return;
205 0 : }
206 :
207 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
208 0 : if( FD_UNLIKELY( sz>FD_SHRED_MAX_SZ ) ) FD_LOG_ERR(( "invalid in frag sz %lu", sz ));
209 :
210 0 : memcpy( ctx->shred_buffer, src, sz );
211 0 : return;
212 0 : }
213 :
214 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_RSTART ) ) {
215 0 : if( FD_UNLIKELY( sz>sizeof(ulong)*2 ) ) FD_LOG_ERR(( "invalid in frag sz %lu", sz ));
216 :
217 0 : FD_TEST( sz==sizeof(ulong)*2 );
218 0 : if( FD_UNLIKELY( ctx->restart_heaviest_fork_slot!=0 ) ) {
219 0 : FD_LOG_ERR(( "Store tile should only receive heaviest_fork_slot once during wen-restart. Something may have corrupted." ));
220 0 : }
221 0 : ctx->restart_heaviest_fork_slot = FD_LOAD( ulong, src );
222 0 : ctx->restart_funk_root = FD_LOAD( ulong, src+sizeof(ulong) );
223 0 : return;
224 0 : }
225 :
226 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SHRED ) ) {
227 0 : if( FD_UNLIKELY( sz>sizeof(fd_shred34_t) ) ) FD_LOG_ERR(( "invalid in frag sz %lu", sz ));
228 :
229 0 : ctx->is_trusted = sig==1;
230 0 : memcpy( ctx->s34_buffer, src, sz );
231 0 : }
232 0 : }
233 :
234 : static void
235 : after_frag( fd_store_tile_ctx_t * ctx,
236 : ulong in_idx,
237 : ulong seq FD_PARAM_UNUSED,
238 : ulong sig,
239 : ulong sz,
240 : ulong tsorig,
241 : ulong tspub,
242 0 : fd_stem_context_t * stem ) {
243 0 : return;
244 :
245 0 : ulong const in_kind = ctx->in_kind[ in_idx ];
246 :
247 0 : if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
248 0 : fd_stake_ci_stake_msg_fini( ctx->stake_ci );
249 0 : return;
250 0 : }
251 :
252 0 : if( FD_UNLIKELY( ctx->archive_out_mem &&
253 0 : ( in_kind==IN_KIND_REPAIR || in_kind==IN_KIND_SHRED ) ) ) {
254 0 : ulong new_sig;
255 0 : if( in_kind==IN_KIND_REPAIR ) {
256 0 : fd_memcpy( fd_chunk_to_laddr( ctx->archive_out_mem, ctx->archive_out_chunk ), ctx->shred_buffer, sz );
257 0 : new_sig = FD_ARCHIVER_SIG_MARK_REPAIR(sig);
258 0 : } else if( in_kind==IN_KIND_SHRED ) {
259 0 : fd_memcpy( fd_chunk_to_laddr( ctx->archive_out_mem, ctx->archive_out_chunk ), ctx->s34_buffer, sz );
260 0 : new_sig = FD_ARCHIVER_SIG_MARK_SHRED(sig);
261 0 : }
262 0 : fd_stem_publish( stem, ctx->out_idx[ OUT_KIND_ARCHIVE ], new_sig, ctx->archive_out_chunk, sz, 0UL, tsorig, tspub );
263 0 : ctx->archive_out_chunk = fd_dcache_compact_next( ctx->archive_out_chunk, sz, ctx->archive_out_chunk0, ctx->archive_out_wmark );
264 0 : }
265 :
266 0 : if( FD_UNLIKELY( in_kind==IN_KIND_REPAIR ) ) {
267 0 : fd_shred_t const * shred = (fd_shred_t const *)fd_type_pun_const( ctx->shred_buffer );
268 0 : if( !fd_pending_slots_check( ctx->store->pending_slots, shred->slot ) ) {
269 0 : FD_LOG_WARNING(("received repair shred %lu that would overrun pending queue. skipping.", shred->slot));
270 0 : return;
271 0 : }
272 :
273 0 : if( FD_UNLIKELY( (long)(ctx->store->curr_turbine_slot - shred->slot) > (long)8192 ) ) {
274 0 : FD_LOG_WARNING(("received repair shred with slot %lu that would overrun pending queue. skipping.", shred->slot));
275 0 : return;
276 0 : }
277 :
278 0 : if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_SUCCESS ) {
279 0 : FD_LOG_ERR(( "failed inserting to blockstore" ));
280 0 : } else if ( ctx->shred_cap_ctx.is_archive ) {
281 0 : uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_REPAIR( 0 );
282 0 : if( fd_shred_cap_archive( &ctx->shred_cap_ctx, shred, shred_cap_flag ) < FD_SHRED_CAP_OK ) {
283 0 : FD_LOG_ERR( ( "failed at archiving repair shred to file" ) );
284 0 : }
285 0 : }
286 0 : ctx->repair_cnt++;
287 0 : return;
288 0 : }
289 :
290 0 : if( FD_UNLIKELY( in_kind==IN_KIND_RSTART ) ) {
291 0 : FD_LOG_NOTICE(( "Store tile starts to repair backwards from slot%lu, which should be on the same fork as slot%lu",
292 0 : ctx->restart_heaviest_fork_slot, ctx->restart_funk_root ));
293 0 : fd_store_add_pending( ctx->store, ctx->restart_heaviest_fork_slot, (long)5e6, 0, 0 );
294 0 : return;
295 0 : }
296 :
297 : /* everything else is shred */
298 : //FD_TEST( (ctx->s34_buffer->shred_cnt>0UL) & (ctx->s34_buffer->shred_cnt<=34UL) );
299 :
300 0 : if( FD_UNLIKELY( ctx->is_trusted ) ) {
301 : /* this slot is coming from our leader pipeline */
302 0 : fd_trusted_slots_add( ctx->trusted_slots, ctx->s34_buffer->pkts[ 0 ].shred.slot );
303 0 : }
304 0 : for( ulong i = 0; i < ctx->s34_buffer->shred_cnt; i++ ) {
305 0 : fd_shred_t * shred = &ctx->s34_buffer->pkts[i].shred;
306 : // TODO: these checks are not great as they assume a lot about the distance of shreds.
307 0 : if( !fd_pending_slots_check( ctx->store->pending_slots, shred->slot ) ) {
308 0 : FD_LOG_WARNING(("received shred %lu that would overrun pending queue. skipping.", shred->slot));
309 0 : continue;
310 0 : }
311 :
312 0 : if( FD_UNLIKELY( (long)(ctx->store->curr_turbine_slot - shred->slot) > (long)8192 ) ) {
313 0 : FD_LOG_WARNING(("received shred with slot %lu that would overrun pending queue. skipping.", shred->slot));
314 0 : continue;
315 0 : }
316 : // TODO: improve return value of api to not use < OK
317 :
318 0 : if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_SUCCESS ) {
319 0 : FD_LOG_ERR(( "failed inserting to blockstore" ));
320 0 : } else if ( ctx->shred_cap_ctx.is_archive ) {
321 0 : uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_TURBINE(0);
322 0 : if ( fd_shred_cap_archive(&ctx->shred_cap_ctx, shred, shred_cap_flag) < FD_SHRED_CAP_OK ) {
323 0 : FD_LOG_ERR(( "failed at archiving turbine shred to file" ));
324 0 : }
325 0 : }
326 0 : ctx->turbine_cnt++;
327 :
328 0 : fd_store_shred_update_with_shred_from_turbine( ctx->store, shred );
329 0 : }
330 0 : }
331 :
/* fd_store_tile_slot_prepare: act on the result of fd_store_slot_prepare
   for one slot: either advance the root (CONTINUE), build repair
   requests (NEED_REPAIR/NEED_ORPHAN), or do nothing.  On CONTINUE of a
   complete block it publishes ordered "slice" notifications on the
   replay link; any accumulated repair requests are published on the
   repair link at the end. */
static void
fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,
                            fd_stem_context_t *   stem,
                            int                   store_slot_prepare_mode,
                            ulong                 slot ) {
  ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
  /* Repair requests are built directly in the repair output dcache. */
  fd_repair_request_t * repair_reqs = fd_chunk_to_laddr( ctx->repair_req_out_mem, ctx->repair_req_out_chunk );
  /* We are leader at this slot and the slot is newer than turbine! */
  // FIXME: I dont think that this `ctx->store->curr_turbine_slot >= slot`
  // check works on fork switches to lower slot numbers. Use a given fork height
  // instead
  // if( ctx->store->curr_turbine_slot >= slot
  //     && memcmp( ctx->identity_key, slot_leader, sizeof(fd_pubkey_t) ) == 0 ) {
  //   if( store_slot_prepare_mode == FD_STORE_SLOT_PREPARE_CONTINUE ) {
  //     fd_block_t * block = fd_blockstore_block_query( ctx->blockstore, slot );
  //     if( FD_LIKELY( block ) ) {
  //       block->flags = fd_uchar_set_bit( block->flags, FD_BLOCK_FLAG_PROCESSED );
  //     }
  //   } else {
  //     return;
  //   }
  // }

  ulong repair_req_cnt = 0;
  switch( store_slot_prepare_mode ) {
    case FD_STORE_SLOT_PREPARE_CONTINUE: {
      /* Propagate the root published by replay into the store. */
      ulong root = fd_fseq_query( ctx->root_slot_fseq );
      if( root!=ULONG_MAX ) {
        // FD_LOG_WARNING(("CONTINUE: %lu", root));
        fd_store_set_root( ctx->store, root );
      }
      break;
    }
    case FD_STORE_SLOT_PREPARE_NEED_PARENT_EXEC: {
      /* Parent must execute first; nothing to do for this slot yet. */
      break;
    }
    case FD_STORE_SLOT_PREPARE_NEED_REPAIR: {
      repair_req_cnt = fd_store_slot_repair( ctx->store, slot, repair_reqs, MAX_REPAIR_REQS );
      break;
    }
    case FD_STORE_SLOT_PREPARE_NEED_ORPHAN: {
      /* Unknown ancestry: request the orphan slot itself. */
      fd_repair_request_t * repair_req = &repair_reqs[0];
      repair_req->slot        = slot;
      repair_req->shred_index = UINT_MAX;
      repair_req->type        = FD_REPAIR_REQ_TYPE_NEED_ORPHAN;
      repair_req_cnt = 1;
      break;
    }
    case FD_STORE_SLOT_PREPARE_ALREADY_EXECUTED: {
      return;
    }
    default: {
      FD_LOG_ERR(( "unrecognized store slot prepare mode" ));
      return;
    }
  }

  if( store_slot_prepare_mode == FD_STORE_SLOT_PREPARE_CONTINUE ) {

    if ( FD_UNLIKELY( ctx->sim && slot>=ctx->sim_end_slot ) ) {
      FD_LOG_ERR(( "Finished simulation to slot %lu", ctx->sim_end_slot ));
    }

    FD_LOG_NOTICE( ( "\n\n[Store]\n"
                     "slot:            %lu\n"
                     "current turbine: %lu\n"
                     "first turbine:   %lu\n"
                     "slots behind:    %lu\n"
                     "live:            %d\n",
                     slot,
                     ctx->store->curr_turbine_slot,
                     ctx->store->first_turbine_slot,
                     ctx->store->curr_turbine_slot - slot,
                     ( ctx->store->curr_turbine_slot - slot ) < 5 ) );

    if( !fd_blockstore_shreds_complete( ctx->blockstore, slot ) ) {
      FD_LOG_ERR(( "could not find block - slot: %lu", slot ));
    }

    ulong parent_slot = fd_blockstore_parent_slot_query( ctx->blockstore, slot );
    if ( FD_UNLIKELY( parent_slot == FD_SLOT_NULL ) ) FD_LOG_ERR(( "could not find slot %lu meta", slot ));

    FD_SCRATCH_SCOPE_BEGIN {
      ctx->metrics.first_turbine_slot   = ctx->store->first_turbine_slot;
      ctx->metrics.current_turbine_slot = ctx->store->curr_turbine_slot;

      ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
      ulong caught_up_flag = (ctx->store->curr_turbine_slot - slot)<4 ? 0UL : REPLAY_FLAG_CATCHING_UP;
      ulong replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_MICROBLOCK | caught_up_flag );

      if( FD_UNLIKELY( fd_trusted_slots_find( ctx->trusted_slots, slot ) ) ) {
        /* if is caught up and is leader */
        replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_FINISHED_BLOCK );
        FD_LOG_INFO(( "packed block prepared - slot: %lu", slot ));
      } else {
        replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_FINISHED_BLOCK | REPLAY_FLAG_MICROBLOCK | caught_up_flag );
      }

      /* Snapshot the block's completion bitmap under a speculative
         (lock-free) block-map query; retry on concurrent writers. */
      fd_block_set_t data_complete_idxs[FD_SHRED_BLK_MAX / sizeof(ulong)] = { 0 };
      ulong buffered_idx = 0;
      ulong complete_idx = 0;
      for(;;) { /* speculative query */
        fd_block_map_query_t query[1] = { 0 };
        int err = fd_block_map_query_try( ctx->blockstore->block_map, &slot, NULL, query, 0 );
        fd_block_info_t * block_info = fd_block_map_query_ele( query );

        if( FD_UNLIKELY( err == FD_MAP_ERR_KEY   ) ) FD_LOG_ERR(( "could not find block - slot: %lu", slot ));
        if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;

        memcpy( data_complete_idxs, block_info->data_complete_idxs, sizeof(data_complete_idxs) );
        complete_idx = block_info->data_complete_idx;
        buffered_idx = block_info->buffered_idx;

        /* Only accept the snapshot if no writer raced us. */
        if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
      }

      FD_TEST( complete_idx == buffered_idx );

      /* artificially publishing slices in order. Simulates intended repair
         tile behavior. */

      uint consumed_idx = UINT_MAX;
      for( uint idx = 0; idx <= buffered_idx; idx++ ) {
        if( FD_UNLIKELY( fd_block_set_test( data_complete_idxs, idx ) ) ) {
          /* data_cnt = shreds since the previously consumed slice end. */
          uint data_cnt = consumed_idx != UINT_MAX ? idx - consumed_idx : idx + 1;

          replay_sig = fd_disco_repair_replay_sig( slot, (ushort)( slot - parent_slot ), data_cnt, complete_idx == idx );
          fd_stem_publish( stem, ctx->out_idx[ OUT_KIND_REPLAY ], replay_sig, ctx->replay_out_chunk, 0, 0UL, tsorig, tspub );
          ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, 0, ctx->replay_out_chunk0, ctx->replay_out_wmark );
          consumed_idx = idx;
        }
      }


    } FD_SCRATCH_SCOPE_END;
  }

  /* Publish any repair requests built above (manual mcache publish,
     not via stem). */
  if( repair_req_cnt != 0 ) {
    ulong tspub          = fd_frag_meta_ts_comp( fd_tickcount() );
    ulong repair_req_sig = 50UL;
    ulong repair_req_sz  = repair_req_cnt * sizeof(fd_repair_request_t);
    FD_TEST( repair_req_sz<=USHORT_MAX );
    fd_mcache_publish( ctx->repair_req_out_mcache, ctx->repair_req_out_depth, ctx->repair_req_out_seq, repair_req_sig, ctx->repair_req_out_chunk,
                       repair_req_sz, 0UL, tsorig, tspub );
    ctx->repair_req_out_seq   = fd_seq_inc( ctx->repair_req_out_seq, 1UL );
    ctx->repair_req_out_chunk = fd_dcache_compact_next( ctx->repair_req_out_chunk, repair_req_sz, ctx->repair_req_out_chunk0, ctx->repair_req_out_wmark );
  }

  return;
}
482 :
/* after_credit: per-iteration housekeeping and slot-driving loop.
   Publishes output sequence numbers, re-prepares slots with live
   transaction iterators, then walks the store's pending-slot queue,
   preparing each due slot.  Also drives the wen-restart handshake once
   the heaviest fork slot has been replayed. */
static void
after_credit( fd_store_tile_ctx_t * ctx,
              fd_stem_context_t *   stem,
              int *                 opt_poll_in FD_PARAM_UNUSED,
              int *                 charge_busy ) {
  /* TODO: Don't charge the tile as busy if after_credit isn't actually
     doing any work. */
  *charge_busy = 1;

  /* Expose our manual-publish sequence numbers to consumers. */
  fd_mcache_seq_update( ctx->replay_out_sync, ctx->replay_out_seq );
  fd_mcache_seq_update( ctx->repair_req_out_sync, ctx->repair_req_out_seq );

  if( FD_UNLIKELY( ctx->sim &&
                   ctx->store->pending_slots->start == ctx->store->pending_slots->end ) ) {
    // FD_LOG_WARNING(( "Sim is complete." ));
  }

  /* Re-prepare any slot that still has an in-flight txn iterator. */
  for( ulong i = 0; i<fd_txn_iter_map_slot_cnt(); i++ ) {
    if( ctx->txn_iter_map[i].slot != FD_SLOT_NULL ) {
      fd_store_tile_slot_prepare( ctx, stem, FD_STORE_SLOT_PREPARE_CONTINUE, ctx->txn_iter_map[i].slot );
    }
  }

  for( ulong i = fd_pending_slots_iter_init( ctx->store->pending_slots );
       (i = fd_pending_slots_iter_next( ctx->store->pending_slots, ctx->store->now, i )) != ULONG_MAX; ) {
    ulong repair_slot = FD_SLOT_NULL;
    int store_slot_prepare_mode = fd_store_slot_prepare( ctx->store, i, &repair_slot );

    /* NOTE(review): repair_slot is initialized to FD_SLOT_NULL above;
       comparing against 0 here looks suspicious — confirm that
       fd_store_slot_prepare sets repair_slot to 0 (not FD_SLOT_NULL)
       when no repair target applies. */
    ulong slot = repair_slot == 0 ? i : repair_slot;
    FD_LOG_DEBUG(( "store slot - mode: %d, slot: %lu, repair_slot: %lu", store_slot_prepare_mode, i, repair_slot ));
    fd_store_tile_slot_prepare( ctx, stem, store_slot_prepare_mode, slot );

    /* Wen-restart: keep re-queueing the heaviest fork slot until it has
       executed, then hand its funk xid to the restart tile (if linked). */
    if( FD_UNLIKELY( ctx->restart_heaviest_fork_slot &&
                     i==ctx->restart_heaviest_fork_slot ) ) {
      if( FD_LIKELY( store_slot_prepare_mode!=FD_STORE_SLOT_PREPARE_ALREADY_EXECUTED ) ) {
        fd_store_add_pending( ctx->store, ctx->restart_heaviest_fork_slot, (long)5e6, 0, 0 );
      } else if( ctx->restart_out_mem ) {
        fd_hash_t blk_hash;
        int err = fd_blockstore_block_hash_query( ctx->blockstore,
                                                  ctx->restart_heaviest_fork_slot,
                                                  &blk_hash );
        if( FD_UNLIKELY( err ) ){
          FD_LOG_ERR(( "Wen-restart cannot get the block hash of HeaviestForkSlot %lu", ctx->restart_heaviest_fork_slot ));
        }
        /* xid = block hash with its first ulong overwritten by the slot. */
        fd_funk_txn_xid_t xid;
        fd_memcpy( &xid, blk_hash.uc, sizeof(fd_funk_txn_xid_t) );
        xid.ul[0] = ctx->restart_heaviest_fork_slot;

        /* Send xid to restart tile */
        uchar * buf = fd_chunk_to_laddr( ctx->restart_out_mem, ctx->restart_out_chunk );
        ulong buf_len = sizeof(fd_funk_txn_xid_t);
        fd_memcpy( buf, &xid, sizeof(fd_funk_txn_xid_t) );
        fd_mcache_publish( ctx->restart_out_mcache, ctx->restart_out_depth, ctx->restart_out_seq, 1UL, ctx->restart_out_chunk,
                           buf_len, 0UL, 0, 0 );
        ctx->restart_out_seq   = fd_seq_inc( ctx->restart_out_seq, 1UL );
        ctx->restart_out_chunk = fd_dcache_compact_next( ctx->restart_out_chunk, buf_len, ctx->restart_out_chunk0, ctx->restart_out_wmark );
      }
    }
  }
}
543 :
544 : static inline void
545 0 : during_housekeeping( fd_store_tile_ctx_t * ctx ) {
546 0 : ctx->store->now = fd_log_wallclock();
547 0 : }
548 :
549 : static void
550 : privileged_init( fd_topo_t * topo,
551 0 : fd_topo_tile_t * tile ) {
552 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
553 :
554 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
555 0 : fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
556 0 : FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
557 0 : memset( ctx, 0, sizeof(fd_store_tile_ctx_t) );
558 :
559 0 : if( FD_UNLIKELY( !strcmp( tile->store_int.identity_key_path, "" ) ) )
560 0 : FD_LOG_ERR(( "identity_key_path not set" ));
561 :
562 0 : ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->store_int.identity_key_path, /* pubkey only: */ 1 ) );
563 0 : ctx->blockstore_fd = open( tile->store_int.blockstore_file, O_RDONLY );
564 0 : }
565 :
566 : static void
567 : unprivileged_init( fd_topo_t * topo,
568 0 : fd_topo_tile_t * tile ) {
569 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
570 :
571 : /* Scratch mem setup */
572 :
573 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
574 0 : fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
575 0 : ctx->blockstore = &ctx->blockstore_ljoin;
576 : // TODO: set the lo_mark_slot to the actual snapshot slot!
577 0 : ctx->store = fd_store_join( fd_store_new( FD_SCRATCH_ALLOC_APPEND( l, fd_store_align(), fd_store_footprint() ), 1 ) );
578 0 : ctx->repair_req_buffer = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_request_t), MAX_REPAIR_REQS * sizeof(fd_repair_request_t) );
579 0 : ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() ), ctx->identity_key ) );
580 :
581 0 : void * trusted_slots_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_trusted_slots_align(), fd_trusted_slots_footprint( MAX_SLOTS_PER_EPOCH ) );
582 0 : ctx->trusted_slots = fd_trusted_slots_join( fd_trusted_slots_new( trusted_slots_mem, MAX_SLOTS_PER_EPOCH ) );
583 0 : FD_TEST( ctx->trusted_slots!=NULL );
584 :
585 0 : void * iter_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_txn_iter_map_align(), fd_txn_iter_map_footprint() );
586 0 : ctx->txn_iter_map = fd_txn_iter_map_join( fd_txn_iter_map_new( iter_map_mem ) );
587 :
588 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
589 :
590 0 : ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
591 0 : FD_TEST( blockstore_obj_id!=ULONG_MAX );
592 0 : ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
593 :
594 0 : if( ctx->blockstore_wksp == NULL ) {
595 0 : FD_LOG_ERR(( "blockstore_wksp must be defined in topo." ));
596 0 : }
597 :
598 0 : FD_TEST( tile->in_cnt<=sizeof( ctx->in )/sizeof( ctx->in[ 0 ] ) );
599 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
600 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
601 0 : fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
602 :
603 0 : ctx->in[i].mem = link_wksp->wksp;
604 0 : ctx->in[i].chunk0 = fd_dcache_compact_chunk0( ctx->in[i].mem, link->dcache );
605 0 : ctx->in[i].wmark = fd_dcache_compact_wmark ( ctx->in[i].mem, link->dcache, link->mtu );
606 :
607 0 : if( !strcmp( link->name, "stake_out" ) ) {
608 0 : ctx->in_kind[ i ] = IN_KIND_STAKE;
609 0 : } else if( !strcmp( link->name, "repair_store" ) ) {
610 0 : ctx->in_kind[ i ] = IN_KIND_REPAIR;
611 0 : } else if( !strcmp( link->name, "rstart_store" ) ) {
612 0 : ctx->in_kind[ i ] = IN_KIND_RSTART;
613 0 : } else if( !strcmp( link->name, "shred_storei" ) ) {
614 0 : ctx->in_kind[ i ] = IN_KIND_SHRED;
615 0 : } else {
616 0 : FD_LOG_ERR(( "unexpected input link %s", link->name ));
617 0 : }
618 0 : }
619 :
620 0 : FD_TEST( tile->out_cnt<=sizeof( ctx->out_idx )/sizeof( ctx->out_idx[ 0 ] ) );
621 0 : for( ulong i=0UL; i<tile->out_cnt; i++ ) {
622 0 : fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
623 0 : fd_wksp_t * link_wksp = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
624 :
625 0 : if( !strcmp( link->name, "store_replay" ) ) {
626 :
627 0 : ctx->out_idx[ OUT_KIND_REPLAY ] = (uchar)i;
628 0 : FD_TEST( !ctx->replay_out_mem );
629 0 : ctx->replay_out_mcache = link->mcache;
630 0 : ctx->replay_out_sync = fd_mcache_seq_laddr( ctx->replay_out_mcache );
631 0 : ctx->replay_out_depth = fd_mcache_depth( ctx->replay_out_mcache );
632 0 : ctx->replay_out_seq = fd_mcache_seq_query( ctx->replay_out_sync );
633 0 : ctx->replay_out_mem = link_wksp;
634 0 : ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( link_wksp, link->dcache );
635 0 : ctx->replay_out_wmark = fd_dcache_compact_wmark ( link_wksp, link->dcache, link->mtu );
636 0 : ctx->replay_out_chunk = ctx->replay_out_chunk0;
637 :
638 0 : } else if( !strcmp( link->name, "store_repair" ) ) {
639 :
640 0 : ctx->out_idx[ OUT_KIND_REPAIR ] = (uchar)i;
641 0 : FD_TEST( !ctx->repair_req_out_mem );
642 0 : ctx->repair_req_out_mcache = link->mcache;
643 0 : ctx->repair_req_out_sync = fd_mcache_seq_laddr( ctx->repair_req_out_mcache );
644 0 : ctx->repair_req_out_depth = fd_mcache_depth( ctx->repair_req_out_mcache );
645 0 : ctx->repair_req_out_seq = fd_mcache_seq_query( ctx->repair_req_out_sync );
646 0 : ctx->repair_req_out_mem = link_wksp;
647 0 : ctx->repair_req_out_chunk0 = fd_dcache_compact_chunk0( link_wksp, link->dcache );
648 0 : ctx->repair_req_out_wmark = fd_dcache_compact_wmark ( link_wksp, link->dcache, link->mtu );
649 0 : ctx->repair_req_out_chunk = ctx->repair_req_out_chunk0;
650 :
651 0 : } else if( !strcmp( link->name, "store_rstart" ) ) {
652 :
653 0 : ctx->out_idx[ OUT_KIND_RSTART ] = (uchar)i;
654 0 : FD_TEST( !ctx->restart_out_mem );
655 0 : ctx->restart_out_mcache = link->mcache;
656 0 : ulong * sync = fd_mcache_seq_laddr( ctx->restart_out_mcache );
657 0 : ctx->restart_out_depth = fd_mcache_depth( ctx->restart_out_mcache );
658 0 : ctx->restart_out_seq = fd_mcache_seq_query( sync );
659 0 : ctx->restart_out_mem = link_wksp;
660 0 : ctx->restart_out_chunk0 = fd_dcache_compact_chunk0( link_wksp, link->dcache );
661 0 : ctx->restart_out_wmark = fd_dcache_compact_wmark ( link_wksp, link->dcache, link->mtu );
662 0 : ctx->restart_out_chunk = ctx->restart_out_chunk0;
663 :
664 0 : } else if( !strcmp( link->name, "store_feeder" ) ||
665 0 : !strcmp( link->name, "storei_notif" ) ) {
666 :
667 0 : ctx->out_idx[ OUT_KIND_ARCHIVE ] = (uchar)i;
668 0 : FD_TEST( !ctx->archive_out_mem );
669 0 : FD_LOG_NOTICE(( "storei tile has output link %s", link->name ));
670 0 : ctx->archive_out_mem = link_wksp;
671 0 : ctx->archive_out_chunk0 = fd_dcache_compact_chunk0( link_wksp, link->dcache );
672 0 : ctx->archive_out_wmark = fd_dcache_compact_wmark ( link_wksp, link->dcache, link->mtu );
673 0 : ctx->archive_out_chunk = ctx->archive_out_chunk0;
674 :
675 0 : } else {
676 0 : FD_LOG_ERR(( "unexpected output link %s", link->name ));
677 0 : }
678 0 : }
679 :
680 0 : if( FD_UNLIKELY( !ctx->replay_out_mem ) ) FD_LOG_ERR(( "Missing replay output link" ));
681 0 : if( FD_UNLIKELY( !ctx->repair_req_out_mem ) ) FD_LOG_ERR(( "Missing repair output link" ));
682 :
683 : /**********************************************************************/
684 : /* root_slot fseq */
685 : /**********************************************************************/
686 :
687 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
688 0 : FD_TEST( root_slot_obj_id!=ULONG_MAX );
689 0 : ctx->root_slot_fseq = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
690 0 : if( FD_UNLIKELY( !ctx->root_slot_fseq ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
691 0 : FD_TEST( ULONG_MAX==fd_fseq_query( ctx->root_slot_fseq ) );
692 :
693 : /* Prevent blockstore from being created until we know the shred version */
694 0 : ulong expected_shred_version = tile->store_int.expected_shred_version;
695 0 : if( FD_LIKELY( !expected_shred_version ) ) {
696 0 : ulong busy_obj_id = fd_pod_query_ulong( topo->props, "poh_shred", ULONG_MAX );
697 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
698 0 : ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
699 0 : FD_LOG_INFO(( "waiting for shred version to be determined via gossip." ));
700 0 : do {
701 0 : expected_shred_version = fd_fseq_query( gossip_shred_version );
702 0 : } while( expected_shred_version==ULONG_MAX );
703 0 : FD_LOG_NOTICE(( "using shred version %lu", expected_shred_version ));
704 0 : }
705 0 : if( FD_UNLIKELY( expected_shred_version>USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", expected_shred_version ));
706 0 : FD_TEST( expected_shred_version );
707 0 : fd_store_expected_shred_version( ctx->store, expected_shred_version );
708 :
709 0 : if( FD_UNLIKELY( strlen( tile->store_int.blockstore_restore ) > 0 ) ) {
710 0 : FD_LOG_NOTICE(( "starting blockstore_wksp restore %s", tile->store_int.blockstore_restore ));
711 0 : int rc = fd_wksp_restore( ctx->blockstore_wksp, tile->store_int.blockstore_restore, (uint)FD_BLOCKSTORE_MAGIC );
712 0 : if( rc ) {
713 0 : FD_LOG_ERR(( "failed to restore %s: error %d.", tile->store_int.blockstore_restore, rc ));
714 0 : }
715 0 : FD_LOG_NOTICE(( "finished blockstore_wksp restore %s", tile->store_int.blockstore_restore ));
716 0 : fd_wksp_tag_query_info_t info;
717 0 : ulong tag = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.wksp_tag", blockstore_obj_id );
718 0 : if( FD_LIKELY( fd_wksp_tag_query( ctx->blockstore_wksp, &tag, 1, &info, 1 ) > 0 ) ) {
719 0 : void * blockstore_mem = fd_wksp_laddr_fast( ctx->blockstore_wksp, info.gaddr_lo );
720 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, blockstore_mem );
721 0 : } else {
722 0 : FD_LOG_WARNING(( "failed to find blockstore in workspace. making new blockstore." ));
723 0 : }
724 0 : } else {
725 0 : void * blockstore_shmem = fd_topo_obj_laddr( topo, blockstore_obj_id );
726 0 : if( blockstore_shmem == NULL ) {
727 0 : FD_LOG_ERR(( "failed to find blockstore" ));
728 0 : }
729 :
730 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, blockstore_shmem );
731 0 : }
732 :
733 0 : FD_LOG_NOTICE(( "blockstore: %s", tile->store_int.blockstore_file ));
734 :
735 0 : FD_TEST( ctx->blockstore );
736 0 : ctx->store->blockstore = ctx->blockstore;
737 :
738 0 : void * alloc_shmem = fd_wksp_alloc_laddr( ctx->wksp, fd_alloc_align(), fd_alloc_footprint(), 3UL );
739 0 : if( FD_UNLIKELY( !alloc_shmem ) ) {
740 0 : FD_LOG_ERR( ( "fd_alloc too large for workspace" ) );
741 0 : }
742 :
743 : /* Set up ctx states for wen-restart */
744 0 : ctx->restart_funk_root = 0;
745 0 : ctx->restart_heaviest_fork_slot = 0;
746 :
747 0 : void * smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_SMAX ) );
748 0 : void * fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_SDEPTH ) );
749 :
750 : /* Create scratch region */
751 0 : FD_TEST( (!!smem) & (!!fmem) );
752 0 : fd_scratch_attach( smem, fmem, SCRATCH_SMAX, SCRATCH_SDEPTH );
753 :
754 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
755 0 : if( FD_UNLIKELY( scratch_top != (ulong)scratch + scratch_footprint( tile ) ) ) {
756 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
757 0 : }
758 :
759 0 : if( FD_UNLIKELY( strlen( tile->store_int.slots_pending ) > 0 ) ) {
760 0 : ctx->sim = 1;
761 :
762 0 : const char * split = strchr( tile->store_int.slots_pending, '-' );
763 0 : FD_TEST( split != NULL && *( split + 1 ) != '\0' );
764 0 : const char * snapshot_slot_str = split + 1;
765 0 : char * endptr;
766 0 : ulong snapshot_slot = strtoul( snapshot_slot_str, &endptr, 10 );
767 :
768 0 : FILE * file = fopen( tile->store_int.slots_pending, "r" );
769 0 : char buf[20]; /* max # of digits for a ulong */
770 :
771 0 : ulong cnt = 1;
772 0 : FD_TEST( fd_blockstore_block_info_remove( ctx->blockstore, snapshot_slot ) );
773 :
774 0 : while( fgets( buf, sizeof( buf ), file ) ) {
775 0 : char * endptr;
776 0 : ulong slot = strtoul( buf, &endptr, 10 );
777 0 : fd_block_map_query_t query[1] = { 0 };
778 0 : int err = fd_block_map_prepare( ctx->blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
779 0 : fd_block_info_t * block_map_entry = fd_block_map_query_ele( query );
780 0 : if( err || block_map_entry->slot != slot ) {
781 0 : FD_LOG_ERR(( "init: slot %lu does not match block_map_entry->slot %lu", slot, block_map_entry->slot ));
782 0 : }
783 0 : block_map_entry->flags = 0;
784 0 : fd_block_map_publish( query );
785 0 : fd_store_add_pending( ctx->store, slot, (long)cnt++, 0, 0 );
786 0 : }
787 0 : fclose( file );
788 0 : }
789 :
790 0 : ctx->shred_cap_ctx.is_archive = 0;
791 0 : ctx->shred_cap_ctx.stable_slot_end = 0;
792 0 : ctx->shred_cap_ctx.stable_slot_start = 0;
793 0 : if( strlen( tile->store_int.shred_cap_archive ) > 0 ) {
794 0 : ctx->shred_cap_ctx.is_archive = 1;
795 0 : ctx->shred_cap_ctx.shred_cap_fileno = open( tile->store_int.shred_cap_archive,
796 0 : O_WRONLY | O_CREAT,
797 0 : S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
798 0 : if( ctx->shred_cap_ctx.shred_cap_fileno==-1 ) FD_LOG_ERR(( "failed at opening the shredcap file" ));
799 0 : } else if( strlen( tile->store_int.shred_cap_replay )>0 ) {
800 0 : ctx->sim = 1;
801 0 : ctx->sim_end_slot = tile->store_int.shred_cap_end_slot;
802 0 : FD_LOG_WARNING(( "simulating to slot %lu", ctx->sim_end_slot ));
803 0 : ctx->store->blockstore->shmem->wmk = 0UL;
804 0 : while( ctx->store->blockstore->shmem->wmk==0UL ) {
805 0 : FD_LOG_DEBUG(( "Waiting for blockstore to be initialized" ));
806 0 : }
807 0 : FD_TEST( fd_shred_cap_replay( tile->store_int.shred_cap_replay, ctx->store ) == FD_SHRED_CAP_OK );
808 0 : }
809 :
810 0 : }
811 :
/* populate_allowed_seccomp builds the seccomp-BPF instruction list that
   sandboxes this tile.  The generated policy (see
   generated/fd_storei_tile_seccomp.h) is parameterized on the two file
   descriptors the tile is allowed to touch after boot: the private log
   file and the blockstore fd.  Returns the number of BPF instructions
   written to out. */
static ulong
populate_allowed_seccomp( fd_topo_t const * topo,
                          fd_topo_tile_t const * tile,
                          ulong out_cnt,
                          struct sock_filter * out ) {
  /* Recover the tile ctx from the head of this tile's scratch region.
     The APPEND/FINI sequence must mirror the layout used at init time
     so ctx points at the same fd_store_tile_ctx_t (and its
     blockstore_fd, opened during privileged init). */
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
  FD_SCRATCH_ALLOC_FINI( l, scratch_align() );

  populate_sock_filter_policy_fd_storei_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->blockstore_fd );
  return sock_filter_policy_fd_storei_tile_instr_cnt;
}
825 :
826 : static ulong
827 : populate_allowed_fds( fd_topo_t const * topo,
828 : fd_topo_tile_t const * tile,
829 : ulong out_fds_cnt,
830 0 : int * out_fds ) {
831 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
832 :
833 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
834 0 : fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
835 0 : FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_store_tile_ctx_t) );
836 :
837 0 : if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
838 :
839 0 : ulong out_cnt = 0UL;
840 0 : out_fds[ out_cnt++ ] = STDERR_FILENO;
841 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
842 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
843 0 : out_fds[ out_cnt++ ] = ctx->blockstore_fd;
844 0 : return out_cnt;
845 0 : }
846 :
847 : static inline void
848 0 : metrics_write( fd_store_tile_ctx_t * ctx ) {
849 0 : FD_MGAUGE_SET( STOREI, CURRENT_TURBINE_SLOT, ctx->metrics.current_turbine_slot );
850 0 : FD_MGAUGE_SET( STOREI, FIRST_TURBINE_SLOT, ctx->metrics.first_turbine_slot );
851 0 : }
852 :
/* Stem template configuration.  fd_stem.c below is a template: it reads
   the STEM_* macros to instantiate this tile's run loop (stem_run) with
   the callbacks defined earlier in this file. */

#define STEM_BURST (64UL) /* This could be bounded by max # of batches in a slot, so worth discussing after full refactor */

/* Per-tile context type threaded through every stem callback. */
#define STEM_CALLBACK_CONTEXT_TYPE fd_store_tile_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_store_tile_ctx_t)

#define STEM_CALLBACK_AFTER_CREDIT after_credit
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE metrics_write

#include "../../disco/stem/fd_stem.c"
865 :
/* Tile descriptor for "storei": binds the tile name to the lifecycle
   hooks the topology runner invokes — scratch sizing/alignment,
   seccomp/fd sandbox population, privileged and unprivileged init, and
   the stem-generated run loop. */
fd_topo_run_tile_t fd_tile_storei = {
  .name                     = "storei",
  .loose_footprint          = loose_footprint,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};
|