LCOV - code coverage report
Current view: top level - discof/store - fd_storei_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 528 0.0 %
Date: 2025-05-28 12:15:57 Functions: 0 13 0.0 %

          Line data    Source code
       1             : /* Store tile manages a blockstore and serves requests to repair and replay. */
       2             : #include "fd_store.h"
       3             : #include <string.h>
       4             : #define _GNU_SOURCE
       5             : 
       6             : #include "generated/fd_storei_tile_seccomp.h"
       7             : 
       8             : #include "fd_trusted_slots.h"
       9             : #include "../shred/fd_shred_cap.h"
      10             : 
      11             : #include "../../disco/tiles.h"
      12             : #include "../../disco/metrics/fd_metrics.h"
      13             : #include "../../flamenco/runtime/fd_blockstore.h"
      14             : #include "../../disco/shred/fd_stake_ci.h"
      15             : #include "../../disco/keyguard/fd_keyload.h"
      16             : #include "../../disco/topo/fd_pod_format.h"
      17             : #include "../../disco/archiver/fd_archiver.h"
      18             : #include "../../flamenco/runtime/fd_runtime.h"
      19             : #include "../../disco/metrics/fd_metrics.h"
      20             : 
      21             : #include <fcntl.h>
      22             : #include <unistd.h>
      23             : #include <arpa/inet.h>
      24             : #include <linux/unistd.h>
      25             : #include <sys/random.h>
      26             : #include <netdb.h>
      27             : #include <netinet/in.h>
      28             : #include <sys/socket.h>
      29             : 
      30             : #define MAX_IN_LINKS 32
      31           0 : #define IN_KIND_STAKE  0
      32           0 : #define IN_KIND_REPAIR 1
      33           0 : #define IN_KIND_RSTART 2
      34           0 : #define IN_KIND_SHRED  3
      35             : 
      36             : #define MAX_OUT_LINKS 4
      37           0 : #define OUT_KIND_REPLAY  0
      38           0 : #define OUT_KIND_REPAIR  1
      39           0 : #define OUT_KIND_RSTART  2
      40           0 : #define OUT_KIND_ARCHIVE 3
      41             : 
      42             : /* TODO: Determine/justify optimal number of repair requests */
      43           0 : #define MAX_REPAIR_REQS  ( (ulong)USHORT_MAX / sizeof(fd_repair_request_t) )
      44             : 
      45           0 : #define SCRATCH_SMAX     (512UL << 21UL)
      46           0 : #define SCRATCH_SDEPTH   (128UL)
      47             : 
      48             : struct fd_txn_iter {
      49             :   ulong slot;
      50             :   fd_raw_block_txn_iter_t iter;
      51             : };
      52             : 
      53             : typedef struct fd_txn_iter fd_txn_iter_t;
      54             : 
      55             : #define MAP_NAME              fd_txn_iter_map
      56           0 : #define MAP_T                 fd_txn_iter_t
      57             : #define MAP_KEY_T             ulong
      58           0 : #define MAP_KEY               slot
      59           0 : #define MAP_KEY_NULL          FD_SLOT_NULL
      60             : #define MAP_KEY_INVAL(k)      MAP_KEY_EQUAL(k, FD_SLOT_NULL)
      61             : #define MAP_KEY_EQUAL(k0,k1)  (k0==k1)
      62             : #define MAP_KEY_EQUAL_IS_SLOW 0
      63             : #define MAP_MEMOIZE           0
      64             : #define MAP_KEY_HASH(key)     ((uint)fd_ulong_hash( key ))
      65           0 : #define MAP_LG_SLOT_CNT       5
      66             : #include "../../util/tmpl/fd_map.c"
      67             : 
      68             : struct fd_store_in_ctx {
      69             :   fd_wksp_t * mem;
      70             :   ulong       chunk0;
      71             :   ulong       wmark;
      72             : };
      73             : typedef struct fd_store_in_ctx fd_store_in_ctx_t;
      74             : 
      75             : struct fd_store_tile_metrics {
      76             :   ulong first_turbine_slot;
      77             :   ulong current_turbine_slot;
      78             : };
      79             : typedef struct fd_store_tile_metrics fd_store_tile_metrics_t;
      80             : #define FD_STORE_TILE_METRICS_FOOTPRINT ( sizeof( fd_store_tile_metrics_t ) )
      81             : 
      82             : struct fd_store_tile_ctx {
      83             :   fd_wksp_t * wksp;
      84             :   fd_wksp_t * blockstore_wksp;
      85             : 
      86             :   fd_pubkey_t identity_key[1]; /* Just the public key */
      87             : 
      88             :   fd_store_in_ctx_t in[ MAX_IN_LINKS ];
      89             :   uchar in_kind[ MAX_IN_LINKS ];
      90             :   uchar out_idx[ MAX_OUT_LINKS ];
      91             : 
      92             :   fd_store_t *      store;
      93             :   fd_blockstore_t   blockstore_ljoin;
      94             :   int               blockstore_fd; /* file descriptor for archival file */
      95             :   fd_blockstore_t * blockstore;
      96             : 
      97             :   fd_frag_meta_t * repair_req_out_mcache;
      98             :   ulong *          repair_req_out_sync;
      99             :   ulong            repair_req_out_depth;
     100             :   ulong            repair_req_out_seq;
     101             : 
     102             :   fd_wksp_t * repair_req_out_mem;
     103             :   ulong       repair_req_out_chunk0;
     104             :   ulong       repair_req_out_wmark;
     105             :   ulong       repair_req_out_chunk;
     106             : 
     107             :   fd_frag_meta_t * replay_out_mcache;
     108             :   ulong *          replay_out_sync;
     109             :   ulong            replay_out_depth;
     110             :   ulong            replay_out_seq;
     111             : 
     112             :   fd_wksp_t * replay_out_mem;
     113             :   ulong       replay_out_chunk0;
     114             :   ulong       replay_out_wmark;
     115             :   ulong       replay_out_chunk;
     116             : 
     117             :   fd_frag_meta_t * restart_out_mcache;
     118             :   ulong            restart_out_depth;
     119             :   ulong            restart_out_seq;
     120             : 
     121             :   fd_wksp_t * restart_out_mem;
     122             :   ulong       restart_out_chunk0;
     123             :   ulong       restart_out_wmark;
     124             :   ulong       restart_out_chunk;
     125             : 
     126             :   fd_wksp_t * archive_out_mem;
     127             :   ulong       archive_out_chunk0;
     128             :   ulong       archive_out_wmark;
     129             :   ulong       archive_out_chunk;
     130             : 
     131             :   fd_shred34_t s34_buffer[1];
     132             :   uchar shred_buffer[FD_SHRED_MAX_SZ];
     133             :   fd_txn_p_t pack_buffer[MAX_TXN_PER_MICROBLOCK];
     134             : 
     135             :   fd_repair_request_t * repair_req_buffer;
     136             : 
     137             :   fd_stake_ci_t * stake_ci;
     138             : 
     139             :   ulong * root_slot_fseq;
     140             : 
     141             :   int sim;
     142             :   ulong sim_end_slot;
     143             : 
     144             :   fd_shred_cap_ctx_t shred_cap_ctx;
     145             : 
     146             :   fd_trusted_slots_t * trusted_slots;
     147             :   int                  is_trusted;
     148             : 
     149             :   fd_txn_iter_t * txn_iter_map;
     150             : 
     151             :   ulong restart_funk_root;
     152             :   ulong restart_heaviest_fork_slot;
     153             : 
     154             :   /* Metrics */
     155             :   fd_store_tile_metrics_t metrics;
     156             : 
     157             :   ulong turbine_cnt;
     158             :   ulong repair_cnt;
     159             : };
     160             : typedef struct fd_store_tile_ctx fd_store_tile_ctx_t;
     161             : 
     162             : 
     163             : FD_FN_CONST static inline ulong
     164           0 : scratch_align( void ) {
     165           0 :   return 128UL;
     166           0 : }
     167             : 
     168             : FD_FN_PURE static inline ulong
     169           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     170           0 :   return 4UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
     171           0 : }
     172             : 
     173             : FD_FN_PURE static inline ulong
     174           0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
     175           0 :   ulong l = FD_LAYOUT_INIT;
     176           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
     177           0 :   l = FD_LAYOUT_APPEND( l, fd_store_align(), fd_store_footprint() );
     178           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_repair_request_t), MAX_REPAIR_REQS * sizeof(fd_repair_request_t) );
     179           0 :   l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
     180           0 :   l = FD_LAYOUT_APPEND( l, fd_trusted_slots_align(), fd_trusted_slots_footprint( MAX_SLOTS_PER_EPOCH ) );
     181           0 :   l = FD_LAYOUT_APPEND( l, fd_txn_iter_map_align(), fd_txn_iter_map_footprint() );
     182           0 :   l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_SMAX ) );
     183           0 :   l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_SDEPTH ) );
     184           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     185           0 : }
     186             : 
     187             : static void
     188             : during_frag( fd_store_tile_ctx_t * ctx,
     189             :              ulong                 in_idx,
     190             :              ulong                 seq FD_PARAM_UNUSED,
     191             :              ulong                 sig,
     192             :              ulong                 chunk,
     193             :              ulong                 sz,
     194           0 :              ulong                 ctl FD_PARAM_UNUSED ) {
     195           0 :   return;
     196             : 
     197           0 :   if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
     198           0 :     FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     199             : 
     200           0 :   uchar const * src = fd_chunk_to_laddr( ctx->in[ in_idx ].mem, chunk );
     201             : 
     202           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
     203           0 :     fd_stake_ci_stake_msg_init( ctx->stake_ci, src );
     204           0 :     return;
     205           0 :   }
     206             : 
     207           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
     208           0 :     if( FD_UNLIKELY( sz>FD_SHRED_MAX_SZ ) ) FD_LOG_ERR(( "invalid in frag sz %lu", sz ));
     209             : 
     210           0 :     memcpy( ctx->shred_buffer, src, sz );
     211           0 :     return;
     212           0 :   }
     213             : 
     214           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_RSTART ) ) {
     215           0 :     if( FD_UNLIKELY( sz>sizeof(ulong)*2 ) ) FD_LOG_ERR(( "invalid in frag sz %lu", sz ));
     216             : 
     217           0 :     FD_TEST( sz==sizeof(ulong)*2 );
     218           0 :     if( FD_UNLIKELY( ctx->restart_heaviest_fork_slot!=0 ) ) {
     219           0 :       FD_LOG_ERR(( "Store tile should only receive heaviest_fork_slot once during wen-restart. Something may have corrupted." ));
     220           0 :     }
     221           0 :     ctx->restart_heaviest_fork_slot = FD_LOAD( ulong, src );
     222           0 :     ctx->restart_funk_root          = FD_LOAD( ulong, src+sizeof(ulong) );
     223           0 :     return;
     224           0 :   }
     225             : 
     226           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SHRED ) ) {
     227           0 :     if( FD_UNLIKELY( sz>sizeof(fd_shred34_t) ) ) FD_LOG_ERR(( "invalid in frag sz %lu", sz ));
     228             : 
     229           0 :     ctx->is_trusted = sig==1;
     230           0 :     memcpy( ctx->s34_buffer, src, sz );
     231           0 :   }
     232           0 : }
     233             : 
     234             : static void
     235             : after_frag( fd_store_tile_ctx_t * ctx,
     236             :             ulong                 in_idx,
     237             :             ulong                 seq    FD_PARAM_UNUSED,
     238             :             ulong                 sig,
     239             :             ulong                 sz,
     240             :             ulong                 tsorig,
     241             :             ulong                 tspub,
     242           0 :             fd_stem_context_t *   stem ) {
     243           0 :   return;
     244             : 
     245           0 :   ulong const in_kind = ctx->in_kind[ in_idx ];
     246             : 
     247           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
     248           0 :     fd_stake_ci_stake_msg_fini( ctx->stake_ci );
     249           0 :     return;
     250           0 :   }
     251             : 
     252           0 :   if( FD_UNLIKELY( ctx->archive_out_mem &&
     253           0 :                    ( in_kind==IN_KIND_REPAIR || in_kind==IN_KIND_SHRED ) ) ) {
     254           0 :     ulong new_sig;
     255           0 :     if( in_kind==IN_KIND_REPAIR ) {
     256           0 :       fd_memcpy( fd_chunk_to_laddr( ctx->archive_out_mem, ctx->archive_out_chunk ), ctx->shred_buffer, sz );
     257           0 :       new_sig = FD_ARCHIVER_SIG_MARK_REPAIR(sig);
     258           0 :     } else if( in_kind==IN_KIND_SHRED ) {
     259           0 :       fd_memcpy( fd_chunk_to_laddr( ctx->archive_out_mem, ctx->archive_out_chunk ), ctx->s34_buffer, sz );
     260           0 :       new_sig = FD_ARCHIVER_SIG_MARK_SHRED(sig);
     261           0 :     }
     262           0 :     fd_stem_publish( stem, ctx->out_idx[ OUT_KIND_ARCHIVE ], new_sig, ctx->archive_out_chunk, sz, 0UL, tsorig, tspub );
     263           0 :     ctx->archive_out_chunk = fd_dcache_compact_next( ctx->archive_out_chunk, sz, ctx->archive_out_chunk0, ctx->archive_out_wmark );
     264           0 :   }
     265             : 
     266           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_REPAIR ) ) {
     267           0 :     fd_shred_t const * shred = (fd_shred_t const *)fd_type_pun_const( ctx->shred_buffer );
     268           0 :     if( !fd_pending_slots_check( ctx->store->pending_slots, shred->slot ) ) {
     269           0 :       FD_LOG_WARNING(("received repair shred %lu that would overrun pending queue. skipping.", shred->slot));
     270           0 :       return;
     271           0 :     }
     272             : 
     273           0 :     if( FD_UNLIKELY( (long)(ctx->store->curr_turbine_slot - shred->slot) > (long)8192 ) ) {
     274           0 :       FD_LOG_WARNING(("received repair shred with slot %lu that would overrun pending queue. skipping.", shred->slot));
     275           0 :       return;
     276           0 :     }
     277             : 
     278           0 :     if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_SUCCESS ) {
     279           0 :       FD_LOG_ERR(( "failed inserting to blockstore" ));
     280           0 :     } else if ( ctx->shred_cap_ctx.is_archive ) {
     281           0 :       uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_REPAIR( 0 );
     282           0 :       if( fd_shred_cap_archive( &ctx->shred_cap_ctx, shred, shred_cap_flag ) < FD_SHRED_CAP_OK ) {
     283           0 :         FD_LOG_ERR( ( "failed at archiving repair shred to file" ) );
     284           0 :       }
     285           0 :     }
     286           0 :     ctx->repair_cnt++;
     287           0 :     return;
     288           0 :   }
     289             : 
     290           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_RSTART ) ) {
     291           0 :     FD_LOG_NOTICE(( "Store tile starts to repair backwards from slot%lu, which should be on the same fork as slot%lu",
     292           0 :                     ctx->restart_heaviest_fork_slot, ctx->restart_funk_root ));
     293           0 :     fd_store_add_pending( ctx->store, ctx->restart_heaviest_fork_slot, (long)5e6, 0, 0 );
     294           0 :     return;
     295           0 :   }
     296             : 
     297             :   /* everything else is shred */
     298             :   //FD_TEST( (ctx->s34_buffer->shred_cnt>0UL) & (ctx->s34_buffer->shred_cnt<=34UL) );
     299             : 
     300           0 :   if( FD_UNLIKELY( ctx->is_trusted ) ) {
     301             :     /* this slot is coming from our leader pipeline */
     302           0 :     fd_trusted_slots_add( ctx->trusted_slots, ctx->s34_buffer->pkts[ 0 ].shred.slot );
     303           0 :   }
     304           0 :   for( ulong i = 0; i < ctx->s34_buffer->shred_cnt; i++ ) {
     305           0 :     fd_shred_t * shred = &ctx->s34_buffer->pkts[i].shred;
     306             :     // TODO: these checks are not great as they assume a lot about the distance of shreds.
     307           0 :     if( !fd_pending_slots_check( ctx->store->pending_slots, shred->slot ) ) {
     308           0 :       FD_LOG_WARNING(("received shred %lu that would overrun pending queue. skipping.", shred->slot));
     309           0 :       continue;
     310           0 :     }
     311             : 
     312           0 :     if( FD_UNLIKELY( (long)(ctx->store->curr_turbine_slot - shred->slot) > (long)8192 ) ) {
     313           0 :       FD_LOG_WARNING(("received shred with slot %lu that would overrun pending queue. skipping.", shred->slot));
     314           0 :       continue;
     315           0 :     }
     316             :     // TODO: improve return value of api to not use < OK
     317             : 
     318           0 :     if( fd_store_shred_insert( ctx->store, shred ) < FD_BLOCKSTORE_SUCCESS ) {
     319           0 :       FD_LOG_ERR(( "failed inserting to blockstore" ));
     320           0 :     } else if ( ctx->shred_cap_ctx.is_archive ) {
     321           0 :       uchar shred_cap_flag = FD_SHRED_CAP_FLAG_MARK_TURBINE(0);
     322           0 :       if ( fd_shred_cap_archive(&ctx->shred_cap_ctx, shred, shred_cap_flag) < FD_SHRED_CAP_OK ) {
     323           0 :         FD_LOG_ERR(( "failed at archiving turbine shred to file" ));
     324           0 :       }
     325           0 :     }
     326           0 :     ctx->turbine_cnt++;
     327             : 
     328           0 :     fd_store_shred_update_with_shred_from_turbine( ctx->store, shred );
     329           0 :   }
     330           0 : }
     331             : 
     332             : static void
     333             : fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,
     334             :                             fd_stem_context_t *  stem,
     335             :                             int                  store_slot_prepare_mode,
     336           0 :                             ulong                slot ) {
     337           0 :   ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
     338           0 :   fd_repair_request_t * repair_reqs = fd_chunk_to_laddr( ctx->repair_req_out_mem, ctx->repair_req_out_chunk );
     339             :   /* We are leader at this slot and the slot is newer than turbine! */
     340             :   // FIXME: I dont think that this `ctx->store->curr_turbine_slot >= slot`
     341             :   // check works on fork switches to lower slot numbers. Use a given fork height
     342             :   // instead
     343             :   // if( ctx->store->curr_turbine_slot >= slot
     344             :   //     && memcmp( ctx->identity_key, slot_leader, sizeof(fd_pubkey_t) ) == 0 ) {
     345             :   //   if( store_slot_prepare_mode == FD_STORE_SLOT_PREPARE_CONTINUE ) {
     346             :   //     fd_block_t * block = fd_blockstore_block_query( ctx->blockstore, slot );
     347             :   //     if( FD_LIKELY( block ) ) {
     348             :   //       block->flags = fd_uchar_set_bit( block->flags, FD_BLOCK_FLAG_PROCESSED );
     349             :   //     }
     350             :   //   } else {
     351             :   //     return;
     352             :   //   }
     353             :   // }
     354             : 
     355           0 :   ulong repair_req_cnt = 0;
     356           0 :   switch( store_slot_prepare_mode ) {
     357           0 :     case FD_STORE_SLOT_PREPARE_CONTINUE: {
     358           0 :       ulong root = fd_fseq_query( ctx->root_slot_fseq );
     359           0 :       if( root!=ULONG_MAX ) {
     360             :         // FD_LOG_WARNING(("CONTINUE: %lu", root));
     361           0 :         fd_store_set_root( ctx->store, root );
     362           0 :       }
     363           0 :       break;
     364           0 :     }
     365           0 :     case FD_STORE_SLOT_PREPARE_NEED_PARENT_EXEC: {
     366           0 :       break;
     367           0 :     }
     368           0 :     case FD_STORE_SLOT_PREPARE_NEED_REPAIR: {
     369           0 :       repair_req_cnt = fd_store_slot_repair( ctx->store, slot, repair_reqs, MAX_REPAIR_REQS );
     370           0 :       break;
     371           0 :     }
     372           0 :     case FD_STORE_SLOT_PREPARE_NEED_ORPHAN: {
     373           0 :       fd_repair_request_t * repair_req = &repair_reqs[0];
     374           0 :       repair_req->slot = slot;
     375           0 :       repair_req->shred_index = UINT_MAX;
     376           0 :       repair_req->type = FD_REPAIR_REQ_TYPE_NEED_ORPHAN;
     377           0 :       repair_req_cnt = 1;
     378           0 :       break;
     379           0 :     }
     380           0 :     case FD_STORE_SLOT_PREPARE_ALREADY_EXECUTED: {
     381           0 :       return;
     382           0 :     }
     383           0 :     default: {
     384           0 :       FD_LOG_ERR(( "unrecognized store slot prepare mode" ));
     385           0 :       return;
     386           0 :     }
     387           0 :   }
     388             : 
     389           0 :   if( store_slot_prepare_mode == FD_STORE_SLOT_PREPARE_CONTINUE ) {
     390             : 
     391           0 :     if ( FD_UNLIKELY( ctx->sim && slot>=ctx->sim_end_slot ) ) {
     392           0 :       FD_LOG_ERR(( "Finished simulation to slot %lu", ctx->sim_end_slot ));
     393           0 :     }
     394             : 
     395           0 :     FD_LOG_NOTICE( ( "\n\n[Store]\n"
     396           0 :                      "slot:            %lu\n"
     397           0 :                      "current turbine: %lu\n"
     398           0 :                      "first turbine:   %lu\n"
     399           0 :                      "slots behind:    %lu\n"
     400           0 :                      "live:            %d\n",
     401           0 :                      slot,
     402           0 :                      ctx->store->curr_turbine_slot,
     403           0 :                      ctx->store->first_turbine_slot,
     404           0 :                      ctx->store->curr_turbine_slot - slot,
     405           0 :                      ( ctx->store->curr_turbine_slot - slot ) < 5 ) );
     406             : 
     407           0 :     if( !fd_blockstore_shreds_complete( ctx->blockstore, slot ) ) {
     408           0 :       FD_LOG_ERR(( "could not find block - slot: %lu", slot ));
     409           0 :     }
     410             : 
     411           0 :     ulong parent_slot = fd_blockstore_parent_slot_query( ctx->blockstore, slot );
     412           0 :     if ( FD_UNLIKELY( parent_slot == FD_SLOT_NULL ) ) FD_LOG_ERR(( "could not find slot %lu meta", slot ));
     413             : 
     414           0 :     FD_SCRATCH_SCOPE_BEGIN {
     415           0 :       ctx->metrics.first_turbine_slot = ctx->store->first_turbine_slot;
     416           0 :       ctx->metrics.current_turbine_slot = ctx->store->curr_turbine_slot;
     417             : 
     418           0 :       ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
     419           0 :       ulong caught_up_flag = (ctx->store->curr_turbine_slot - slot)<4 ? 0UL : REPLAY_FLAG_CATCHING_UP;
     420           0 :       ulong replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_MICROBLOCK | caught_up_flag );
     421             : 
     422           0 :       if( FD_UNLIKELY( fd_trusted_slots_find( ctx->trusted_slots, slot ) ) ) {
     423             :         /* if is caught up and is leader */
     424           0 :         replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_FINISHED_BLOCK );
     425           0 :         FD_LOG_INFO(( "packed block prepared - slot: %lu", slot ));
     426           0 :       } else {
     427           0 :         replay_sig = fd_disco_replay_old_sig( slot, REPLAY_FLAG_FINISHED_BLOCK | REPLAY_FLAG_MICROBLOCK | caught_up_flag );
     428           0 :       }
     429             : 
     430           0 :       fd_block_set_t data_complete_idxs[FD_SHRED_BLK_MAX / sizeof(ulong)] = { 0 };
     431           0 :       ulong  buffered_idx = 0;
     432           0 :       ulong  complete_idx = 0;
     433           0 :       for(;;) { /* speculative query */
     434           0 :         fd_block_map_query_t query[1] = { 0 };
     435           0 :         int err = fd_block_map_query_try( ctx->blockstore->block_map, &slot, NULL, query, 0 );
     436           0 :         fd_block_info_t * block_info = fd_block_map_query_ele( query );
     437             : 
     438           0 :         if( FD_UNLIKELY( err == FD_MAP_ERR_KEY   ) ) FD_LOG_ERR(( "could not find block - slot: %lu", slot ));
     439           0 :         if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     440             : 
     441           0 :         memcpy( data_complete_idxs, block_info->data_complete_idxs, sizeof(data_complete_idxs) );
     442           0 :         complete_idx = block_info->data_complete_idx;
     443           0 :         buffered_idx = block_info->buffered_idx;
     444             : 
     445           0 :         if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
     446           0 :       }
     447             : 
     448           0 :       FD_TEST( complete_idx == buffered_idx );
     449             : 
     450             :       /* artificially publishing slices in order. Simulates intended repair
     451             :          tile behavior. */
     452             : 
     453           0 :       uint consumed_idx = UINT_MAX;
     454           0 :       for( uint idx = 0; idx <= buffered_idx; idx++ ) {
     455           0 :         if( FD_UNLIKELY( fd_block_set_test( data_complete_idxs, idx ) ) ) {
     456           0 :           uint data_cnt = consumed_idx != UINT_MAX ? idx - consumed_idx : idx + 1;
     457             : 
     458           0 :           replay_sig = fd_disco_repair_replay_sig( slot, (ushort)( slot - parent_slot ), data_cnt, complete_idx == idx );
     459           0 :           fd_stem_publish( stem, ctx->out_idx[ OUT_KIND_REPLAY ], replay_sig, ctx->replay_out_chunk, 0, 0UL, tsorig, tspub );
     460           0 :           ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, 0, ctx->replay_out_chunk0, ctx->replay_out_wmark );
     461           0 :           consumed_idx = idx;
     462           0 :         }
     463           0 :       }
     464             : 
     465             : 
     466           0 :     } FD_SCRATCH_SCOPE_END;
     467           0 :   }
     468             : 
     469           0 :   if( repair_req_cnt != 0 ) {
     470           0 :     ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
     471           0 :     ulong repair_req_sig = 50UL;
     472           0 :     ulong repair_req_sz = repair_req_cnt * sizeof(fd_repair_request_t);
     473           0 :     FD_TEST( repair_req_sz<=USHORT_MAX );
     474           0 :     fd_mcache_publish( ctx->repair_req_out_mcache, ctx->repair_req_out_depth, ctx->repair_req_out_seq, repair_req_sig, ctx->repair_req_out_chunk,
     475           0 :       repair_req_sz, 0UL, tsorig, tspub );
     476           0 :     ctx->repair_req_out_seq   = fd_seq_inc( ctx->repair_req_out_seq, 1UL );
     477           0 :     ctx->repair_req_out_chunk = fd_dcache_compact_next( ctx->repair_req_out_chunk, repair_req_sz, ctx->repair_req_out_chunk0, ctx->repair_req_out_wmark );
     478           0 :   }
     479             : 
     480           0 :   return;
     481           0 : }
     482             : 
     483             : static void
     484             : after_credit( fd_store_tile_ctx_t * ctx,
     485             :               fd_stem_context_t *   stem,
     486             :               int *                 opt_poll_in FD_PARAM_UNUSED,
     487           0 :               int *                 charge_busy ) {
     488             :   /* TODO: Don't charge the tile as busy if after_credit isn't actually
     489             :      doing any work. */
     490           0 :   *charge_busy = 1;
     491             : 
     492           0 :   fd_mcache_seq_update( ctx->replay_out_sync, ctx->replay_out_seq );
     493           0 :   fd_mcache_seq_update( ctx->repair_req_out_sync, ctx->repair_req_out_seq );
     494             : 
     495           0 :   if( FD_UNLIKELY( ctx->sim &&
     496           0 :                    ctx->store->pending_slots->start == ctx->store->pending_slots->end ) ) {
     497             :     // FD_LOG_WARNING(( "Sim is complete." ));
     498           0 :   }
     499             : 
     500           0 :   for( ulong i = 0; i<fd_txn_iter_map_slot_cnt(); i++ ) {
     501           0 :     if( ctx->txn_iter_map[i].slot != FD_SLOT_NULL ) {
     502           0 :       fd_store_tile_slot_prepare( ctx, stem, FD_STORE_SLOT_PREPARE_CONTINUE, ctx->txn_iter_map[i].slot );
     503           0 :     }
     504           0 :   }
     505             : 
     506           0 :   for( ulong i = fd_pending_slots_iter_init( ctx->store->pending_slots );
     507           0 :          (i = fd_pending_slots_iter_next( ctx->store->pending_slots, ctx->store->now, i )) != ULONG_MAX; ) {
     508           0 :     ulong repair_slot = FD_SLOT_NULL;
     509           0 :     int store_slot_prepare_mode = fd_store_slot_prepare( ctx->store, i, &repair_slot );
     510             : 
     511           0 :     ulong slot = repair_slot == 0 ? i : repair_slot;
     512           0 :     FD_LOG_DEBUG(( "store slot - mode: %d, slot: %lu, repair_slot: %lu", store_slot_prepare_mode, i, repair_slot ));
     513           0 :     fd_store_tile_slot_prepare( ctx, stem, store_slot_prepare_mode, slot );
     514             : 
     515           0 :     if( FD_UNLIKELY( ctx->restart_heaviest_fork_slot &&
     516           0 :                      i==ctx->restart_heaviest_fork_slot ) ) {
     517           0 :       if( FD_LIKELY( store_slot_prepare_mode!=FD_STORE_SLOT_PREPARE_ALREADY_EXECUTED ) ) {
     518           0 :         fd_store_add_pending( ctx->store, ctx->restart_heaviest_fork_slot, (long)5e6, 0, 0 );
     519           0 :       } else if( ctx->restart_out_mem ) {
     520           0 :         fd_hash_t blk_hash;
     521           0 :         int err = fd_blockstore_block_hash_query( ctx->blockstore,
     522           0 :                                                   ctx->restart_heaviest_fork_slot,
     523           0 :                                                   &blk_hash );
     524           0 :         if( FD_UNLIKELY( err ) ){
     525           0 :           FD_LOG_ERR(( "Wen-restart cannot get the block hash of HeaviestForkSlot %lu", ctx->restart_heaviest_fork_slot ));
     526           0 :         }
     527           0 :         fd_funk_txn_xid_t xid;
     528           0 :         fd_memcpy( &xid, blk_hash.uc, sizeof(fd_funk_txn_xid_t) );
     529           0 :         xid.ul[0] = ctx->restart_heaviest_fork_slot;
     530             : 
     531             :         /* Send xid to restart tile */
     532           0 :         uchar * buf   = fd_chunk_to_laddr( ctx->restart_out_mem, ctx->restart_out_chunk );
     533           0 :         ulong buf_len = sizeof(fd_funk_txn_xid_t);
     534           0 :         fd_memcpy( buf, &xid, sizeof(fd_funk_txn_xid_t) );
     535           0 :         fd_mcache_publish( ctx->restart_out_mcache, ctx->restart_out_depth, ctx->restart_out_seq, 1UL, ctx->restart_out_chunk,
     536           0 :                            buf_len, 0UL, 0, 0 );
     537           0 :         ctx->restart_out_seq   = fd_seq_inc( ctx->restart_out_seq, 1UL );
     538           0 :         ctx->restart_out_chunk = fd_dcache_compact_next( ctx->restart_out_chunk, buf_len, ctx->restart_out_chunk0, ctx->restart_out_wmark );
     539           0 :       }
     540           0 :     }
     541           0 :   }
     542           0 : }
     543             : 
     544             : static inline void
     545           0 : during_housekeeping( fd_store_tile_ctx_t * ctx ) {
     546           0 :   ctx->store->now = fd_log_wallclock();
     547           0 : }
     548             : 
     549             : static void
     550             : privileged_init( fd_topo_t *      topo,
     551           0 :                  fd_topo_tile_t * tile ) {
     552           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     553             : 
     554           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     555           0 :   fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
     556           0 :   FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     557           0 :   memset( ctx, 0, sizeof(fd_store_tile_ctx_t) );
     558             : 
     559           0 :   if( FD_UNLIKELY( !strcmp( tile->store_int.identity_key_path, "" ) ) )
     560           0 :     FD_LOG_ERR(( "identity_key_path not set" ));
     561             : 
     562           0 :   ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->store_int.identity_key_path, /* pubkey only: */ 1 ) );
     563           0 :   ctx->blockstore_fd = open( tile->store_int.blockstore_file, O_RDONLY );
     564           0 : }
     565             : 
     566             : static void
     567             : unprivileged_init( fd_topo_t *      topo,
     568           0 :                    fd_topo_tile_t * tile ) {
     569           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     570             : 
     571             :   /* Scratch mem setup */
     572             : 
     573           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     574           0 :   fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
     575           0 :   ctx->blockstore = &ctx->blockstore_ljoin;
     576             :   // TODO: set the lo_mark_slot to the actual snapshot slot!
     577           0 :   ctx->store = fd_store_join( fd_store_new( FD_SCRATCH_ALLOC_APPEND( l, fd_store_align(), fd_store_footprint() ), 1 ) );
     578           0 :   ctx->repair_req_buffer = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_request_t), MAX_REPAIR_REQS * sizeof(fd_repair_request_t) );
     579           0 :   ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() ), ctx->identity_key ) );
     580             : 
     581           0 :   void * trusted_slots_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_trusted_slots_align(), fd_trusted_slots_footprint( MAX_SLOTS_PER_EPOCH ) );
     582           0 :   ctx->trusted_slots = fd_trusted_slots_join( fd_trusted_slots_new( trusted_slots_mem, MAX_SLOTS_PER_EPOCH ) );
     583           0 :   FD_TEST( ctx->trusted_slots!=NULL );
     584             : 
     585           0 :   void * iter_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_txn_iter_map_align(), fd_txn_iter_map_footprint() );
     586           0 :   ctx->txn_iter_map = fd_txn_iter_map_join( fd_txn_iter_map_new( iter_map_mem ) );
     587             : 
     588           0 :   ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
     589             : 
     590           0 :   ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
     591           0 :   FD_TEST( blockstore_obj_id!=ULONG_MAX );
     592           0 :   ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
     593             : 
     594           0 :   if( ctx->blockstore_wksp == NULL ) {
     595           0 :     FD_LOG_ERR(( "blockstore_wksp must be defined in topo." ));
     596           0 :   }
     597             : 
     598           0 :   FD_TEST( tile->in_cnt<=sizeof( ctx->in )/sizeof( ctx->in[ 0 ] ) );
     599           0 :   for( ulong i=0UL; i<tile->in_cnt; i++ ) {
     600           0 :     fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
     601           0 :     fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
     602             : 
     603           0 :     ctx->in[i].mem    = link_wksp->wksp;
     604           0 :     ctx->in[i].chunk0 = fd_dcache_compact_chunk0( ctx->in[i].mem, link->dcache );
     605           0 :     ctx->in[i].wmark  = fd_dcache_compact_wmark ( ctx->in[i].mem, link->dcache, link->mtu );
     606             : 
     607           0 :     if( !strcmp( link->name, "stake_out" ) ) {
     608           0 :       ctx->in_kind[ i ] = IN_KIND_STAKE;
     609           0 :     } else if( !strcmp( link->name, "repair_store" ) ) {
     610           0 :       ctx->in_kind[ i ] = IN_KIND_REPAIR;
     611           0 :     } else if( !strcmp( link->name, "rstart_store" ) ) {
     612           0 :       ctx->in_kind[ i ] = IN_KIND_RSTART;
     613           0 :     } else if( !strcmp( link->name, "shred_storei" ) ) {
     614           0 :       ctx->in_kind[ i ] = IN_KIND_SHRED;
     615           0 :     } else {
     616           0 :       FD_LOG_ERR(( "unexpected input link %s", link->name ));
     617           0 :     }
     618           0 :   }
     619             : 
     620           0 :   FD_TEST( tile->out_cnt<=sizeof( ctx->out_idx )/sizeof( ctx->out_idx[ 0 ] ) );
     621           0 :   for( ulong i=0UL; i<tile->out_cnt; i++ ) {
     622           0 :     fd_topo_link_t * link      = &topo->links[ tile->out_link_id[ i ] ];
     623           0 :     fd_wksp_t *      link_wksp = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
     624             : 
     625           0 :     if( !strcmp( link->name, "store_replay" ) ) {
     626             : 
     627           0 :       ctx->out_idx[ OUT_KIND_REPLAY ] = (uchar)i;
     628           0 :       FD_TEST( !ctx->replay_out_mem );
     629           0 :       ctx->replay_out_mcache = link->mcache;
     630           0 :       ctx->replay_out_sync   = fd_mcache_seq_laddr( ctx->replay_out_mcache );
     631           0 :       ctx->replay_out_depth  = fd_mcache_depth( ctx->replay_out_mcache );
     632           0 :       ctx->replay_out_seq    = fd_mcache_seq_query( ctx->replay_out_sync );
     633           0 :       ctx->replay_out_mem    = link_wksp;
     634           0 :       ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( link_wksp, link->dcache );
     635           0 :       ctx->replay_out_wmark  = fd_dcache_compact_wmark ( link_wksp, link->dcache, link->mtu );
     636           0 :       ctx->replay_out_chunk  = ctx->replay_out_chunk0;
     637             : 
     638           0 :     } else if( !strcmp( link->name, "store_repair" ) ) {
     639             : 
     640           0 :       ctx->out_idx[ OUT_KIND_REPAIR ] = (uchar)i;
     641           0 :       FD_TEST( !ctx->repair_req_out_mem );
     642           0 :       ctx->repair_req_out_mcache = link->mcache;
     643           0 :       ctx->repair_req_out_sync   = fd_mcache_seq_laddr( ctx->repair_req_out_mcache );
     644           0 :       ctx->repair_req_out_depth  = fd_mcache_depth( ctx->repair_req_out_mcache );
     645           0 :       ctx->repair_req_out_seq    = fd_mcache_seq_query( ctx->repair_req_out_sync );
     646           0 :       ctx->repair_req_out_mem    = link_wksp;
     647           0 :       ctx->repair_req_out_chunk0 = fd_dcache_compact_chunk0( link_wksp, link->dcache );
     648           0 :       ctx->repair_req_out_wmark  = fd_dcache_compact_wmark ( link_wksp, link->dcache, link->mtu );
     649           0 :       ctx->repair_req_out_chunk  = ctx->repair_req_out_chunk0;
     650             : 
     651           0 :     } else if( !strcmp( link->name, "store_rstart" ) ) {
     652             : 
     653           0 :       ctx->out_idx[ OUT_KIND_RSTART ] = (uchar)i;
     654           0 :       FD_TEST( !ctx->restart_out_mem );
     655           0 :       ctx->restart_out_mcache = link->mcache;
     656           0 :       ulong * sync            = fd_mcache_seq_laddr( ctx->restart_out_mcache );
     657           0 :       ctx->restart_out_depth  = fd_mcache_depth( ctx->restart_out_mcache );
     658           0 :       ctx->restart_out_seq    = fd_mcache_seq_query( sync );
     659           0 :       ctx->restart_out_mem    = link_wksp;
     660           0 :       ctx->restart_out_chunk0 = fd_dcache_compact_chunk0( link_wksp, link->dcache );
     661           0 :       ctx->restart_out_wmark  = fd_dcache_compact_wmark ( link_wksp, link->dcache, link->mtu );
     662           0 :       ctx->restart_out_chunk  = ctx->restart_out_chunk0;
     663             : 
     664           0 :     } else if( !strcmp( link->name, "store_feeder" ) ||
     665           0 :                !strcmp( link->name, "storei_notif" ) ) {
     666             : 
     667           0 :       ctx->out_idx[ OUT_KIND_ARCHIVE ] = (uchar)i;
     668           0 :       FD_TEST( !ctx->archive_out_mem );
     669           0 :       FD_LOG_NOTICE(( "storei tile has output link %s", link->name ));
     670           0 :       ctx->archive_out_mem    = link_wksp;
     671           0 :       ctx->archive_out_chunk0 = fd_dcache_compact_chunk0( link_wksp, link->dcache );
     672           0 :       ctx->archive_out_wmark  = fd_dcache_compact_wmark ( link_wksp, link->dcache, link->mtu );
     673           0 :       ctx->archive_out_chunk  = ctx->archive_out_chunk0;
     674             : 
     675           0 :     } else {
     676           0 :       FD_LOG_ERR(( "unexpected output link %s", link->name ));
     677           0 :     }
     678           0 :   }
     679             : 
     680           0 :   if( FD_UNLIKELY( !ctx->replay_out_mem     ) ) FD_LOG_ERR(( "Missing replay output link" ));
     681           0 :   if( FD_UNLIKELY( !ctx->repair_req_out_mem ) ) FD_LOG_ERR(( "Missing repair output link" ));
     682             : 
     683             :   /**********************************************************************/
     684             :   /* root_slot fseq                                                     */
     685             :   /**********************************************************************/
     686             : 
     687           0 :   ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
     688           0 :   FD_TEST( root_slot_obj_id!=ULONG_MAX );
     689           0 :   ctx->root_slot_fseq = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
     690           0 :   if( FD_UNLIKELY( !ctx->root_slot_fseq ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
     691           0 :   FD_TEST( ULONG_MAX==fd_fseq_query( ctx->root_slot_fseq ) );
     692             : 
     693             :   /* Prevent blockstore from being created until we know the shred version */
     694           0 :   ulong expected_shred_version = tile->store_int.expected_shred_version;
     695           0 :   if( FD_LIKELY( !expected_shred_version ) ) {
     696           0 :     ulong busy_obj_id = fd_pod_query_ulong( topo->props, "poh_shred", ULONG_MAX );
     697           0 :     FD_TEST( busy_obj_id!=ULONG_MAX );
     698           0 :     ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
     699           0 :     FD_LOG_INFO(( "waiting for shred version to be determined via gossip." ));
     700           0 :     do {
     701           0 :       expected_shred_version = fd_fseq_query( gossip_shred_version );
     702           0 :     } while( expected_shred_version==ULONG_MAX );
     703           0 :     FD_LOG_NOTICE(( "using shred version %lu", expected_shred_version ));
     704           0 :   }
     705           0 :   if( FD_UNLIKELY( expected_shred_version>USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", expected_shred_version ));
     706           0 :   FD_TEST( expected_shred_version );
     707           0 :   fd_store_expected_shred_version( ctx->store, expected_shred_version );
     708             : 
     709           0 :   if( FD_UNLIKELY( strlen( tile->store_int.blockstore_restore ) > 0 ) ) {
     710           0 :     FD_LOG_NOTICE(( "starting blockstore_wksp restore %s", tile->store_int.blockstore_restore ));
     711           0 :     int rc = fd_wksp_restore( ctx->blockstore_wksp, tile->store_int.blockstore_restore, (uint)FD_BLOCKSTORE_MAGIC );
     712           0 :     if( rc ) {
     713           0 :       FD_LOG_ERR(( "failed to restore %s: error %d.", tile->store_int.blockstore_restore, rc ));
     714           0 :     }
     715           0 :     FD_LOG_NOTICE(( "finished blockstore_wksp restore %s", tile->store_int.blockstore_restore ));
     716           0 :     fd_wksp_tag_query_info_t info;
     717           0 :     ulong tag = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.wksp_tag", blockstore_obj_id );
     718           0 :     if( FD_LIKELY( fd_wksp_tag_query( ctx->blockstore_wksp, &tag, 1, &info, 1 ) > 0 ) ) {
     719           0 :       void * blockstore_mem = fd_wksp_laddr_fast( ctx->blockstore_wksp, info.gaddr_lo );
     720           0 :       ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, blockstore_mem );
     721           0 :     } else {
     722           0 :       FD_LOG_WARNING(( "failed to find blockstore in workspace. making new blockstore." ));
     723           0 :     }
     724           0 :   } else {
     725           0 :     void * blockstore_shmem = fd_topo_obj_laddr( topo, blockstore_obj_id );
     726           0 :     if( blockstore_shmem == NULL ) {
     727           0 :       FD_LOG_ERR(( "failed to find blockstore" ));
     728           0 :     }
     729             : 
     730           0 :     ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, blockstore_shmem );
     731           0 :   }
     732             : 
     733           0 :   FD_LOG_NOTICE(( "blockstore: %s", tile->store_int.blockstore_file ));
     734             : 
     735           0 :   FD_TEST( ctx->blockstore );
     736           0 :   ctx->store->blockstore = ctx->blockstore;
     737             : 
     738           0 :   void * alloc_shmem = fd_wksp_alloc_laddr( ctx->wksp, fd_alloc_align(), fd_alloc_footprint(), 3UL );
     739           0 :   if( FD_UNLIKELY( !alloc_shmem ) ) {
     740           0 :     FD_LOG_ERR( ( "fd_alloc too large for workspace" ) );
     741           0 :   }
     742             : 
     743             :   /* Set up ctx states for wen-restart */
     744           0 :   ctx->restart_funk_root          = 0;
     745           0 :   ctx->restart_heaviest_fork_slot = 0;
     746             : 
     747           0 :   void * smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_SMAX ) );
     748           0 :   void * fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_SDEPTH ) );
     749             : 
     750             :   /* Create scratch region */
     751           0 :   FD_TEST( (!!smem) & (!!fmem) );
     752           0 :   fd_scratch_attach( smem, fmem, SCRATCH_SMAX, SCRATCH_SDEPTH );
     753             : 
     754           0 :   ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     755           0 :   if( FD_UNLIKELY( scratch_top != (ulong)scratch + scratch_footprint( tile ) ) ) {
     756           0 :     FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
     757           0 :   }
     758             : 
     759           0 :   if( FD_UNLIKELY( strlen( tile->store_int.slots_pending ) > 0 ) ) {
     760           0 :     ctx->sim = 1;
     761             : 
     762           0 :     const char * split = strchr( tile->store_int.slots_pending, '-' );
     763           0 :     FD_TEST( split != NULL && *( split + 1 ) != '\0' );
     764           0 :     const char * snapshot_slot_str = split + 1;
     765           0 :     char *       endptr;
     766           0 :     ulong        snapshot_slot = strtoul( snapshot_slot_str, &endptr, 10 );
     767             : 
     768           0 :     FILE * file = fopen( tile->store_int.slots_pending, "r" );
     769           0 :     char   buf[20]; /* max # of digits for a ulong */
     770             : 
     771           0 :     ulong cnt = 1;
     772           0 :     FD_TEST( fd_blockstore_block_info_remove( ctx->blockstore, snapshot_slot ) );
     773             : 
     774           0 :     while( fgets( buf, sizeof( buf ), file ) ) {
     775           0 :       char *       endptr;
     776           0 :       ulong        slot  = strtoul( buf, &endptr, 10 );
     777           0 :       fd_block_map_query_t query[1] = { 0 };
     778           0 :       int err = fd_block_map_prepare( ctx->blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
     779           0 :       fd_block_info_t * block_map_entry = fd_block_map_query_ele( query );
     780           0 :       if( err || block_map_entry->slot != slot ) {
     781           0 :         FD_LOG_ERR(( "init: slot %lu does not match block_map_entry->slot %lu", slot, block_map_entry->slot ));
     782           0 :       }
     783           0 :       block_map_entry->flags = 0;
     784           0 :       fd_block_map_publish( query );
     785           0 :       fd_store_add_pending( ctx->store, slot, (long)cnt++, 0, 0 );
     786           0 :     }
     787           0 :     fclose( file );
     788           0 :   }
     789             : 
     790           0 :   ctx->shred_cap_ctx.is_archive        = 0;
     791           0 :   ctx->shred_cap_ctx.stable_slot_end   = 0;
     792           0 :   ctx->shred_cap_ctx.stable_slot_start = 0;
     793           0 :   if( strlen( tile->store_int.shred_cap_archive ) > 0 ) {
     794           0 :     ctx->shred_cap_ctx.is_archive      = 1;
     795           0 :     ctx->shred_cap_ctx.shred_cap_fileno = open( tile->store_int.shred_cap_archive,
     796           0 :                                                 O_WRONLY | O_CREAT,
     797           0 :                                                 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
     798           0 :     if( ctx->shred_cap_ctx.shred_cap_fileno==-1 ) FD_LOG_ERR(( "failed at opening the shredcap file" ));
     799           0 :   } else if( strlen( tile->store_int.shred_cap_replay )>0 ) {
     800           0 :     ctx->sim                           = 1;
     801           0 :     ctx->sim_end_slot                  = tile->store_int.shred_cap_end_slot;
     802           0 :     FD_LOG_WARNING(( "simulating to slot %lu", ctx->sim_end_slot ));
     803           0 :     ctx->store->blockstore->shmem->wmk = 0UL;
     804           0 :     while( ctx->store->blockstore->shmem->wmk==0UL ) {
     805           0 :       FD_LOG_DEBUG(( "Waiting for blockstore to be initialized" ));
     806           0 :     }
     807           0 :     FD_TEST( fd_shred_cap_replay( tile->store_int.shred_cap_replay, ctx->store ) == FD_SHRED_CAP_OK );
     808           0 :   }
     809             : 
     810           0 : }
     811             : 
     812             : static ulong
     813             : populate_allowed_seccomp( fd_topo_t const *      topo,
     814             :                           fd_topo_tile_t const * tile,
     815             :                           ulong                  out_cnt,
     816           0 :                           struct sock_filter *   out ) {
     817           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     818           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     819           0 :   fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
     820           0 :   FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     821             : 
     822           0 :   populate_sock_filter_policy_fd_storei_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->blockstore_fd );
     823           0 :   return sock_filter_policy_fd_storei_tile_instr_cnt;
     824           0 : }
     825             : 
     826             : static ulong
     827             : populate_allowed_fds( fd_topo_t const *      topo,
     828             :                       fd_topo_tile_t const * tile,
     829             :                       ulong                  out_fds_cnt,
     830           0 :                       int *                  out_fds ) {
     831           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     832             : 
     833           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     834           0 :   fd_store_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_store_tile_ctx_t), sizeof(fd_store_tile_ctx_t) );
     835           0 :   FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_store_tile_ctx_t) );
     836             : 
     837           0 :   if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     838             : 
     839           0 :   ulong out_cnt = 0UL;
     840           0 :   out_fds[ out_cnt++ ] = STDERR_FILENO;
     841           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     842           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     843           0 :   out_fds[ out_cnt++ ] = ctx->blockstore_fd;
     844           0 :   return out_cnt;
     845           0 : }
     846             : 
     847             : static inline void
     848           0 : metrics_write( fd_store_tile_ctx_t * ctx ) {
     849           0 :   FD_MGAUGE_SET( STOREI, CURRENT_TURBINE_SLOT, ctx->metrics.current_turbine_slot );
     850           0 :   FD_MGAUGE_SET( STOREI, FIRST_TURBINE_SLOT, ctx->metrics.first_turbine_slot );
     851           0 : }
     852             : 
     853           0 : #define STEM_BURST (64UL) /* This could be bounded by max # of batches in a slot, so wrth discussing after full refactor */
     854             : 
     855           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_store_tile_ctx_t
     856           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_store_tile_ctx_t)
     857             : 
     858           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
     859           0 : #define STEM_CALLBACK_DURING_FRAG         during_frag
     860           0 : #define STEM_CALLBACK_AFTER_FRAG          after_frag
     861           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
     862           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
     863             : 
     864             : #include "../../disco/stem/fd_stem.c"
     865             : 
     866             : fd_topo_run_tile_t fd_tile_storei = {
     867             :   .name                     = "storei",
     868             :   .loose_footprint          = loose_footprint,
     869             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     870             :   .populate_allowed_fds     = populate_allowed_fds,
     871             :   .scratch_align            = scratch_align,
     872             :   .scratch_footprint        = scratch_footprint,
     873             :   .privileged_init          = privileged_init,
     874             :   .unprivileged_init        = unprivileged_init,
     875             :   .run                      = stem_run,
     876             : };

Generated by: LCOV version 1.14