LCOV - code coverage report
Current view: top level - discof/shredcap - fd_shredcap_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 502 0.0 %
Date: 2025-09-19 04:41:14 Functions: 0 23 0.0 %

          Line data    Source code
       1             : #define _GNU_SOURCE  /* Enable GNU and POSIX extensions */
       2             : #include "../../disco/topo/fd_topo.h"
       3             : #include "../../disco/net/fd_net_tile.h"
       4             : #include "../../flamenco/types/fd_types.h"
       5             : #include "../../flamenco/fd_flamenco_base.h"
       6             : #include "../../util/pod/fd_pod_format.h"
       7             : #include "../../flamenco/gossip/fd_gossip_types.h"
       8             : #include "../../disco/fd_disco.h"
       9             : #include "../../discof/fd_discof.h"
      10             : #include "../../discof/restore/utils/fd_ssmsg.h"
      11             : #include "../../discof/restore/utils/fd_ssmanifest_parser.h"
      12             : #include "../../flamenco/stakes/fd_stakes.h"
      13             : #include "../../flamenco/runtime/sysvar/fd_sysvar_epoch_schedule.h"
      14             : #include "../../disco/fd_disco.h"
      15             : #include "../../util/pod/fd_pod_format.h"
      16             : #include "../replay/fd_exec.h"
      17             : 
      18             : #include <errno.h>
      19             : #include <fcntl.h>
      20             : #include <sys/mman.h>
      21             : #include <sys/stat.h>
      22             : #include <string.h>
      23             : #include <stdio.h>
      24             : #include <unistd.h>
      25             : #include <linux/unistd.h>
      26             : #include <sys/socket.h>
      27             : #include <linux/if_xdp.h>
      28             : #include "generated/fd_shredcap_tile_seccomp.h"
      29             : 
      30             : 
      31             : /* This tile currently has two functionalities.
      32             : 
      33             :    The first is spying on the net_shred, repair_net, and shred_repair
      34             :    links and currently outputs to a csv that can analyze repair
      35             :    performance in post.
      36             : 
      37             :    The second is to capture the bank hashes from the replay tile and
      38             :    slices of shreds from the repair tile.  These are outputted to binary
      39             :    files that can be used to reproduce a live replay execution. */
      40             : 
      41           0 : #define FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ  (4096UL)  /* local filesystem block size */
      42           0 : #define FD_SHREDCAP_ALLOC_TAG              (4UL)
      43             : #define MAX_BUFFER_SIZE                    (20000UL * sizeof(fd_shred_dest_wire_t))
      44           0 : #define MANIFEST_MAX_TOTAL_BANKS           (2UL) /* the minimum is 2 */
      45           0 : #define MANIFEST_MAX_FORK_WIDTH            (1UL) /* banks are only needed during publish_stake_weights() */
      46             : 
      47           0 : #define NET_SHRED        (0UL)
      48           0 : #define REPAIR_NET       (1UL)
      49           0 : #define SHRED_REPAIR     (2UL)
      50           0 : #define GOSSIP_OUT       (3UL)
      51           0 : #define REPAIR_SHREDCAP (4UL)
      52           0 : #define REPLAY_SHREDCAP (5UL)
      53             : 
      54             : typedef union {
      55             :   struct {
      56             :     fd_wksp_t * mem;
      57             :     ulong       chunk0;
      58             :     ulong       wmark;
      59             :   };
      60             :   fd_net_rx_bounds_t net_rx;
      61             : } fd_capture_in_ctx_t;
      62             : 
      63             : struct out_link {
      64             :   ulong       idx;
      65             :   fd_frag_meta_t * mcache;
      66             :   ulong *          sync;
      67             :   ulong            depth;
      68             :   ulong            seq;
      69             :   fd_wksp_t * mem;
      70             :   ulong       chunk0;
      71             :   ulong       wmark;
      72             :   ulong       chunk;
      73             : };
      74             : typedef struct out_link out_link_t;
      75             : 
      76             : struct fd_capture_tile_ctx {
      77             :   uchar               in_kind[ 32 ];
      78             :   fd_capture_in_ctx_t in_links[ 32 ];
      79             : 
      80             :   int skip_frag;
      81             :   ushort repair_intake_listen_port;
      82             : 
      83             :   ulong shred_buffer_sz;
      84             :   uchar shred_buffer[ FD_NET_MTU ];
      85             : 
      86             :   ulong repair_buffer_sz;
      87             :   uchar repair_buffer[ FD_NET_MTU ];
      88             : 
      89             :   out_link_t           stake_out[1];
      90             :   out_link_t           snap_out[1];
      91             :   int                  enable_publish_stake_weights;
      92             :   ulong *              manifest_wmark;
      93             :   uchar *              manifest_bank_mem;
      94             :   uchar *              mainfest_exec_slot_ctx_mem;
      95             :   fd_exec_slot_ctx_t * manifest_exec_slot_ctx;
      96             :   char                 manifest_path[ PATH_MAX ];
      97             :   int                  manifest_load_done;
      98             :   uchar *              manifest_spad_mem;
      99             :   fd_spad_t *          manifest_spad;
     100             :   uchar *              shared_spad_mem;
     101             :   fd_spad_t *          shared_spad;
     102             : 
     103             :   fd_ip4_udp_hdrs_t intake_hdr[1];
     104             : 
     105             :   ulong now;
     106             :   ulong  last_packet_ns;
     107             :   double tick_per_ns;
     108             : 
     109             :   fd_io_buffered_ostream_t shred_ostream;
     110             :   fd_io_buffered_ostream_t repair_ostream;
     111             :   fd_io_buffered_ostream_t fecs_ostream;
     112             :   fd_io_buffered_ostream_t peers_ostream;
     113             :   fd_io_buffered_ostream_t slices_ostream;
     114             :   fd_io_buffered_ostream_t bank_hashes_ostream;
     115             : 
     116             :   int shreds_fd; /* shreds snooped from net_shred */
     117             :   int requests_fd;
     118             :   int fecs_fd;
     119             :   int peers_fd;
     120             :   int slices_fd; /* all shreds in slices from repair tile */
     121             :   int bank_hashes_fd; /* bank hashes from writer tile */
     122             : 
     123             :   ulong write_buf_sz;
     124             : 
     125             :   uchar * shreds_buf;
     126             :   uchar * requests_buf;
     127             :   uchar * fecs_buf;
     128             :   uchar * peers_buf;
     129             :   uchar * slices_buf;
     130             :   uchar * bank_hashes_buf;
     131             : 
     132             :   fd_alloc_t * alloc;
     133             :   uchar contact_info_buffer[ MAX_BUFFER_SIZE ];
     134             : };
     135             : typedef struct fd_capture_tile_ctx fd_capture_tile_ctx_t;
     136             : 
     137             : FD_FN_CONST static inline ulong
     138           0 : scratch_align( void ) {
     139           0 :   return 4096UL;
     140           0 : }
     141             : 
     142             : FD_FN_CONST static inline ulong
     143           0 : manifest_bank_align( void ) {
     144           0 :   return fd_banks_align();
     145           0 : }
     146             : 
     147             : FD_FN_CONST static inline ulong
     148           0 : manifest_bank_footprint( void ) {
     149           0 :   return fd_banks_footprint( MANIFEST_MAX_TOTAL_BANKS, MANIFEST_MAX_FORK_WIDTH );
     150           0 : }
     151             : 
     152             : FD_FN_CONST static inline ulong
     153           0 : manifest_load_align( void ) {
     154           0 :   return 128UL;
     155           0 : }
     156             : 
     157             : FD_FN_CONST static inline ulong
     158           0 : manifest_load_footprint( void ) {
     159             :   /* A manifest typically requires 1GB, but closer to 2GB
     160             :      have been observed in mainnet.  The footprint is then
     161             :      set to 2GB.  TODO a future adjustment may be needed. */
     162           0 :   return 2UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
     163           0 : }
     164             : 
     165             : FD_FN_CONST static inline ulong
     166           0 : manifest_spad_max_alloc_align( void ) {
     167           0 :   return FD_SPAD_ALIGN;
     168           0 : }
     169             : 
     170             : FD_FN_CONST static inline ulong
     171           0 : manifest_spad_max_alloc_footprint( void ) {
     172             :   /* The amount of memory required in the manifest load
     173             :      scratchpad to process it tends to be slightly larger
     174             :      than the manifest load footprint. */
     175           0 :   return manifest_load_footprint() + 128UL * FD_SHMEM_HUGE_PAGE_SZ;
     176           0 : }
     177             : 
     178             : FD_FN_CONST static inline ulong
     179           0 : shared_spad_max_alloc_align( void ) {
     180           0 :   return FD_SPAD_ALIGN;
     181           0 : }
     182             : 
     183             : FD_FN_CONST static inline ulong
     184           0 : shared_spad_max_alloc_footprint( void ) {
     185             :   /* The shared scratchpad is used by the manifest banks
     186             :      and by the manifest load (but not at the same time).
     187             :      The footprint for the banks needs to be equal to
     188             :      banks footprint (at least for the current setup with
     189             :      MANIFEST_MAX_TOTAL_BANKS==2). */
     190           0 :   return fd_ulong_max( manifest_bank_footprint(), manifest_load_footprint() );
     191           0 : }
     192             : 
     193             : FD_FN_PURE static inline ulong
     194           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     195           0 :   ulong footprint = sizeof(fd_capture_tile_ctx_t) + FD_EXEC_SLOT_CTX_FOOTPRINT
     196           0 :                     + manifest_bank_footprint()
     197           0 :                     + fd_spad_footprint( manifest_spad_max_alloc_footprint() )
     198           0 :                     + fd_spad_footprint( shared_spad_max_alloc_footprint() )
     199           0 :                     + fd_alloc_footprint();
     200           0 :   return fd_ulong_align_up( footprint, FD_SHMEM_GIGANTIC_PAGE_SZ );
     201           0 : }
     202             : 
     203             : 
     204             : static ulong
     205             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
     206             :                           fd_topo_tile_t const * tile,
     207             :                           ulong                  out_cnt,
     208           0 :                           struct sock_filter *   out ) {
     209           0 :   populate_sock_filter_policy_fd_shredcap_tile( out_cnt,
     210           0 :                                              out,
     211           0 :                                              (uint)fd_log_private_logfile_fd(),
     212           0 :                                              (uint)tile->shredcap.shreds_fd,
     213           0 :                                              (uint)tile->shredcap.requests_fd,
     214           0 :                                              (uint)tile->shredcap.fecs_fd,
     215           0 :                                              (uint)tile->shredcap.peers_fd );
     216           0 :   return sock_filter_policy_fd_shredcap_tile_instr_cnt;
     217           0 : }
     218             : 
     219             : FD_FN_PURE static inline ulong
     220           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     221           0 :   (void)tile;
     222           0 :   ulong l = FD_LAYOUT_INIT;
     223           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_capture_tile_ctx_t),  sizeof(fd_capture_tile_ctx_t) );
     224           0 :   l = FD_LAYOUT_APPEND( l, FD_EXEC_SLOT_CTX_ALIGN,          FD_EXEC_SLOT_CTX_FOOTPRINT );
     225           0 :   l = FD_LAYOUT_APPEND( l, manifest_bank_align(),           manifest_bank_footprint() );
     226           0 :   l = FD_LAYOUT_APPEND( l, manifest_spad_max_alloc_align(), fd_spad_footprint( manifest_spad_max_alloc_footprint() ) );
     227           0 :   l = FD_LAYOUT_APPEND( l, shared_spad_max_alloc_align(),   fd_spad_footprint( shared_spad_max_alloc_footprint() ) );
     228           0 :   l = FD_LAYOUT_APPEND( l, fd_alloc_align(),                fd_alloc_footprint() );
     229           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     230           0 : }
     231             : 
     232             : static void
     233             : publish_stake_weights_manifest( fd_capture_tile_ctx_t * ctx,
     234             :                                 fd_stem_context_t *    stem,
     235           0 :                                 fd_snapshot_manifest_t const * manifest ) {
     236           0 :   fd_epoch_schedule_t const * schedule = fd_type_pun_const( &manifest->epoch_schedule_params );
     237           0 :   ulong epoch = fd_slot_to_epoch( schedule, manifest->slot, NULL );
     238             : 
     239             :   /* current epoch */
     240           0 :   ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
     241           0 :   ulong stake_weights_sz = generate_stake_weight_msg_manifest( epoch, schedule, &manifest->epoch_stakes[0], stake_weights_msg );
     242           0 :   ulong stake_weights_sig = 4UL;
     243           0 :   fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
     244           0 :   ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
     245           0 :   FD_LOG_NOTICE(("sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
     246             : 
     247             :   /* next current epoch */
     248           0 :   stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
     249           0 :   stake_weights_sz = generate_stake_weight_msg_manifest( epoch + 1, schedule, &manifest->epoch_stakes[1], stake_weights_msg );
     250           0 :   stake_weights_sig = 4UL;
     251           0 :   fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
     252           0 :   ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
     253           0 :   FD_LOG_NOTICE(("sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
     254           0 : }
     255             : 
     256             : static inline int
     257             : before_frag( fd_capture_tile_ctx_t * ctx,
     258             :              ulong            in_idx,
     259             :              ulong            seq FD_PARAM_UNUSED,
     260           0 :              ulong            sig ) {
     261           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==NET_SHRED ) ) {
     262           0 :     return (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
     263           0 :   } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==GOSSIP_OUT)) {
     264           0 :     return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO;
     265           0 :   }
     266           0 :   return 0;
     267           0 : }
     268             : 
     269             : static inline void
     270             : handle_new_contact_info( fd_capture_tile_ctx_t * ctx,
     271           0 :                          uchar const *           buf ) {
     272           0 :   fd_gossip_update_message_t const * msg = (fd_gossip_update_message_t const *)fd_type_pun_const( buf );
     273           0 :   char tvu_buf[1024];
     274           0 :   char repair_buf[1024];
     275           0 :   fd_ip4_port_t tvu    = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_TVU ];
     276           0 :   fd_ip4_port_t repair = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_SERVE_REPAIR ];
     277             : 
     278           0 :   if( FD_UNLIKELY( tvu.l!=0UL ) ){
     279           0 :     snprintf( tvu_buf, sizeof(tvu_buf),
     280           0 :               "%u,%u(tvu),%s,%d\n",
     281           0 :               tvu.addr, tvu.port, FD_BASE58_ENC_32_ALLOCA(msg->contact_info.contact_info->pubkey.uc), 1);
     282           0 :     int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, tvu_buf, strlen(tvu_buf) );
     283           0 :     FD_TEST( err==0 );
     284           0 :   }
     285           0 :   if( FD_UNLIKELY( repair.l!=0UL ) ){
     286           0 :     snprintf( repair_buf, sizeof(repair_buf),
     287           0 :               "%u,%u(repair),%s,%d\n",
     288           0 :               repair.addr, repair.port, FD_BASE58_ENC_32_ALLOCA(msg->contact_info.contact_info->pubkey.uc), 1);
     289           0 :     int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, repair_buf, strlen(repair_buf) );
     290           0 :     FD_TEST( err==0 );
     291           0 :   }
     292           0 : }
     293             : 
     294             : static int
     295           0 : is_fec_completes_msg( ulong sz ) {
     296           0 :   return sz == FD_SHRED_DATA_HEADER_SZ + 2 * FD_SHRED_MERKLE_ROOT_SZ;
     297           0 : }
     298             : 
     299             : static inline void
     300             : during_frag( fd_capture_tile_ctx_t * ctx,
     301             :              ulong                   in_idx,
     302             :              ulong                   seq FD_PARAM_UNUSED,
     303             :              ulong                   sig,
     304             :              ulong                   chunk,
     305             :              ulong                   sz,
     306           0 :              ulong                   ctl ) {
     307           0 :   ctx->skip_frag = 0;
     308           0 :   if( ctx->in_kind[ in_idx ]==SHRED_REPAIR ) {
     309           0 :     if( !is_fec_completes_msg( sz ) ) {
     310           0 :       ctx->skip_frag = 1;
     311           0 :       return;
     312           0 :     }
     313           0 :     fd_memcpy( ctx->shred_buffer, fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk ), sz );
     314           0 :     ctx->shred_buffer_sz = sz;
     315           0 :   } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
     316           0 :     uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in_links[ in_idx ].net_rx, chunk, ctl, sz );
     317           0 :     ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
     318           0 :     FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
     319           0 :     fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
     320           0 :     if( FD_UNLIKELY( !shred ) ) {
     321           0 :       ctx->skip_frag = 1;
     322           0 :       return;
     323           0 :     };
     324           0 :     fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
     325           0 :     ctx->shred_buffer_sz = sz-hdr_sz;
     326           0 :   } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
     327             :     /* Repair will have outgoing pings, outgoing repair requests, and
     328             :        outgoing served shreds we want to filter everything but the
     329             :        repair requests.
     330             :        1. We can index into the ip4 udp packet hdr and check if the src
     331             :           port is the intake listen port or serve port
     332             :        2. Then we can filter on the discriminant which luckily does not
     333             :           require decoding! */
     334             : 
     335           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     336           0 :     fd_ip4_udp_hdrs_t const * hdr = (fd_ip4_udp_hdrs_t const *)dcache_entry;
     337           0 :     if( hdr->udp->net_sport != fd_ushort_bswap( ctx->repair_intake_listen_port ) ) {
     338           0 :       ctx->skip_frag = 1;
     339           0 :       return;
     340           0 :     }
     341           0 :     const uchar * encoded_protocol = dcache_entry + sizeof(fd_ip4_udp_hdrs_t);
     342           0 :     uint discriminant = FD_LOAD(uint, encoded_protocol);
     343             : 
     344           0 :     if( FD_UNLIKELY( discriminant <= fd_repair_protocol_enum_pong ) ) {
     345           0 :       ctx->skip_frag = 1;
     346           0 :       return;
     347           0 :     }
     348           0 :     fd_memcpy( ctx->repair_buffer, dcache_entry, sz );
     349           0 :     ctx->repair_buffer_sz = sz;
     350           0 :   } else if( ctx->in_kind[ in_idx ] == REPAIR_SHREDCAP ) {
     351             : 
     352           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     353             : 
     354             :     /* FIXME this should all be happening in after_frag */
     355             : 
     356             :     /* We expect to get all of the data shreds in a batch at once.  When
     357             :        we do we will write the header, the shreds, and a trailer. */
     358           0 :     ulong payload_sz = sig;
     359           0 :     fd_shredcap_slice_header_msg_t header = {
     360           0 :       .magic      = FD_SHREDCAP_SLICE_HEADER_MAGIC,
     361           0 :       .version    = FD_SHREDCAP_SLICE_HEADER_V1,
     362           0 :       .payload_sz = payload_sz,
     363           0 :     };
     364           0 :     int err;
     365           0 :     err = fd_io_buffered_ostream_write( &ctx->slices_ostream, &header, FD_SHREDCAP_SLICE_HEADER_FOOTPRINT );
     366           0 :     if( FD_UNLIKELY( err != 0 ) ) {
     367           0 :       FD_LOG_CRIT(( "failed to write slice header %d", err ));
     368           0 :     }
     369           0 :     err = fd_io_buffered_ostream_write( &ctx->slices_ostream, dcache_entry, payload_sz );
     370           0 :     if( FD_UNLIKELY( err != 0 ) ) {
     371           0 :       FD_LOG_CRIT(( "failed to write slice data %d", err ));
     372           0 :     }
     373           0 :     fd_shredcap_slice_trailer_msg_t trailer = {
     374           0 :       .magic   = FD_SHREDCAP_SLICE_TRAILER_MAGIC,
     375           0 :       .version = FD_SHREDCAP_SLICE_TRAILER_V1,
     376           0 :     };
     377           0 :     err = fd_io_buffered_ostream_write( &ctx->slices_ostream, &trailer, FD_SHREDCAP_SLICE_TRAILER_FOOTPRINT );
     378           0 :     if( FD_UNLIKELY( err != 0 ) ) {
     379           0 :       FD_LOG_CRIT(( "failed to write slice trailer %d", err ));
     380           0 :     }
     381             : 
     382           0 :   } else if( ctx->in_kind[ in_idx ] == REPLAY_SHREDCAP ) {
     383             : 
     384             :     /* FIXME this should all be happening in after_frag */
     385             : 
     386           0 :    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     387           0 :    fd_shredcap_bank_hash_msg_t bank_hash_msg = {
     388           0 :      .magic   = FD_SHREDCAP_BANK_HASH_MAGIC,
     389           0 :      .version = FD_SHREDCAP_BANK_HASH_V1
     390           0 :    };
     391           0 :    fd_memcpy( &bank_hash_msg.bank_hash, dcache_entry, sizeof(fd_hash_t) );
     392           0 :    fd_memcpy( &bank_hash_msg.slot, dcache_entry+sizeof(fd_hash_t), sizeof(ulong) );
     393             : 
     394           0 :    fd_io_buffered_ostream_write( &ctx->bank_hashes_ostream, &bank_hash_msg, FD_SHREDCAP_BANK_HASH_FOOTPRINT );
     395             : 
     396           0 :   } else {
     397             :     // contact infos can be copied into a buffer
     398           0 :     if( FD_UNLIKELY( chunk<ctx->in_links[ in_idx ].chunk0 || chunk>ctx->in_links[ in_idx ].wmark ) ) {
     399           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     400           0 :                    ctx->in_links[ in_idx ].chunk0, ctx->in_links[ in_idx ].wmark ));
     401           0 :     }
     402           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     403           0 :     fd_memcpy( ctx->contact_info_buffer, dcache_entry, sz );
     404           0 :   }
     405           0 : }
     406             : 
     407             : static void
     408             : after_credit( fd_capture_tile_ctx_t * ctx,
     409             :               fd_stem_context_t *     stem,
     410             :               int *                   opt_poll_in FD_PARAM_UNUSED,
     411           0 :               int *                   charge_busy FD_PARAM_UNUSED ) {
     412             : 
     413           0 :   if( FD_UNLIKELY( !ctx->manifest_load_done ) ) {
     414           0 :     if( FD_LIKELY( !!strcmp( ctx->manifest_path, "") ) ) {
     415             :       /* ctx->manifest_spad will hold the processed manifest. */
     416           0 :       fd_spad_reset( ctx->manifest_spad );
     417             :       /* do not pop from ctx->manifest_spad, the manifest needs
     418             :          to remain available until a new manifest is processed. */
     419             : 
     420           0 :       int fd = open( ctx->manifest_path, O_RDONLY );
     421           0 :       if( FD_UNLIKELY( fd < 0 ) ) {
     422           0 :         FD_LOG_WARNING(( "open(%s) failed (%d-%s)", ctx->manifest_path, errno, fd_io_strerror( errno ) ));
     423           0 :         return;
     424           0 :       }
     425           0 :       FD_LOG_NOTICE(( "manifest %s.", ctx->manifest_path ));
     426             : 
     427           0 :       fd_snapshot_manifest_t * manifest = NULL;
     428           0 :       FD_SPAD_FRAME_BEGIN( ctx->manifest_spad ) {
     429           0 :         manifest = fd_spad_alloc( ctx->manifest_spad, alignof(fd_snapshot_manifest_t), sizeof(fd_snapshot_manifest_t) );
     430           0 :       } FD_SPAD_FRAME_END;
     431           0 :       FD_TEST( manifest );
     432             : 
     433           0 :       FD_SPAD_FRAME_BEGIN( ctx->shared_spad ) {
     434           0 :         uchar * buf    = fd_spad_alloc( ctx->shared_spad, manifest_load_align(), manifest_load_footprint() );
     435           0 :         ulong   buf_sz = 0;
     436           0 :         FD_TEST( !fd_io_read( fd, buf/*dst*/, 0/*dst_min*/, manifest_load_footprint()-1UL /*dst_max*/, &buf_sz ) );
     437             : 
     438           0 :         fd_ssmanifest_parser_t * parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( aligned_alloc(
     439           0 :                 fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint( 1UL<<24UL ) ), 1UL<<24UL, 42UL ) );
     440           0 :         FD_TEST( parser );
     441           0 :         fd_ssmanifest_parser_init( parser, manifest );
     442           0 :         int parser_err = fd_ssmanifest_parser_consume( parser, buf, buf_sz );
     443           0 :         if( FD_UNLIKELY( parser_err ) ) FD_LOG_ERR(( "fd_ssmanifest_parser_consume failed (%d)", parser_err ));
     444           0 :       } FD_SPAD_FRAME_END;
     445           0 :       FD_LOG_NOTICE(( "manifest bank slot %lu", manifest->slot ));
     446             : 
     447           0 :       fd_fseq_update( ctx->manifest_wmark, manifest->slot );
     448             : 
     449           0 :       uchar * chunk = fd_chunk_to_laddr( ctx->snap_out->mem, ctx->snap_out->chunk );
     450           0 :       ulong   sz    = sizeof(fd_snapshot_manifest_t);
     451           0 :       ulong   sig   = fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
     452           0 :       memcpy( chunk, manifest, sz );
     453           0 :       fd_stem_publish( stem, ctx->snap_out->idx, sig, ctx->snap_out->chunk, sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
     454           0 :       ctx->snap_out->chunk = fd_dcache_compact_next( ctx->snap_out->chunk, sz, ctx->snap_out->chunk0, ctx->snap_out->wmark );
     455             : 
     456           0 :       fd_stem_publish( stem, ctx->snap_out->idx, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
     457             : 
     458           0 :       publish_stake_weights_manifest( ctx, stem, manifest );
     459             :       //*charge_busy = 0;
     460           0 :     }
     461             :     /* No need to strcmp every time after_credit is called. */
     462           0 :     ctx->manifest_load_done = 1;
     463           0 :   }
     464           0 : }
     465             : 
     466             : static inline void
     467             : after_frag( fd_capture_tile_ctx_t * ctx,
     468             :             ulong                   in_idx,
     469             :             ulong                   seq    FD_PARAM_UNUSED,
     470             :             ulong                   sig,
     471             :             ulong                   sz,
     472             :             ulong                   tsorig FD_PARAM_UNUSED,
     473             :             ulong                   tspub  FD_PARAM_UNUSED,
     474           0 :             fd_stem_context_t *     stem   FD_PARAM_UNUSED ) {
     475           0 :   if( FD_UNLIKELY( ctx->skip_frag ) ) return;
     476             : 
     477           0 :   if( ctx->in_kind[ in_idx ] == SHRED_REPAIR ) {
     478             :     /* This is a fec completes message! we can use it to check how long
     479             :        it takes to complete a fec */
     480             : 
     481           0 :     fd_shred_t const * shred = (fd_shred_t *)fd_type_pun( ctx->shred_buffer );
     482           0 :     uint data_cnt = fd_disco_shred_repair_fec_sig_data_cnt( sig );
     483           0 :     uint ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
     484           0 :     char fec_complete[1024];
     485           0 :     snprintf( fec_complete, sizeof(fec_complete),
     486           0 :              "%ld,%lu,%u,%u,%u\n",
     487           0 :               fd_log_wallclock(), shred->slot, ref_tick, shred->fec_set_idx, data_cnt );
     488             : 
     489             :     // Last shred is guaranteed to be a data shred
     490             : 
     491             : 
     492           0 :     int err = fd_io_buffered_ostream_write( &ctx->fecs_ostream, fec_complete, strlen(fec_complete) );
     493           0 :     FD_TEST( err==0 );
     494           0 :   } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
     495             :     /* TODO: leader schedule early exits in shred tile right around
     496             :        startup, which discards some turbine shreds, but there is a
     497             :        chance we capture this shred here. Currently handled in post, but
     498             :        in the future will want to get the leader schedule here so we can
     499             :        also benchmark whether the excepcted sender in the turbine tree
     500             :        matches the actual sender. */
     501             : 
     502           0 :     ulong hdr_sz     = fd_disco_netmux_sig_hdr_sz( sig );
     503           0 :     fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->shred_buffer;
     504           0 :     uint src_ip4_addr = hdr->ip4->saddr;
     505           0 :     ushort src_port   = hdr->udp->net_sport;
     506             : 
     507           0 :     fd_shred_t const * shred = fd_shred_parse( ctx->shred_buffer + hdr_sz, sz - hdr_sz );
     508           0 :     int   is_turbine = fd_disco_netmux_sig_proto( sig ) == DST_PROTO_SHRED;
     509           0 :     uint  nonce      = is_turbine ? 0 : FD_LOAD(uint, ctx->shred_buffer + hdr_sz + fd_shred_sz( shred ) );
     510           0 :     int   is_data    = fd_shred_is_data( fd_shred_type( shred->variant ) );
     511           0 :     ulong slot       = shred->slot;
     512           0 :     uint  idx        = shred->idx;
     513           0 :     uint  fec_idx    = shred->fec_set_idx;
     514           0 :     uint  ref_tick   = 65;
     515           0 :     if( FD_UNLIKELY( is_turbine && is_data ) ) {
     516             :       /* We can then index into the flag and get a REFTICK */
     517           0 :       ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
     518           0 :     }
     519             : 
     520           0 :     char repair_data_buf[1024];
     521           0 :     snprintf( repair_data_buf, sizeof(repair_data_buf),
     522           0 :              "%u,%u,%ld,%lu,%u,%u,%u,%d,%d,%u\n",
     523           0 :               src_ip4_addr, src_port, fd_log_wallclock(), slot, ref_tick, fec_idx, idx, is_turbine, is_data, nonce );
     524             : 
     525           0 :     int err = fd_io_buffered_ostream_write( &ctx->shred_ostream, repair_data_buf, strlen(repair_data_buf) );
     526           0 :     FD_TEST( err==0 );
     527           0 :   } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
     528             :     /* We have a valid repair request that we can finally decode.
     529             :        Unfortunately we actually have to decode because we cant cast
     530             :        directly to the protocol */
     531           0 :     fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->repair_buffer;
     532           0 :     fd_repair_protocol_t protocol;
     533           0 :     fd_bincode_decode_ctx_t bctx = { .data = ctx->repair_buffer + sizeof(fd_ip4_udp_hdrs_t), .dataend = ctx->repair_buffer + ctx->repair_buffer_sz };
     534           0 :     fd_repair_protocol_t * decoded = fd_repair_protocol_decode( &protocol, &bctx );
     535             : 
     536           0 :     FD_TEST( decoded == &protocol );
     537           0 :     FD_TEST( decoded != NULL );
     538             : 
     539           0 :     uint   peer_ip4_addr = hdr->ip4->daddr;
     540           0 :     ushort peer_port     = hdr->udp->net_dport;
     541           0 :     ulong  slot          = 0UL;
     542           0 :     ulong  shred_index   = UINT_MAX;
     543           0 :     uint   nonce         = 0U;
     544             : 
     545           0 :     switch( protocol.discriminant ) {
     546           0 :       case fd_repair_protocol_enum_window_index: {
     547           0 :         slot        = protocol.inner.window_index.slot;
     548           0 :         shred_index = protocol.inner.window_index.shred_index;
     549           0 :         nonce       = protocol.inner.window_index.header.nonce;
     550           0 :         break;
     551           0 :       }
     552           0 :       case fd_repair_protocol_enum_highest_window_index: {
     553           0 :         slot        = protocol.inner.highest_window_index.slot;
     554           0 :         shred_index = protocol.inner.highest_window_index.shred_index;
     555           0 :         nonce       = protocol.inner.highest_window_index.header.nonce;
     556           0 :         break;
     557           0 :       }
     558           0 :       case fd_repair_protocol_enum_orphan: {
     559           0 :         slot  = protocol.inner.orphan.slot;
     560           0 :         nonce = protocol.inner.orphan.header.nonce;
     561           0 :         break;
     562           0 :       }
     563           0 :       default:
     564           0 :         break;
     565           0 :     }
     566             : 
     567           0 :     char repair_data_buf[1024];
     568           0 :     snprintf( repair_data_buf, sizeof(repair_data_buf),
     569           0 :               "%u,%u,%ld,%u,%lu,%lu\n",
     570           0 :               peer_ip4_addr, peer_port, fd_log_wallclock(), nonce, slot, shred_index );
     571           0 :     int err = fd_io_buffered_ostream_write( &ctx->repair_ostream, repair_data_buf, strlen(repair_data_buf) );
     572           0 :     FD_TEST( err==0 );
     573           0 :   } else if( ctx->in_kind[ in_idx ] == GOSSIP_OUT ) {
     574           0 :     handle_new_contact_info( ctx, ctx->contact_info_buffer );
     575           0 :   }
     576           0 : }
     577             : 
     578             : static ulong
     579             : populate_allowed_fds( fd_topo_t const      * topo        FD_PARAM_UNUSED,
     580             :                       fd_topo_tile_t const * tile,
     581             :                       ulong                  out_fds_cnt FD_PARAM_UNUSED,
     582           0 :                       int *                  out_fds ) {
     583           0 :   ulong out_cnt = 0UL;
     584             : 
     585           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
     586           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     587           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     588           0 :   if( FD_LIKELY( -1!=tile->shredcap.shreds_fd ) )
     589           0 :     out_fds[ out_cnt++ ] = tile->shredcap.shreds_fd; /* shred file */
     590           0 :   if( FD_LIKELY( -1!=tile->shredcap.requests_fd ) )
     591           0 :     out_fds[ out_cnt++ ] = tile->shredcap.requests_fd; /* request file */
     592           0 :   if( FD_LIKELY( -1!=tile->shredcap.fecs_fd ) )
     593           0 :     out_fds[ out_cnt++ ] = tile->shredcap.fecs_fd; /* fec complete file */
     594           0 :   if( FD_LIKELY( -1!=tile->shredcap.peers_fd ) )
     595           0 :     out_fds[ out_cnt++ ] = tile->shredcap.peers_fd; /* peers file */
     596           0 :   if( FD_LIKELY( -1!=tile->shredcap.slices_fd ) )
     597           0 :     out_fds[ out_cnt++ ] = tile->shredcap.slices_fd; /* slices file */
     598           0 :   if( FD_LIKELY( -1!=tile->shredcap.bank_hashes_fd ) )
     599           0 :     out_fds[ out_cnt++ ] = tile->shredcap.bank_hashes_fd; /* bank hashes file */
     600             : 
     601           0 :   return out_cnt;
     602           0 : }
     603             : 
     604             : static void
     605             : privileged_init( fd_topo_t *      topo FD_PARAM_UNUSED,
     606           0 :                  fd_topo_tile_t * tile ) {
     607           0 :   char file_path[PATH_MAX];
     608           0 :   strcpy( file_path, tile->shredcap.folder_path );
     609           0 :   strcat( file_path, "/shred_data.csv" );
     610           0 :   tile->shredcap.shreds_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     611           0 :   if ( FD_UNLIKELY( tile->shredcap.shreds_fd == -1 ) ) {
     612           0 :     FD_LOG_ERR(( "failed to open or create shred csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     613           0 :   }
     614             : 
     615           0 :   strcpy( file_path, tile->shredcap.folder_path );
     616           0 :   strcat( file_path, "/request_data.csv" );
     617           0 :   tile->shredcap.requests_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     618           0 :   if ( FD_UNLIKELY( tile->shredcap.requests_fd == -1 ) ) {
     619           0 :     FD_LOG_ERR(( "failed to open or create request csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     620           0 :   }
     621             : 
     622           0 :   strcpy( file_path, tile->shredcap.folder_path );
     623           0 :   strcat( file_path, "/fec_complete.csv" );
     624           0 :   tile->shredcap.fecs_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     625           0 :   if ( FD_UNLIKELY( tile->shredcap.fecs_fd == -1 ) ) {
     626           0 :     FD_LOG_ERR(( "failed to open or create fec complete csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     627           0 :   }
     628           0 :   FD_LOG_NOTICE(( "Opening shred csv dump file at %s", file_path ));
     629             : 
     630           0 :   strcpy( file_path, tile->shredcap.folder_path );
     631           0 :   strcat( file_path, "/peers.csv" );
     632           0 :   tile->shredcap.peers_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     633           0 :   if ( FD_UNLIKELY( tile->shredcap.peers_fd == -1 ) ) {
     634           0 :     FD_LOG_ERR(( "failed to open or create peers csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     635           0 :   }
     636             : 
     637           0 :   strcpy( file_path, tile->shredcap.folder_path );
     638           0 :   strcat( file_path, "/slices.bin" );
     639           0 :   tile->shredcap.slices_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     640           0 :   if ( FD_UNLIKELY( tile->shredcap.slices_fd == -1 ) ) {
     641           0 :     FD_LOG_ERR(( "failed to open or create slices csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     642           0 :   }
     643           0 :   FD_LOG_NOTICE(( "Opening val_shreds binary dump file at %s", file_path ));
     644             : 
     645           0 :   strcpy( file_path, tile->shredcap.folder_path );
     646           0 :   strcat( file_path, "/bank_hashes.bin" );
     647           0 :   tile->shredcap.bank_hashes_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     648           0 :   if ( FD_UNLIKELY( tile->shredcap.bank_hashes_fd == -1 ) ) {
     649           0 :     FD_LOG_ERR(( "failed to open or create bank_hashes csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     650           0 :   }
     651           0 :   FD_LOG_NOTICE(( "Opening bank_hashes binary dump file at %s", file_path ));
     652           0 : }
     653             : 
     654             : static void
     655             : init_file_handlers( fd_capture_tile_ctx_t    * ctx,
     656             :                     int                      * ctx_file,
     657             :                     int                        tile_file,
     658             :                     uchar                   ** ctx_buf,
     659           0 :                     fd_io_buffered_ostream_t * ctx_ostream ) {
     660           0 :   *ctx_file =  tile_file ;
     661             : 
     662           0 :   int err = ftruncate( *ctx_file, 0UL );
     663           0 :   if( FD_UNLIKELY( err ) ) {
     664           0 :     FD_LOG_ERR(( "failed to truncate file (%i-%s)", errno, fd_io_strerror( errno ) ));
     665           0 :   }
     666           0 :   long seek = lseek( *ctx_file, 0UL, SEEK_SET );
     667           0 :   if( FD_UNLIKELY( seek!=0L ) ) {
     668           0 :     FD_LOG_ERR(( "failed to seek to the beginning of file" ));
     669           0 :   }
     670             : 
     671           0 :   *ctx_buf = fd_alloc_malloc( ctx->alloc, 4096, ctx->write_buf_sz );
     672           0 :   if( FD_UNLIKELY( *ctx_buf == NULL ) ) {
     673           0 :     FD_LOG_ERR(( "failed to allocate ostream buffer" ));
     674           0 :   }
     675             : 
     676           0 :   if( FD_UNLIKELY( !fd_io_buffered_ostream_init(
     677           0 :     ctx_ostream,
     678           0 :     *ctx_file,
     679           0 :     *ctx_buf,
     680           0 :     ctx->write_buf_sz ) ) ) {
     681           0 :     FD_LOG_ERR(( "failed to initialize ostream" ));
     682           0 :   }
     683           0 : }
     684             : 
     685             : 
     686             : static void
     687             : unprivileged_init( fd_topo_t *      topo,
     688           0 :                    fd_topo_tile_t * tile ) {
     689             : 
     690           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     691           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     692           0 :   fd_capture_tile_ctx_t * ctx       = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_capture_tile_ctx_t),  sizeof(fd_capture_tile_ctx_t) );
     693           0 :   void * mainfest_exec_slot_ctx_mem = FD_SCRATCH_ALLOC_APPEND( l, FD_EXEC_SLOT_CTX_ALIGN,          FD_EXEC_SLOT_CTX_FOOTPRINT );
     694           0 :   void * manifest_bank_mem          = FD_SCRATCH_ALLOC_APPEND( l, manifest_bank_align(),           manifest_bank_footprint() );
     695           0 :   void * manifest_spad_mem          = FD_SCRATCH_ALLOC_APPEND( l, manifest_spad_max_alloc_align(), fd_spad_footprint( manifest_spad_max_alloc_footprint() ) );
     696           0 :   void * shared_spad_mem            = FD_SCRATCH_ALLOC_APPEND( l, shared_spad_max_alloc_align(),   fd_spad_footprint( shared_spad_max_alloc_footprint() ) );
     697           0 :   void * alloc_mem                  = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(),                fd_alloc_footprint() );
     698           0 :   FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     699             : 
     700             :   /* Input links */
     701           0 :   for( ulong i=0; i<tile->in_cnt; i++ ) {
     702           0 :     fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
     703           0 :     fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
     704           0 :     if( 0==strcmp( link->name, "net_shred" ) ) {
     705           0 :       ctx->in_kind[ i ] = NET_SHRED;
     706           0 :       fd_net_rx_bounds_init( &ctx->in_links[ i ].net_rx, link->dcache );
     707           0 :       continue;
     708           0 :     } else if( 0==strcmp( link->name, "repair_net" ) ) {
     709           0 :       ctx->in_kind[ i ] = REPAIR_NET;
     710           0 :     } else if( 0==strcmp( link->name, "shred_repair" ) ) {
     711           0 :       ctx->in_kind[ i ] = SHRED_REPAIR;
     712           0 :     } else if( 0==strcmp( link->name, "gossip_out" ) ) {
     713           0 :       ctx->in_kind[ i ] = GOSSIP_OUT;
     714           0 :     } else if( 0==strcmp( link->name, "repair_scap" ) ) {
     715           0 :       ctx->in_kind[ i ] = REPAIR_SHREDCAP;
     716           0 :     } else if( 0==strcmp( link->name, "replay_scap" ) ) {
     717           0 :       ctx->in_kind[ i ] = REPLAY_SHREDCAP;
     718           0 :     } else {
     719           0 :       FD_LOG_ERR(( "scap tile has unexpected input link %s", link->name ));
     720           0 :     }
     721             : 
     722           0 :     ctx->in_links[ i ].mem    = link_wksp->wksp;
     723           0 :     ctx->in_links[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ i ].mem, link->dcache );
     724           0 :     ctx->in_links[ i ].wmark  = fd_dcache_compact_wmark ( ctx->in_links[ i ].mem, link->dcache, link->mtu );
     725           0 :   }
     726             : 
     727           0 :   ctx->repair_intake_listen_port = tile->shredcap.repair_intake_listen_port;
     728           0 :   ctx->write_buf_sz = tile->shredcap.write_buffer_size ? tile->shredcap.write_buffer_size : FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ;
     729             : 
     730             :   /* Set up stake weights tile output */
     731           0 :   ctx->stake_out->idx       = fd_topo_find_tile_out_link( topo, tile, "replay_stake", 0 );
     732           0 :   if( FD_LIKELY( ctx->stake_out->idx!=ULONG_MAX ) ) {
     733           0 :     fd_topo_link_t * stake_weights_out = &topo->links[ tile->out_link_id[ ctx->stake_out->idx] ];
     734           0 :     ctx->stake_out->mcache  = stake_weights_out->mcache;
     735           0 :     ctx->stake_out->mem     = topo->workspaces[ topo->objs[ stake_weights_out->dcache_obj_id ].wksp_id ].wksp;
     736           0 :     ctx->stake_out->sync    = fd_mcache_seq_laddr     ( ctx->stake_out->mcache );
     737           0 :     ctx->stake_out->depth   = fd_mcache_depth         ( ctx->stake_out->mcache );
     738           0 :     ctx->stake_out->seq     = fd_mcache_seq_query     ( ctx->stake_out->sync );
     739           0 :     ctx->stake_out->chunk0  = fd_dcache_compact_chunk0( ctx->stake_out->mem, stake_weights_out->dcache );
     740           0 :     ctx->stake_out->wmark   = fd_dcache_compact_wmark ( ctx->stake_out->mem, stake_weights_out->dcache, stake_weights_out->mtu );
     741           0 :     ctx->stake_out->chunk   = ctx->stake_out->chunk0;
     742           0 :   } else {
     743           0 :     FD_LOG_WARNING(( "no connection to stake_out link" ));
     744           0 :     memset( ctx->stake_out, 0, sizeof(out_link_t) );
     745           0 :   }
     746             : 
     747           0 :   ctx->snap_out->idx          = fd_topo_find_tile_out_link( topo, tile, "snap_out", 0 );
     748           0 :   if( FD_LIKELY( ctx->snap_out->idx!=ULONG_MAX ) ) {
     749           0 :     fd_topo_link_t * snap_out = &topo->links[tile->out_link_id[ctx->snap_out->idx]];
     750           0 :     ctx->snap_out->mem        = topo->workspaces[topo->objs[snap_out->dcache_obj_id].wksp_id].wksp;
     751           0 :     ctx->snap_out->chunk0     = fd_dcache_compact_chunk0( ctx->snap_out->mem, snap_out->dcache );
     752           0 :     ctx->snap_out->wmark      = fd_dcache_compact_wmark( ctx->snap_out->mem, snap_out->dcache, snap_out->mtu );
     753           0 :     ctx->snap_out->chunk      = ctx->snap_out->chunk0;
     754           0 :   } else {
     755           0 :     FD_LOG_WARNING(( "no connection to snap_out link" ));
     756           0 :     memset( ctx->snap_out, 0, sizeof(out_link_t) );
     757           0 :   }
     758             : 
     759             :   /* If the manifest is enabled (for processing), the stake_out link
     760             :      must be connected to the tile.  TODO in principle, it should be
     761             :      possible to gate the remaining of the manifest-related config. */
     762           0 :   ctx->enable_publish_stake_weights = tile->shredcap.enable_publish_stake_weights;
     763           0 :   FD_LOG_NOTICE(( "enable_publish_stake_weights ? %d", ctx->enable_publish_stake_weights ));
     764             : 
     765             :   /* manifest_wmark (root slot) */
     766           0 :   ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
     767           0 :   if( FD_LIKELY( root_slot_obj_id!=ULONG_MAX ) ) { /* for profiler */
     768           0 :     ctx->manifest_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
     769           0 :     if( FD_UNLIKELY( !ctx->manifest_wmark ) ) FD_LOG_ERR(( "no root_slot fseq" ));
     770           0 :     FD_TEST( ULONG_MAX==fd_fseq_query( ctx->manifest_wmark ) );
     771           0 :   }
     772             : 
     773           0 :   ctx->manifest_bank_mem    = manifest_bank_mem;
     774             : 
     775           0 :   ctx->mainfest_exec_slot_ctx_mem    = mainfest_exec_slot_ctx_mem;
     776           0 :   ctx->manifest_exec_slot_ctx        = fd_exec_slot_ctx_join( fd_exec_slot_ctx_new( ctx->mainfest_exec_slot_ctx_mem  ) );
     777           0 :   FD_TEST( ctx->manifest_exec_slot_ctx );
     778           0 :   ctx->manifest_exec_slot_ctx->banks = fd_banks_join( fd_banks_new( ctx->manifest_bank_mem, MANIFEST_MAX_TOTAL_BANKS, MANIFEST_MAX_FORK_WIDTH ) );
     779           0 :   FD_TEST( ctx->manifest_exec_slot_ctx->banks );
     780           0 :   ctx->manifest_exec_slot_ctx->bank  = fd_banks_init_bank( ctx->manifest_exec_slot_ctx->banks, fd_eslot( 0UL, 0UL ) );
     781           0 :   FD_TEST( ctx->manifest_exec_slot_ctx->bank );
     782             : 
     783           0 :   strncpy( ctx->manifest_path, tile->shredcap.manifest_path, PATH_MAX );
     784           0 :   ctx->manifest_load_done = 0;
     785           0 :   ctx->manifest_spad_mem  = manifest_spad_mem;
     786           0 :   ctx->manifest_spad      = fd_spad_join( fd_spad_new( ctx->manifest_spad_mem, manifest_spad_max_alloc_footprint() ) );
     787           0 :   ctx->shared_spad_mem    = shared_spad_mem;
     788           0 :   ctx->shared_spad        = fd_spad_join( fd_spad_new( ctx->shared_spad_mem, shared_spad_max_alloc_footprint() ) );
     789             : 
     790             :   /* Allocate the write buffers */
     791           0 :   ctx->alloc = fd_alloc_join( fd_alloc_new( alloc_mem, FD_SHREDCAP_ALLOC_TAG ), fd_tile_idx() );
     792           0 :   if( FD_UNLIKELY( !ctx->alloc ) ) {
     793           0 :     FD_LOG_ERR( ( "fd_alloc_join failed" ) );
     794           0 :   }
     795             : 
     796             :   /* Setup the csv files to be in the expected state */
     797             : 
     798           0 :   init_file_handlers( ctx, &ctx->shreds_fd,      tile->shredcap.shreds_fd,      &ctx->shreds_buf,      &ctx->shred_ostream );
     799           0 :   init_file_handlers( ctx, &ctx->requests_fd,    tile->shredcap.requests_fd,    &ctx->requests_buf,    &ctx->repair_ostream );
     800           0 :   init_file_handlers( ctx, &ctx->fecs_fd,        tile->shredcap.fecs_fd,        &ctx->fecs_buf,        &ctx->fecs_ostream );
     801           0 :   init_file_handlers( ctx, &ctx->peers_fd,       tile->shredcap.peers_fd,       &ctx->peers_buf,       &ctx->peers_ostream );
     802             : 
     803           0 :   int err = fd_io_buffered_ostream_write( &ctx->shred_ostream,  "src_ip,src_port,timestamp,slot,ref_tick,fec_set_idx,idx,is_turbine,is_data,nonce\n", 81UL );
     804           0 :   err    |= fd_io_buffered_ostream_write( &ctx->repair_ostream, "dst_ip,dst_port,timestamp,nonce,slot,idx\n", 41UL );
     805           0 :   err    |= fd_io_buffered_ostream_write( &ctx->fecs_ostream,   "timestamp,slot,ref_tick,fec_set_idx,data_cnt\n", 45UL );
     806           0 :   err    |= fd_io_buffered_ostream_write( &ctx->peers_ostream,  "peer_ip4_addr,peer_port,pubkey,turbine\n", 48UL );
     807             : 
     808           0 :   if( FD_UNLIKELY( err ) ) {
     809           0 :     FD_LOG_ERR(( "failed to write header to any of the 4 csv files (%i-%s)", errno, fd_io_strerror( errno ) ));
     810           0 :   }
     811             : 
     812             :   /* Setup the binary files to be in the expected state. These files are
     813             :      not csv, so we don't need headers. */
     814           0 :   init_file_handlers( ctx, &ctx->slices_fd,      tile->shredcap.slices_fd,      &ctx->slices_buf,      &ctx->slices_ostream );
     815           0 :   init_file_handlers( ctx, &ctx->bank_hashes_fd, tile->shredcap.bank_hashes_fd, &ctx->bank_hashes_buf, &ctx->bank_hashes_ostream );
     816           0 : }
     817             : 
     818           0 : #define STEM_BURST (1UL)
     819           0 : #define STEM_LAZY  (50UL)
     820             : 
     821           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_capture_tile_ctx_t
     822           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_capture_tile_ctx_t)
     823             : 
     824           0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
     825           0 : #define STEM_CALLBACK_DURING_FRAG during_frag
     826           0 : #define STEM_CALLBACK_AFTER_FRAG  after_frag
     827           0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
     828             : 
     829             : #include "../../disco/stem/fd_stem.c"
     830             : 
     831             : fd_topo_run_tile_t fd_tile_shredcap = {
     832             :   .name                     = "scap",
     833             :   .loose_footprint          = loose_footprint,
     834             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     835             :   .populate_allowed_fds     = populate_allowed_fds,
     836             :   .scratch_align            = scratch_align,
     837             :   .scratch_footprint        = scratch_footprint,
     838             :   .privileged_init          = privileged_init,
     839             :   .unprivileged_init        = unprivileged_init,
     840             :   .run                      = stem_run,
     841             : };

Generated by: LCOV version 1.14