LCOV - code coverage report
Current view: top level - discof/shredcap - fd_shredcap_tile.c (source / functions)
Test: cov.lcov    Date: 2025-12-07 04:58:33
Coverage:    Lines: 0 / 541 (0.0 %)    Functions: 0 / 25 (0.0 %)

          Line data    Source code
       1             : #define _GNU_SOURCE  /* Enable GNU and POSIX extensions */
       2             : #include "../../disco/topo/fd_topo.h"
       3             : #include "../../disco/net/fd_net_tile.h"
       4             : #include "../../flamenco/types/fd_types.h"
       5             : #include "../../flamenco/fd_flamenco_base.h"
       6             : #include "../../util/pod/fd_pod_format.h"
       7             : #include "../../flamenco/gossip/fd_gossip_types.h"
       8             : #include "../../disco/fd_disco.h"
       9             : #include "../../discof/fd_discof.h"
      10             : #include "../../discof/repair/fd_repair.h"
      11             : #include "../../discof/replay/fd_replay_tile.h"
      12             : #include "../../discof/replay/fd_exec.h"
      13             : #include "../../discof/restore/utils/fd_ssmsg.h"
      14             : #include "../../discof/restore/utils/fd_ssmanifest_parser.h"
      15             : #include "../../flamenco/runtime/sysvar/fd_sysvar_epoch_schedule.h"
      16             : #include "../../disco/fd_disco.h"
      17             : #include "../../util/pod/fd_pod_format.h"
      18             : 
      19             : #include <errno.h>
      20             : #include <fcntl.h>
      21             : #include <sys/mman.h>
      22             : #include <sys/stat.h>
      23             : #include <string.h>
      24             : #include <stdio.h>
      25             : #include <unistd.h>
      26             : #include <sys/socket.h>
      27             : #include "generated/fd_shredcap_tile_seccomp.h"
      28             : 
      29             : 
      30             : /* This tile currently has two functionalities.
      31             : 
       32             :    The first is spying on the net_shred, repair_net, and shred_out
       33             :    links and writing what it observes to csv files that can be used to
       34             :    analyze repair performance in post.
       35             : 
       36             :    The second is to capture the bank hashes from the replay tile and
       37             :    slices of shreds from the repair tile.  These are written to binary
       38             :    files that can be used to reproduce a live replay execution. */
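                      : 
                      : /* For reference, the capture artifacts written under the configured
                      :    folder path (as opened in privileged_init and filled by the frag
                      :    handlers below) are:
                      :      shred_data.csv    - shreds snooped from net_shred
                      :      request_data.csv  - outgoing repair requests from repair_net
                      :      fec_complete.csv  - FEC complete messages from shred_out
                      :      peers.csv         - TVU/repair contact info from gossip_out
                      :      slices.bin        - shred slices from repair_scap
                      :      bank_hashes.bin   - bank hashes from replay_out */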
      39             : 
      40           0 : #define FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ  (4096UL)  /* local filesystem block size */
      41           0 : #define FD_SHREDCAP_ALLOC_TAG              (4UL)
      42             : #define MAX_BUFFER_SIZE                    (20000UL * sizeof(fd_shred_dest_wire_t))
      43           0 : #define MANIFEST_MAX_TOTAL_BANKS           (2UL) /* the minimum is 2 */
      44           0 : #define MANIFEST_MAX_FORK_WIDTH            (1UL) /* banks are only needed during publish_stake_weights() */
      45             : 
      46           0 : #define NET_SHRED        (0UL)
      47           0 : #define REPAIR_NET       (1UL)
      48           0 : #define SHRED_OUT        (2UL)
      49           0 : #define GOSSIP_OUT       (3UL)
      50           0 : #define REPAIR_SHREDCAP  (4UL)
      51           0 : #define REPLAY_OUT       (5UL)
      52             : 
      53             : typedef union {
      54             :   struct {
      55             :     fd_wksp_t * mem;
      56             :     ulong       chunk0;
      57             :     ulong       wmark;
      58             :   };
      59             :   fd_net_rx_bounds_t net_rx;
      60             : } fd_capture_in_ctx_t;
      61             : 
      62             : struct out_link {
      63             :   ulong       idx;
      64             :   fd_frag_meta_t * mcache;
      65             :   ulong *          sync;
      66             :   ulong            depth;
      67             :   ulong            seq;
      68             :   fd_wksp_t * mem;
      69             :   ulong       chunk0;
      70             :   ulong       wmark;
      71             :   ulong       chunk;
      72             : };
      73             : typedef struct out_link out_link_t;
      74             : 
      75             : struct fd_capture_tile_ctx {
      76             :   uchar               in_kind[ 32 ];
      77             :   fd_capture_in_ctx_t in_links[ 32 ];
      78             : 
      79             :   int skip_frag;
      80             :   ushort repair_intake_listen_port;
      81             : 
      82             :   ulong shred_buffer_sz;
      83             :   uchar shred_buffer[ FD_NET_MTU ];
      84             : 
      85             :   ulong repair_buffer_sz;
      86             :   uchar repair_buffer[ FD_NET_MTU ];
      87             : 
      88             :   out_link_t           stake_out[1];
      89             :   out_link_t           snap_out[1];
      90             :   int                  enable_publish_stake_weights;
      91             :   ulong *              manifest_wmark;
      92             :   uchar *              manifest_bank_mem;
      93             :   fd_banks_t *         banks;
      94             :   fd_bank_t *          bank;
      95             :   char                 manifest_path[ PATH_MAX ];
      96             :   int                  manifest_load_done;
      97             :   uchar *              manifest_spad_mem;
      98             :   fd_spad_t *          manifest_spad;
      99             :   uchar *              shared_spad_mem;
     100             :   fd_spad_t *          shared_spad;
     101             : 
     102             :   fd_ip4_udp_hdrs_t intake_hdr[1];
     103             : 
     104             :   ulong now;
     105             :   ulong  last_packet_ns;
     106             :   double tick_per_ns;
     107             : 
     108             :   fd_io_buffered_ostream_t shred_ostream;
     109             :   fd_io_buffered_ostream_t repair_ostream;
     110             :   fd_io_buffered_ostream_t fecs_ostream;
     111             :   fd_io_buffered_ostream_t peers_ostream;
     112             :   fd_io_buffered_ostream_t slices_ostream;
     113             :   fd_io_buffered_ostream_t bank_hashes_ostream;
     114             : 
     115             :   int shreds_fd; /* shreds snooped from net_shred */
     116             :   int requests_fd;
     117             :   int fecs_fd;
     118             :   int peers_fd;
     119             :   int slices_fd; /* all shreds in slices from repair tile */
     120             :   int bank_hashes_fd; /* bank hashes from replay tile */
     121             : 
     122             :   ulong write_buf_sz;
     123             : 
     124             :   uchar * shreds_buf;
     125             :   uchar * requests_buf;
     126             :   uchar * fecs_buf;
     127             :   uchar * peers_buf;
     128             :   uchar * slices_buf;
     129             :   uchar * bank_hashes_buf;
     130             : 
     131             :   fd_alloc_t * alloc;
     132             :   uchar contact_info_buffer[ MAX_BUFFER_SIZE ];
     133             : };
     134             : typedef struct fd_capture_tile_ctx fd_capture_tile_ctx_t;
     135             : 
     136             : FD_FN_CONST static inline ulong
     137           0 : scratch_align( void ) {
     138           0 :   return 4096UL;
     139           0 : }
     140             : 
     141             : FD_FN_CONST static inline ulong
     142           0 : manifest_bank_align( void ) {
     143           0 :   return fd_banks_align();
     144           0 : }
     145             : 
     146             : FD_FN_CONST static inline ulong
     147           0 : manifest_bank_footprint( void ) {
     148           0 :   return fd_banks_footprint( MANIFEST_MAX_TOTAL_BANKS, MANIFEST_MAX_FORK_WIDTH );
     149           0 : }
     150             : 
     151             : FD_FN_CONST static inline ulong
     152           0 : manifest_load_align( void ) {
     153           0 :   return 128UL;
     154           0 : }
     155             : 
     156             : FD_FN_CONST static inline ulong
     157           0 : manifest_load_footprint( void ) {
       158             :   /* A manifest typically requires about 1GB, but sizes closer to 2GB
       159             :      have been observed on mainnet.  The footprint is therefore
       160             :      set to 2GB.  TODO a future adjustment may be needed. */
     161           0 :   return 2UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
     162           0 : }
     163             : 
     164             : FD_FN_CONST static inline ulong
     165           0 : manifest_spad_max_alloc_align( void ) {
     166           0 :   return FD_SPAD_ALIGN;
     167           0 : }
     168             : 
     169             : FD_FN_CONST static inline ulong
     170           0 : manifest_spad_max_alloc_footprint( void ) {
       171             :   /* The amount of memory required in the manifest load
       172             :      scratchpad to process the manifest tends to be slightly
       173             :      larger than the manifest load footprint. */
     174           0 :   return manifest_load_footprint() + 128UL * FD_SHMEM_HUGE_PAGE_SZ;
     175           0 : }
     176             : 
     177             : FD_FN_CONST static inline ulong
     178           0 : shared_spad_max_alloc_align( void ) {
     179           0 :   return FD_SPAD_ALIGN;
     180           0 : }
     181             : 
     182             : FD_FN_CONST static inline ulong
     183           0 : shared_spad_max_alloc_footprint( void ) {
       184             :   /* The shared scratchpad is used by the manifest banks
       185             :      and by the manifest load (but not at the same time).
       186             :      Its footprint therefore needs to cover at least the
       187             :      banks footprint (at least for the current setup with
       188             :      MANIFEST_MAX_TOTAL_BANKS==2). */
     189           0 :   return fd_ulong_max( manifest_bank_footprint(), manifest_load_footprint() );
     190           0 : }
     191             : 
     192             : FD_FN_PURE static inline ulong
     193           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     194           0 :   ulong footprint = sizeof(fd_capture_tile_ctx_t)
     195           0 :                     + manifest_bank_footprint()
     196           0 :                     + fd_spad_footprint( manifest_spad_max_alloc_footprint() )
     197           0 :                     + fd_spad_footprint( shared_spad_max_alloc_footprint() )
     198           0 :                     + fd_alloc_footprint();
     199           0 :   return fd_ulong_align_up( footprint, FD_SHMEM_GIGANTIC_PAGE_SZ );
     200           0 : }
     201             : 
     202             : 
     203             : static ulong
     204             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
     205             :                           fd_topo_tile_t const * tile,
     206             :                           ulong                  out_cnt,
     207           0 :                           struct sock_filter *   out ) {
     208           0 :   populate_sock_filter_policy_fd_shredcap_tile( out_cnt,
     209           0 :                                              out,
     210           0 :                                              (uint)fd_log_private_logfile_fd(),
     211           0 :                                              (uint)tile->shredcap.shreds_fd,
     212           0 :                                              (uint)tile->shredcap.requests_fd,
     213           0 :                                              (uint)tile->shredcap.fecs_fd,
     214           0 :                                              (uint)tile->shredcap.peers_fd );
     215           0 :   return sock_filter_policy_fd_shredcap_tile_instr_cnt;
     216           0 : }
     217             : 
     218             : FD_FN_PURE static inline ulong
     219           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     220           0 :   (void)tile;
     221           0 :   ulong l = FD_LAYOUT_INIT;
     222           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_capture_tile_ctx_t),  sizeof(fd_capture_tile_ctx_t) );
     223           0 :   l = FD_LAYOUT_APPEND( l, manifest_bank_align(),           manifest_bank_footprint() );
     224           0 :   l = FD_LAYOUT_APPEND( l, manifest_spad_max_alloc_align(), fd_spad_footprint( manifest_spad_max_alloc_footprint() ) );
     225           0 :   l = FD_LAYOUT_APPEND( l, shared_spad_max_alloc_align(),   fd_spad_footprint( shared_spad_max_alloc_footprint() ) );
     226           0 :   l = FD_LAYOUT_APPEND( l, fd_alloc_align(),                fd_alloc_footprint() );
     227           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     228           0 : }
     229             : 
     230             : static inline ulong
     231             : generate_stake_weight_msg_manifest( ulong                                       epoch,
     232             :                                     fd_epoch_schedule_t const *                 epoch_schedule,
     233             :                                     fd_snapshot_manifest_epoch_stakes_t const * epoch_stakes,
     234           0 :                                     ulong *                                     stake_weight_msg_out ) {
     235           0 :   fd_stake_weight_msg_t *  stake_weight_msg = (fd_stake_weight_msg_t *)fd_type_pun( stake_weight_msg_out );
     236           0 :   fd_vote_stake_weight_t * stake_weights    = stake_weight_msg->weights;
     237             : 
     238           0 :   stake_weight_msg->epoch             = epoch;
     239           0 :   stake_weight_msg->staked_cnt        = epoch_stakes->vote_stakes_len;
     240           0 :   stake_weight_msg->start_slot        = fd_epoch_slot0( epoch_schedule, epoch );
     241           0 :   stake_weight_msg->slot_cnt          = epoch_schedule->slots_per_epoch;
     242           0 :   stake_weight_msg->excluded_stake    = 0UL;
     243           0 :   stake_weight_msg->vote_keyed_lsched = 1UL;
     244             : 
     245             :   /* FIXME: SIMD-0180 - hack to (de)activate in testnet vs mainnet.
     246             :      This code can be removed once the feature is active. */
     247           0 :   {
     248           0 :     if(    ( 1==epoch_schedule->warmup && epoch<FD_SIMD0180_ACTIVE_EPOCH_TESTNET )
     249           0 :         || ( 0==epoch_schedule->warmup && epoch<FD_SIMD0180_ACTIVE_EPOCH_MAINNET ) ) {
     250           0 :       stake_weight_msg->vote_keyed_lsched = 0UL;
     251           0 :     }
     252           0 :   }
     253             : 
     254             :   /* epoch_stakes from manifest are already filtered (stake>0), but not sorted */
     255           0 :   for( ulong i=0UL; i<epoch_stakes->vote_stakes_len; i++ ) {
     256           0 :     stake_weights[ i ].stake = epoch_stakes->vote_stakes[ i ].stake;
     257           0 :     memcpy( stake_weights[ i ].id_key.uc, epoch_stakes->vote_stakes[ i ].identity, sizeof(fd_pubkey_t) );
     258           0 :     memcpy( stake_weights[ i ].vote_key.uc, epoch_stakes->vote_stakes[ i ].vote, sizeof(fd_pubkey_t) );
     259           0 :   }
     260           0 :   sort_vote_weights_by_stake_vote_inplace( stake_weights, epoch_stakes->vote_stakes_len);
     261             : 
     262           0 :   return fd_stake_weight_msg_sz( epoch_stakes->vote_stakes_len );
     263           0 : }
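                      : 
                      : /* A sketch of the message laid out above: epoch, staked_cnt,
                      :    start_slot and slot_cnt lead the header (this is the order implied
                      :    by the ulong-indexed FD_LOG_NOTICE in publish_stake_weights_manifest
                      :    below), followed by excluded_stake, vote_keyed_lsched and then
                      :    staked_cnt fd_vote_stake_weight_t entries (stake, identity key,
                      :    vote key) sorted via sort_vote_weights_by_stake_vote_inplace. */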
     264             : 
     265             : static void
     266             : publish_stake_weights_manifest( fd_capture_tile_ctx_t * ctx,
     267             :                                 fd_stem_context_t *    stem,
     268           0 :                                 fd_snapshot_manifest_t const * manifest ) {
     269           0 :   fd_epoch_schedule_t const * schedule = fd_type_pun_const( &manifest->epoch_schedule_params );
     270           0 :   ulong epoch = fd_slot_to_epoch( schedule, manifest->slot, NULL );
     271             : 
     272             :   /* current epoch */
     273           0 :   ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
     274           0 :   ulong stake_weights_sz = generate_stake_weight_msg_manifest( epoch, schedule, &manifest->epoch_stakes[0], stake_weights_msg );
     275           0 :   ulong stake_weights_sig = 4UL;
     276           0 :   fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
     277           0 :   ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
     278           0 :   FD_LOG_NOTICE(("sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
     279             : 
       280             :   /* next epoch */
     281           0 :   stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
     282           0 :   stake_weights_sz = generate_stake_weight_msg_manifest( epoch + 1, schedule, &manifest->epoch_stakes[1], stake_weights_msg );
     283           0 :   stake_weights_sig = 4UL;
     284           0 :   fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
     285           0 :   ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
     286           0 :   FD_LOG_NOTICE(("sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
     287           0 : }
     288             : 
     289             : static inline int
     290             : before_frag( fd_capture_tile_ctx_t * ctx,
     291             :              ulong            in_idx,
     292             :              ulong            seq FD_PARAM_UNUSED,
     293           0 :              ulong            sig ) {
     294           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==NET_SHRED ) ) {
     295           0 :     return (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
     296           0 :   } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==GOSSIP_OUT)) {
     297           0 :     return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO;
     298           0 :   }
     299           0 :   return 0;
     300           0 : }
     301             : 
     302             : static inline void
     303             : handle_new_contact_info( fd_capture_tile_ctx_t * ctx,
     304           0 :                          uchar const *           buf ) {
     305           0 :   fd_gossip_update_message_t const * msg = (fd_gossip_update_message_t const *)fd_type_pun_const( buf );
     306           0 :   char tvu_buf[1024];
     307           0 :   char repair_buf[1024];
     308           0 :   fd_ip4_port_t tvu    = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_TVU ];
     309           0 :   fd_ip4_port_t repair = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_SERVE_REPAIR ];
     310             : 
     311           0 :   if( FD_UNLIKELY( tvu.l!=0UL ) ){
     312           0 :     snprintf( tvu_buf, sizeof(tvu_buf),
     313           0 :               "%u,%u(tvu),%s,%d\n",
     314           0 :               tvu.addr, tvu.port, FD_BASE58_ENC_32_ALLOCA(msg->contact_info.contact_info->pubkey.uc), 1);
     315           0 :     int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, tvu_buf, strlen(tvu_buf) );
     316           0 :     FD_TEST( err==0 );
     317           0 :   }
     318           0 :   if( FD_UNLIKELY( repair.l!=0UL ) ){
     319           0 :     snprintf( repair_buf, sizeof(repair_buf),
     320           0 :               "%u,%u(repair),%s,%d\n",
     321           0 :               repair.addr, repair.port, FD_BASE58_ENC_32_ALLOCA(msg->contact_info.contact_info->pubkey.uc), 1);
     322           0 :     int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, repair_buf, strlen(repair_buf) );
     323           0 :     FD_TEST( err==0 );
     324           0 :   }
     325           0 : }
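                      : 
                      : /* Example peers.csv rows as written above (hypothetical values; the
                      :    ip4 address and port are printed as raw unsigned integers by the
                      :    "%u" conversions):
                      :      3232235521,8000(tvu),<base58 pubkey>,1
                      :      3232235521,8008(repair),<base58 pubkey>,1 */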
     326             : 
     327             : static int
     328           0 : is_fec_completes_msg( ulong sz ) {
     329           0 :   return sz == FD_SHRED_DATA_HEADER_SZ + 2 * FD_SHRED_MERKLE_ROOT_SZ;
     330           0 : }
     331             : 
     332             : static inline void
     333             : during_frag( fd_capture_tile_ctx_t * ctx,
     334             :              ulong                   in_idx,
     335             :              ulong                   seq FD_PARAM_UNUSED,
     336             :              ulong                   sig,
     337             :              ulong                   chunk,
     338             :              ulong                   sz,
     339           0 :              ulong                   ctl ) {
     340           0 :   ctx->skip_frag = 0;
     341           0 :   if( ctx->in_kind[ in_idx ]==SHRED_OUT ) {
     342           0 :     if( !is_fec_completes_msg( sz ) ) {
     343           0 :       ctx->skip_frag = 1;
     344           0 :       return;
     345           0 :     }
     346           0 :     fd_memcpy( ctx->shred_buffer, fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk ), sz );
     347           0 :     ctx->shred_buffer_sz = sz;
     348           0 :   } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
     349           0 :     uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in_links[ in_idx ].net_rx, chunk, ctl, sz );
     350           0 :     ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
     351           0 :     FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
     352           0 :     fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
     353           0 :     if( FD_UNLIKELY( !shred ) ) {
     354           0 :       ctx->skip_frag = 1;
     355           0 :       return;
     356           0 :     };
     357           0 :     fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
     358           0 :     ctx->shred_buffer_sz = sz-hdr_sz;
     359           0 :   } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
       360             :     /* Repair will have outgoing pings, outgoing repair requests, and
       361             :        outgoing served shreds; we want to filter out everything but the
       362             :        repair requests.
       363             :        1. We can index into the ip4/udp packet hdr and check if the src
       364             :           port is the intake listen port or the serve port.
       365             :        2. Then we can filter on the discriminant, which luckily does not
       366             :           require decoding! */
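                      : 
                      :     /* Illustrative layout of an outgoing repair packet, as assumed by
                      :        the checks below (IPv4 without options):
                      :          [ fd_ip4_udp_hdrs_t ][ uint discriminant ][ request body ... ]
                      :        Discriminants <= FD_REPAIR_KIND_PONG (ping traffic) are skipped;
                      :        anything above is treated as a repair request and captured. */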
     367             : 
     368           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     369           0 :     fd_ip4_udp_hdrs_t const * hdr = (fd_ip4_udp_hdrs_t const *)dcache_entry;
     370           0 :     if( hdr->udp->net_sport != fd_ushort_bswap( ctx->repair_intake_listen_port ) ) {
     371           0 :       ctx->skip_frag = 1;
     372           0 :       return;
     373           0 :     }
     374           0 :     const uchar * encoded_protocol = dcache_entry + sizeof(fd_ip4_udp_hdrs_t);
     375           0 :     uint discriminant = FD_LOAD(uint, encoded_protocol);
     376             : 
     377           0 :     if( FD_UNLIKELY( discriminant <= FD_REPAIR_KIND_PONG ) ) {
     378           0 :       ctx->skip_frag = 1;
     379           0 :       return;
     380           0 :     }
     381           0 :     fd_memcpy( ctx->repair_buffer, dcache_entry, sz );
     382           0 :     ctx->repair_buffer_sz = sz;
     383           0 :   } else if( ctx->in_kind[ in_idx ] == REPAIR_SHREDCAP ) {
     384             : 
     385           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     386             : 
     387             :     /* FIXME this should all be happening in after_frag */
     388             : 
     389             :     /* We expect to get all of the data shreds in a batch at once.  When
     390             :        we do we will write the header, the shreds, and a trailer. */
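                      : 
                      :     /* Resulting slices.bin record layout (one record per received
                      :        frag):
                      :          [ fd_shredcap_slice_header_msg_t  ]  magic, version, payload_sz
                      :          [ payload_sz bytes of shred data   ]
                      :          [ fd_shredcap_slice_trailer_msg_t ]  magic, version */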
     391           0 :     ulong payload_sz = sig;
     392           0 :     fd_shredcap_slice_header_msg_t header = {
     393           0 :       .magic      = FD_SHREDCAP_SLICE_HEADER_MAGIC,
     394           0 :       .version    = FD_SHREDCAP_SLICE_HEADER_V1,
     395           0 :       .payload_sz = payload_sz,
     396           0 :     };
     397           0 :     int err;
     398           0 :     err = fd_io_buffered_ostream_write( &ctx->slices_ostream, &header, FD_SHREDCAP_SLICE_HEADER_FOOTPRINT );
     399           0 :     if( FD_UNLIKELY( err != 0 ) ) {
     400           0 :       FD_LOG_CRIT(( "failed to write slice header %d", err ));
     401           0 :     }
     402           0 :     err = fd_io_buffered_ostream_write( &ctx->slices_ostream, dcache_entry, payload_sz );
     403           0 :     if( FD_UNLIKELY( err != 0 ) ) {
     404           0 :       FD_LOG_CRIT(( "failed to write slice data %d", err ));
     405           0 :     }
     406           0 :     fd_shredcap_slice_trailer_msg_t trailer = {
     407           0 :       .magic   = FD_SHREDCAP_SLICE_TRAILER_MAGIC,
     408           0 :       .version = FD_SHREDCAP_SLICE_TRAILER_V1,
     409           0 :     };
     410           0 :     err = fd_io_buffered_ostream_write( &ctx->slices_ostream, &trailer, FD_SHREDCAP_SLICE_TRAILER_FOOTPRINT );
     411           0 :     if( FD_UNLIKELY( err != 0 ) ) {
     412           0 :       FD_LOG_CRIT(( "failed to write slice trailer %d", err ));
     413           0 :     }
     414             : 
     415           0 :   } else if( ctx->in_kind[ in_idx ] == REPLAY_OUT ) {
     416           0 :     if( FD_UNLIKELY( sig!=REPLAY_SIG_SLOT_COMPLETED ) ) return;
     417             : 
     418             :     /* FIXME this should all be happening in after_frag */
     419             : 
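                      :    /* Each REPLAY_SIG_SLOT_COMPLETED frag appends one
                      :       fd_shredcap_bank_hash_msg_t record (containing magic, version,
                      :       bank_hash and slot) to bank_hashes.bin via the ostream below. */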
     420           0 :    fd_replay_slot_completed_t const * msg = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     421           0 :    fd_shredcap_bank_hash_msg_t bank_hash_msg = {
     422           0 :      .magic   = FD_SHREDCAP_BANK_HASH_MAGIC,
     423           0 :      .version = FD_SHREDCAP_BANK_HASH_V1
     424           0 :    };
     425           0 :    fd_memcpy( &bank_hash_msg.bank_hash, msg->bank_hash.uc, sizeof(fd_hash_t) );
     426           0 :    bank_hash_msg.slot = msg->slot;
     427             : 
     428           0 :    fd_io_buffered_ostream_write( &ctx->bank_hashes_ostream, &bank_hash_msg, FD_SHREDCAP_BANK_HASH_FOOTPRINT );
     429             : 
     430           0 :   } else {
     431             :     // contact infos can be copied into a buffer
     432           0 :     if( FD_UNLIKELY( chunk<ctx->in_links[ in_idx ].chunk0 || chunk>ctx->in_links[ in_idx ].wmark ) ) {
     433           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     434           0 :                    ctx->in_links[ in_idx ].chunk0, ctx->in_links[ in_idx ].wmark ));
     435           0 :     }
     436           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     437           0 :     fd_memcpy( ctx->contact_info_buffer, dcache_entry, sz );
     438           0 :   }
     439           0 : }
     440             : 
     441             : static void
     442             : after_credit( fd_capture_tile_ctx_t * ctx,
     443             :               fd_stem_context_t *     stem,
     444             :               int *                   opt_poll_in FD_PARAM_UNUSED,
     445           0 :               int *                   charge_busy FD_PARAM_UNUSED ) {
     446             : 
     447           0 :   if( FD_UNLIKELY( !ctx->manifest_load_done ) ) {
     448           0 :     if( FD_LIKELY( !!strcmp( ctx->manifest_path, "") ) ) {
     449             :       /* ctx->manifest_spad will hold the processed manifest. */
     450           0 :       fd_spad_reset( ctx->manifest_spad );
       451             :       /* Do not pop from ctx->manifest_spad; the manifest needs
       452             :          to remain available until a new manifest is processed. */
     453             : 
     454           0 :       int fd = open( ctx->manifest_path, O_RDONLY );
     455           0 :       if( FD_UNLIKELY( fd < 0 ) ) {
     456           0 :         FD_LOG_WARNING(( "open(%s) failed (%d-%s)", ctx->manifest_path, errno, fd_io_strerror( errno ) ));
     457           0 :         return;
     458           0 :       }
       459           0 :       FD_LOG_NOTICE(( "loading manifest %s", ctx->manifest_path ));
     460             : 
     461           0 :       fd_snapshot_manifest_t * manifest = NULL;
     462           0 :       FD_SPAD_FRAME_BEGIN( ctx->manifest_spad ) {
     463           0 :         manifest = fd_spad_alloc( ctx->manifest_spad, alignof(fd_snapshot_manifest_t), sizeof(fd_snapshot_manifest_t) );
     464           0 :       } FD_SPAD_FRAME_END;
     465           0 :       FD_TEST( manifest );
     466             : 
     467           0 :       FD_SPAD_FRAME_BEGIN( ctx->shared_spad ) {
     468           0 :         uchar * buf    = fd_spad_alloc( ctx->shared_spad, manifest_load_align(), manifest_load_footprint() );
     469           0 :         ulong   buf_sz = 0;
     470           0 :         FD_TEST( !fd_io_read( fd, buf/*dst*/, 0/*dst_min*/, manifest_load_footprint()-1UL /*dst_max*/, &buf_sz ) );
     471             : 
     472           0 :         fd_ssmanifest_parser_t * parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( aligned_alloc(
     473           0 :                 fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() ) ) );
     474           0 :         FD_TEST( parser );
     475           0 :         fd_ssmanifest_parser_init( parser, manifest );
     476           0 :         int parser_err = fd_ssmanifest_parser_consume( parser, buf, buf_sz, NULL, NULL );
     477           0 :         FD_TEST( parser_err==1 );
     478             :         // if( FD_UNLIKELY( parser_err ) ) FD_LOG_ERR(( "fd_ssmanifest_parser_consume failed (%d)", parser_err ));
     479           0 :       } FD_SPAD_FRAME_END;
     480           0 :       FD_LOG_NOTICE(( "manifest bank slot %lu", manifest->slot ));
     481             : 
     482           0 :       fd_fseq_update( ctx->manifest_wmark, manifest->slot );
     483             : 
     484           0 :       uchar * chunk = fd_chunk_to_laddr( ctx->snap_out->mem, ctx->snap_out->chunk );
     485           0 :       ulong   sz    = sizeof(fd_snapshot_manifest_t);
     486           0 :       ulong   sig   = fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
     487           0 :       memcpy( chunk, manifest, sz );
     488           0 :       fd_stem_publish( stem, ctx->snap_out->idx, sig, ctx->snap_out->chunk, sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
     489           0 :       ctx->snap_out->chunk = fd_dcache_compact_next( ctx->snap_out->chunk, sz, ctx->snap_out->chunk0, ctx->snap_out->wmark );
     490             : 
     491           0 :       fd_stem_publish( stem, ctx->snap_out->idx, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
     492             : 
     493           0 :       publish_stake_weights_manifest( ctx, stem, manifest );
     494             :       //*charge_busy = 0;
     495           0 :     }
     496             :     /* No need to strcmp every time after_credit is called. */
     497           0 :     ctx->manifest_load_done = 1;
     498           0 :   }
     499           0 : }
     500             : 
     501             : static void
     502           0 : handle_repair_request( fd_capture_tile_ctx_t * ctx ) {
       503             :   /* We have a valid repair request that we can finally decode.
       504             :      Unfortunately we actually have to decode because we can't cast
       505             :      directly to the protocol struct. */
     506           0 :   fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->repair_buffer;
     507             : 
     508           0 :   uint   peer_ip4_addr = hdr->ip4->daddr;
     509           0 :   ushort peer_port     = hdr->udp->net_dport;
     510           0 :   ulong  slot          = 0UL;
     511           0 :   ulong  shred_index   = UINT_MAX;
     512           0 :   uint   nonce         = 0U;
     513             : 
     514             :   /* FIXME this assumes IPv4 without options, which is not always true */
     515           0 :   uchar const * const buf0 = ctx->repair_buffer + sizeof(fd_ip4_udp_hdrs_t);
     516           0 :   uchar const * const buf1 = ctx->repair_buffer + ctx->repair_buffer_sz;
     517           0 :   if( FD_UNLIKELY( buf0>=buf1 ) ) return;
     518             : 
     519           0 :   uchar const * cur = buf0;
     520           0 :   ulong         rem = (ulong)( buf1-buf0 );
     521             : 
     522           0 :   if( FD_UNLIKELY( rem<sizeof(uint) ) ) return;
     523           0 :   uint discriminant = FD_LOAD( uint, cur );
     524           0 :   cur += sizeof(uint);
     525           0 :   rem -= sizeof(uint);
     526             : 
     527           0 :   switch( discriminant ) {
     528           0 :   case FD_REPAIR_KIND_SHRED: {
     529           0 :     if( FD_UNLIKELY( rem<sizeof(fd_repair_shred_req_t) ) ) return;
     530           0 :     fd_repair_shred_req_t req = FD_LOAD( fd_repair_shred_req_t, cur );
     531           0 :     cur += sizeof(fd_repair_shred_req_t);
     532           0 :     rem -= sizeof(fd_repair_shred_req_t);
     533             : 
     534           0 :     slot        = req.slot;
     535           0 :     shred_index = req.shred_idx;
     536           0 :     nonce       = req.nonce;
     537           0 :     break;
     538           0 :   }
     539           0 :   case FD_REPAIR_KIND_HIGHEST_SHRED: {
     540           0 :     if( FD_UNLIKELY( rem<sizeof(fd_repair_highest_shred_req_t) ) ) return;
     541           0 :     fd_repair_highest_shred_req_t req = FD_LOAD( fd_repair_highest_shred_req_t, cur );
     542           0 :     cur += sizeof(fd_repair_highest_shred_req_t);
     543           0 :     rem -= sizeof(fd_repair_highest_shred_req_t);
     544             : 
     545           0 :     slot        = req.slot;
     546           0 :     shred_index = req.shred_idx;
     547           0 :     nonce       = req.nonce;
     548           0 :     break;
     549           0 :   }
     550           0 :   case FD_REPAIR_KIND_ORPHAN: {
     551           0 :     if( FD_UNLIKELY( rem<sizeof(fd_repair_orphan_req_t) ) ) return;
     552           0 :     fd_repair_orphan_req_t req = FD_LOAD( fd_repair_orphan_req_t, cur );
     553           0 :     cur += sizeof(fd_repair_orphan_req_t);
     554           0 :     rem -= sizeof(fd_repair_orphan_req_t);
     555             : 
     556           0 :     slot  = req.slot;
     557           0 :     nonce = req.nonce;
     558           0 :     break;
     559           0 :   }
     560           0 :   default:
     561           0 :     break;
     562           0 :   }
     563             : 
     564           0 :   char repair_data_buf[1024];
     565           0 :   snprintf( repair_data_buf, sizeof(repair_data_buf),
     566           0 :             "%u,%u,%ld,%u,%lu,%lu\n",
     567           0 :             peer_ip4_addr, peer_port, fd_log_wallclock(), nonce, slot, shred_index );
     568           0 :   int err = fd_io_buffered_ostream_write( &ctx->repair_ostream, repair_data_buf, strlen(repair_data_buf) );
     569           0 :   FD_TEST( err==0 );
     570           0 : }
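                      : 
                      : /* Example request_data.csv row as written above (hypothetical
                      :    values; ip4 address and port are printed as raw unsigned
                      :    integers):
                      :      3232235521,8008,1733545200000000000,42,352114001,1250
                      :    i.e. dst_ip,dst_port,timestamp,nonce,slot,idx. */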
     571             : 
     572             : static inline void
     573             : after_frag( fd_capture_tile_ctx_t * ctx,
     574             :             ulong                   in_idx,
     575             :             ulong                   seq    FD_PARAM_UNUSED,
     576             :             ulong                   sig,
     577             :             ulong                   sz,
     578             :             ulong                   tsorig FD_PARAM_UNUSED,
     579             :             ulong                   tspub  FD_PARAM_UNUSED,
     580           0 :             fd_stem_context_t *     stem   FD_PARAM_UNUSED ) {
     581           0 :   if( FD_UNLIKELY( ctx->skip_frag ) ) return;
     582             : 
     583           0 :   if( ctx->in_kind[ in_idx ] == SHRED_OUT ) {
       584             :     /* This is a FEC completes message!  We can use it to check how
       585             :        long it takes to complete a FEC set. */
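                      : 
                      :     /* Example fec_complete.csv row as written below (hypothetical
                      :        values):
                      :          1733545200000000000,352114001,12,32,31
                      :        i.e. timestamp,slot,ref_tick,fec_set_idx,data_cnt. */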
     586             : 
     587           0 :     fd_shred_t const * shred = (fd_shred_t *)fd_type_pun( ctx->shred_buffer );
     588           0 :     uint data_cnt = fd_disco_shred_out_fec_sig_data_cnt( sig );
     589           0 :     uint ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
     590           0 :     char fec_complete[1024];
     591           0 :     snprintf( fec_complete, sizeof(fec_complete),
     592           0 :              "%ld,%lu,%u,%u,%u\n",
     593           0 :               fd_log_wallclock(), shred->slot, ref_tick, shred->fec_set_idx, data_cnt );
     594             : 
     595             :     // Last shred is guaranteed to be a data shred
     596             : 
     597             : 
     598           0 :     int err = fd_io_buffered_ostream_write( &ctx->fecs_ostream, fec_complete, strlen(fec_complete) );
     599           0 :     FD_TEST( err==0 );
     600           0 :   } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
       601             :     /* TODO: the leader schedule early-exits in the shred tile right
       602             :        around startup, which discards some turbine shreds, but there is
       603             :        a chance we capture such a shred here.  Currently handled in
       604             :        post, but in the future we will want to get the leader schedule
       605             :        here so we can also benchmark whether the expected sender in the
       606             :        turbine tree matches the actual sender. */
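                      : 
                      :     /* Example shred_data.csv row as written below (hypothetical
                      :        values):
                      :          3232235521,8001,1733545200000000000,352114001,12,32,35,1,1,0
                      :        i.e. src_ip,src_port,timestamp,slot,ref_tick,fec_set_idx,idx,
                      :        is_turbine,is_data,nonce. */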
     607             : 
     608           0 :     ulong hdr_sz     = fd_disco_netmux_sig_hdr_sz( sig );
     609           0 :     fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->shred_buffer;
     610           0 :     uint src_ip4_addr = hdr->ip4->saddr;
     611           0 :     ushort src_port   = hdr->udp->net_sport;
     612             : 
     613           0 :     fd_shred_t const * shred = fd_shred_parse( ctx->shred_buffer + hdr_sz, sz - hdr_sz );
     614           0 :     int   is_turbine = fd_disco_netmux_sig_proto( sig ) == DST_PROTO_SHRED;
     615           0 :     uint  nonce      = is_turbine ? 0 : FD_LOAD(uint, ctx->shred_buffer + hdr_sz + fd_shred_sz( shred ) );
     616           0 :     int   is_data    = fd_shred_is_data( fd_shred_type( shred->variant ) );
     617           0 :     ulong slot       = shred->slot;
     618           0 :     uint  idx        = shred->idx;
     619           0 :     uint  fec_idx    = shred->fec_set_idx;
     620           0 :     uint  ref_tick   = 65;
     621           0 :     if( FD_UNLIKELY( is_turbine && is_data ) ) {
     622             :       /* We can then index into the flag and get a REFTICK */
     623           0 :       ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
     624           0 :     }
     625             : 
     626           0 :     char repair_data_buf[1024];
     627           0 :     snprintf( repair_data_buf, sizeof(repair_data_buf),
     628           0 :              "%u,%u,%ld,%lu,%u,%u,%u,%d,%d,%u\n",
     629           0 :               src_ip4_addr, src_port, fd_log_wallclock(), slot, ref_tick, fec_idx, idx, is_turbine, is_data, nonce );
     630             : 
     631           0 :     int err = fd_io_buffered_ostream_write( &ctx->shred_ostream, repair_data_buf, strlen(repair_data_buf) );
     632           0 :     FD_TEST( err==0 );
     633           0 :   } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
     634           0 :     handle_repair_request( ctx );
     635           0 :   } else if( ctx->in_kind[ in_idx ] == GOSSIP_OUT ) {
     636           0 :     handle_new_contact_info( ctx, ctx->contact_info_buffer );
     637           0 :   }
     638           0 : }
     639             : 
     640             : static ulong
     641             : populate_allowed_fds( fd_topo_t const      * topo        FD_PARAM_UNUSED,
     642             :                       fd_topo_tile_t const * tile,
     643             :                       ulong                  out_fds_cnt FD_PARAM_UNUSED,
     644           0 :                       int *                  out_fds ) {
     645           0 :   ulong out_cnt = 0UL;
     646             : 
     647           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
     648           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     649           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     650           0 :   if( FD_LIKELY( -1!=tile->shredcap.shreds_fd ) )
     651           0 :     out_fds[ out_cnt++ ] = tile->shredcap.shreds_fd; /* shred file */
     652           0 :   if( FD_LIKELY( -1!=tile->shredcap.requests_fd ) )
     653           0 :     out_fds[ out_cnt++ ] = tile->shredcap.requests_fd; /* request file */
     654           0 :   if( FD_LIKELY( -1!=tile->shredcap.fecs_fd ) )
     655           0 :     out_fds[ out_cnt++ ] = tile->shredcap.fecs_fd; /* fec complete file */
     656           0 :   if( FD_LIKELY( -1!=tile->shredcap.peers_fd ) )
     657           0 :     out_fds[ out_cnt++ ] = tile->shredcap.peers_fd; /* peers file */
     658           0 :   if( FD_LIKELY( -1!=tile->shredcap.slices_fd ) )
     659           0 :     out_fds[ out_cnt++ ] = tile->shredcap.slices_fd; /* slices file */
     660           0 :   if( FD_LIKELY( -1!=tile->shredcap.bank_hashes_fd ) )
     661           0 :     out_fds[ out_cnt++ ] = tile->shredcap.bank_hashes_fd; /* bank hashes file */
     662             : 
     663           0 :   return out_cnt;
     664           0 : }
     665             : 
     666             : static void
     667             : privileged_init( fd_topo_t *      topo FD_PARAM_UNUSED,
     668           0 :                  fd_topo_tile_t * tile ) {
     669           0 :   char file_path[PATH_MAX];
     670           0 :   strcpy( file_path, tile->shredcap.folder_path );
     671           0 :   strcat( file_path, "/shred_data.csv" );
     672           0 :   tile->shredcap.shreds_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     673           0 :   if ( FD_UNLIKELY( tile->shredcap.shreds_fd == -1 ) ) {
     674           0 :     FD_LOG_ERR(( "failed to open or create shred csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     675           0 :   }
     676             : 
     677           0 :   strcpy( file_path, tile->shredcap.folder_path );
     678           0 :   strcat( file_path, "/request_data.csv" );
     679           0 :   tile->shredcap.requests_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     680           0 :   if ( FD_UNLIKELY( tile->shredcap.requests_fd == -1 ) ) {
     681           0 :     FD_LOG_ERR(( "failed to open or create request csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     682           0 :   }
     683             : 
     684           0 :   strcpy( file_path, tile->shredcap.folder_path );
     685           0 :   strcat( file_path, "/fec_complete.csv" );
     686           0 :   tile->shredcap.fecs_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     687           0 :   if ( FD_UNLIKELY( tile->shredcap.fecs_fd == -1 ) ) {
     688           0 :     FD_LOG_ERR(( "failed to open or create fec complete csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     689           0 :   }
       690           0 :   FD_LOG_NOTICE(( "Opening fec complete csv dump file at %s", file_path ));
     691             : 
     692           0 :   strcpy( file_path, tile->shredcap.folder_path );
     693           0 :   strcat( file_path, "/peers.csv" );
     694           0 :   tile->shredcap.peers_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     695           0 :   if ( FD_UNLIKELY( tile->shredcap.peers_fd == -1 ) ) {
     696           0 :     FD_LOG_ERR(( "failed to open or create peers csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     697           0 :   }
     698             : 
     699           0 :   strcpy( file_path, tile->shredcap.folder_path );
     700           0 :   strcat( file_path, "/slices.bin" );
     701           0 :   tile->shredcap.slices_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     702           0 :   if ( FD_UNLIKELY( tile->shredcap.slices_fd == -1 ) ) {
       703           0 :     FD_LOG_ERR(( "failed to open or create slices binary dump file %s %d %s", file_path, errno, strerror(errno) ));
       704           0 :   }
       705           0 :   FD_LOG_NOTICE(( "Opening slices binary dump file at %s", file_path ));
     706             : 
     707           0 :   strcpy( file_path, tile->shredcap.folder_path );
     708           0 :   strcat( file_path, "/bank_hashes.bin" );
     709           0 :   tile->shredcap.bank_hashes_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     710           0 :   if ( FD_UNLIKELY( tile->shredcap.bank_hashes_fd == -1 ) ) {
       711           0 :     FD_LOG_ERR(( "failed to open or create bank_hashes binary dump file %s %d %s", file_path, errno, strerror(errno) ));
     712           0 :   }
     713           0 :   FD_LOG_NOTICE(( "Opening bank_hashes binary dump file at %s", file_path ));
     714           0 : }
     715             : 
     716             : static void
     717             : init_file_handlers( fd_capture_tile_ctx_t    * ctx,
     718             :                     int                      * ctx_file,
     719             :                     int                        tile_file,
     720             :                     uchar                   ** ctx_buf,
     721           0 :                     fd_io_buffered_ostream_t * ctx_ostream ) {
       722           0 :   *ctx_file = tile_file;
     723             : 
     724           0 :   int err = ftruncate( *ctx_file, 0UL );
     725           0 :   if( FD_UNLIKELY( err ) ) {
     726           0 :     FD_LOG_ERR(( "failed to truncate file (%i-%s)", errno, fd_io_strerror( errno ) ));
     727           0 :   }
     728           0 :   long seek = lseek( *ctx_file, 0UL, SEEK_SET );
     729           0 :   if( FD_UNLIKELY( seek!=0L ) ) {
     730           0 :     FD_LOG_ERR(( "failed to seek to the beginning of file" ));
     731           0 :   }
     732             : 
     733           0 :   *ctx_buf = fd_alloc_malloc( ctx->alloc, 4096, ctx->write_buf_sz );
     734           0 :   if( FD_UNLIKELY( *ctx_buf == NULL ) ) {
     735           0 :     FD_LOG_ERR(( "failed to allocate ostream buffer" ));
     736           0 :   }
     737             : 
     738           0 :   if( FD_UNLIKELY( !fd_io_buffered_ostream_init(
     739           0 :     ctx_ostream,
     740           0 :     *ctx_file,
     741           0 :     *ctx_buf,
     742           0 :     ctx->write_buf_sz ) ) ) {
     743           0 :     FD_LOG_ERR(( "failed to initialize ostream" ));
     744           0 :   }
     745           0 : }
     746             : 
     747             : 
     748             : static void
     749             : unprivileged_init( fd_topo_t *      topo,
     750           0 :                    fd_topo_tile_t * tile ) {
     751             : 
     752           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     753           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     754           0 :   fd_capture_tile_ctx_t * ctx       = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_capture_tile_ctx_t),  sizeof(fd_capture_tile_ctx_t) );
     755           0 :   void * manifest_bank_mem          = FD_SCRATCH_ALLOC_APPEND( l, manifest_bank_align(),           manifest_bank_footprint() );
     756           0 :   void * manifest_spad_mem          = FD_SCRATCH_ALLOC_APPEND( l, manifest_spad_max_alloc_align(), fd_spad_footprint( manifest_spad_max_alloc_footprint() ) );
     757           0 :   void * shared_spad_mem            = FD_SCRATCH_ALLOC_APPEND( l, shared_spad_max_alloc_align(),   fd_spad_footprint( shared_spad_max_alloc_footprint() ) );
     758           0 :   void * alloc_mem                  = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(),                fd_alloc_footprint() );
     759           0 :   FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     760             : 
     761             :   /* Input links */
     762           0 :   for( ulong i=0; i<tile->in_cnt; i++ ) {
     763           0 :     fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
     764           0 :     fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
     765           0 :     if( 0==strcmp( link->name, "net_shred" ) ) {
     766           0 :       ctx->in_kind[ i ] = NET_SHRED;
     767           0 :       fd_net_rx_bounds_init( &ctx->in_links[ i ].net_rx, link->dcache );
     768           0 :       continue;
     769           0 :     } else if( 0==strcmp( link->name, "repair_net" ) ) {
     770           0 :       ctx->in_kind[ i ] = REPAIR_NET;
     771           0 :     } else if( 0==strcmp( link->name, "shred_out" ) ) {
     772           0 :       ctx->in_kind[ i ] = SHRED_OUT;
     773           0 :     } else if( 0==strcmp( link->name, "gossip_out" ) ) {
     774           0 :       ctx->in_kind[ i ] = GOSSIP_OUT;
     775           0 :     } else if( 0==strcmp( link->name, "repair_scap" ) ) {
     776           0 :       ctx->in_kind[ i ] = REPAIR_SHREDCAP;
     777           0 :     } else if( 0==strcmp( link->name, "replay_out" ) ) {
     778           0 :       ctx->in_kind[ i ] = REPLAY_OUT;
     779           0 :     } else {
     780           0 :       FD_LOG_ERR(( "scap tile has unexpected input link %s", link->name ));
     781           0 :     }
     782             : 
     783           0 :     ctx->in_links[ i ].mem    = link_wksp->wksp;
     784           0 :     ctx->in_links[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ i ].mem, link->dcache );
     785           0 :     ctx->in_links[ i ].wmark  = fd_dcache_compact_wmark ( ctx->in_links[ i ].mem, link->dcache, link->mtu );
     786           0 :   }
     787             : 
     788           0 :   ctx->repair_intake_listen_port = tile->shredcap.repair_intake_listen_port;
     789           0 :   ctx->write_buf_sz = tile->shredcap.write_buffer_size ? tile->shredcap.write_buffer_size : FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ;
     790             : 
     791             :   /* Set up stake weights tile output */
     792           0 :   ctx->stake_out->idx       = fd_topo_find_tile_out_link( topo, tile, "replay_stake", 0 );
     793           0 :   if( FD_LIKELY( ctx->stake_out->idx!=ULONG_MAX ) ) {
     794           0 :     fd_topo_link_t * stake_weights_out = &topo->links[ tile->out_link_id[ ctx->stake_out->idx] ];
     795           0 :     ctx->stake_out->mcache  = stake_weights_out->mcache;
     796           0 :     ctx->stake_out->mem     = topo->workspaces[ topo->objs[ stake_weights_out->dcache_obj_id ].wksp_id ].wksp;
     797           0 :     ctx->stake_out->sync    = fd_mcache_seq_laddr     ( ctx->stake_out->mcache );
     798           0 :     ctx->stake_out->depth   = fd_mcache_depth         ( ctx->stake_out->mcache );
     799           0 :     ctx->stake_out->seq     = fd_mcache_seq_query     ( ctx->stake_out->sync );
     800           0 :     ctx->stake_out->chunk0  = fd_dcache_compact_chunk0( ctx->stake_out->mem, stake_weights_out->dcache );
     801           0 :     ctx->stake_out->wmark   = fd_dcache_compact_wmark ( ctx->stake_out->mem, stake_weights_out->dcache, stake_weights_out->mtu );
     802           0 :     ctx->stake_out->chunk   = ctx->stake_out->chunk0;
     803           0 :   } else {
     804           0 :     FD_LOG_WARNING(( "no connection to stake_out link" ));
     805           0 :     memset( ctx->stake_out, 0, sizeof(out_link_t) );
     806           0 :   }
     807             : 
     808           0 :   ctx->snap_out->idx          = fd_topo_find_tile_out_link( topo, tile, "snapin_manif", 0 );
     809           0 :   if( FD_LIKELY( ctx->snap_out->idx!=ULONG_MAX ) ) {
     810           0 :     fd_topo_link_t * snap_out = &topo->links[tile->out_link_id[ctx->snap_out->idx]];
     811           0 :     ctx->snap_out->mem        = topo->workspaces[topo->objs[snap_out->dcache_obj_id].wksp_id].wksp;
     812           0 :     ctx->snap_out->chunk0     = fd_dcache_compact_chunk0( ctx->snap_out->mem, snap_out->dcache );
     813           0 :     ctx->snap_out->wmark      = fd_dcache_compact_wmark( ctx->snap_out->mem, snap_out->dcache, snap_out->mtu );
     814           0 :     ctx->snap_out->chunk      = ctx->snap_out->chunk0;
     815           0 :   } else {
     816           0 :     FD_LOG_WARNING(( "no connection to snap_out link" ));
     817           0 :     memset( ctx->snap_out, 0, sizeof(out_link_t) );
     818           0 :   }
     819             : 
       820             :   /* If the manifest is enabled (for processing), the stake_out link
       821             :      must be connected to the tile.  TODO in principle, it should be
       822             :      possible to gate the rest of the manifest-related config. */
     823           0 :   ctx->enable_publish_stake_weights = tile->shredcap.enable_publish_stake_weights;
     824           0 :   FD_LOG_NOTICE(( "enable_publish_stake_weights ? %d", ctx->enable_publish_stake_weights ));
     825             : 
     826             :   /* manifest_wmark (root slot) */
     827           0 :   ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
     828           0 :   if( FD_LIKELY( root_slot_obj_id!=ULONG_MAX ) ) { /* for profiler */
     829           0 :     ctx->manifest_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
     830           0 :     if( FD_UNLIKELY( !ctx->manifest_wmark ) ) FD_LOG_ERR(( "no root_slot fseq" ));
     831           0 :     FD_TEST( ULONG_MAX==fd_fseq_query( ctx->manifest_wmark ) );
     832           0 :   }
     833             : 
     834           0 :   ctx->manifest_bank_mem    = manifest_bank_mem;
     835             : 
     836             :   // TODO: ???? Why is this calling fd_banks_new ... does not seem right
     837           0 :   ctx->banks = fd_banks_join( fd_banks_new( ctx->manifest_bank_mem, MANIFEST_MAX_TOTAL_BANKS, MANIFEST_MAX_FORK_WIDTH, 0 /* TODO? */, 8888UL /* TODO? */ ) );
     838           0 :   FD_TEST( ctx->banks );
     839           0 :   ctx->bank  = fd_banks_init_bank( ctx->banks );
     840           0 :   fd_bank_slot_set( ctx->bank, 0UL );
     841           0 :   FD_TEST( ctx->bank );
     842             : 
     843           0 :   strncpy( ctx->manifest_path, tile->shredcap.manifest_path, PATH_MAX );
     844           0 :   ctx->manifest_load_done = 0;
     845           0 :   ctx->manifest_spad_mem  = manifest_spad_mem;
     846           0 :   ctx->manifest_spad      = fd_spad_join( fd_spad_new( ctx->manifest_spad_mem, manifest_spad_max_alloc_footprint() ) );
     847           0 :   ctx->shared_spad_mem    = shared_spad_mem;
     848           0 :   ctx->shared_spad        = fd_spad_join( fd_spad_new( ctx->shared_spad_mem, shared_spad_max_alloc_footprint() ) );
     849             : 
     850             :   /* Allocate the write buffers */
     851           0 :   ctx->alloc = fd_alloc_join( fd_alloc_new( alloc_mem, FD_SHREDCAP_ALLOC_TAG ), fd_tile_idx() );
     852           0 :   if( FD_UNLIKELY( !ctx->alloc ) ) {
       853           0 :     FD_LOG_ERR(( "fd_alloc_join failed" ));
     854           0 :   }
     855             : 
     856             :   /* Setup the csv files to be in the expected state */
     857             : 
     858           0 :   init_file_handlers( ctx, &ctx->shreds_fd,      tile->shredcap.shreds_fd,      &ctx->shreds_buf,      &ctx->shred_ostream );
     859           0 :   init_file_handlers( ctx, &ctx->requests_fd,    tile->shredcap.requests_fd,    &ctx->requests_buf,    &ctx->repair_ostream );
     860           0 :   init_file_handlers( ctx, &ctx->fecs_fd,        tile->shredcap.fecs_fd,        &ctx->fecs_buf,        &ctx->fecs_ostream );
     861           0 :   init_file_handlers( ctx, &ctx->peers_fd,       tile->shredcap.peers_fd,       &ctx->peers_buf,       &ctx->peers_ostream );
     862             : 
     863           0 :   int err = fd_io_buffered_ostream_write( &ctx->shred_ostream,  "src_ip,src_port,timestamp,slot,ref_tick,fec_set_idx,idx,is_turbine,is_data,nonce\n", 81UL );
     864           0 :   err    |= fd_io_buffered_ostream_write( &ctx->repair_ostream, "dst_ip,dst_port,timestamp,nonce,slot,idx\n", 41UL );
     865           0 :   err    |= fd_io_buffered_ostream_write( &ctx->fecs_ostream,   "timestamp,slot,ref_tick,fec_set_idx,data_cnt\n", 45UL );
       866           0 :   err    |= fd_io_buffered_ostream_write( &ctx->peers_ostream,  "peer_ip4_addr,peer_port,pubkey,turbine\n", 39UL );
     867             : 
     868           0 :   if( FD_UNLIKELY( err ) ) {
       869           0 :     FD_LOG_ERR(( "failed to write header to one of the 4 csv files (%i-%s)", errno, fd_io_strerror( errno ) ));
     870           0 :   }
     871             : 
     872             :   /* Setup the binary files to be in the expected state. These files are
     873             :      not csv, so we don't need headers. */
     874           0 :   init_file_handlers( ctx, &ctx->slices_fd,      tile->shredcap.slices_fd,      &ctx->slices_buf,      &ctx->slices_ostream );
     875           0 :   init_file_handlers( ctx, &ctx->bank_hashes_fd, tile->shredcap.bank_hashes_fd, &ctx->bank_hashes_buf, &ctx->bank_hashes_ostream );
     876           0 : }
     877             : 
     878           0 : #define STEM_BURST (1UL)
     879           0 : #define STEM_LAZY  (50UL)
     880             : 
     881           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_capture_tile_ctx_t
     882           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_capture_tile_ctx_t)
     883             : 
     884           0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
     885           0 : #define STEM_CALLBACK_DURING_FRAG during_frag
     886           0 : #define STEM_CALLBACK_AFTER_FRAG  after_frag
     887           0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
     888             : 
     889             : #include "../../disco/stem/fd_stem.c"
     890             : 
     891             : fd_topo_run_tile_t fd_tile_shredcap = {
     892             :   .name                     = "scap",
     893             :   .loose_footprint          = loose_footprint,
     894             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     895             :   .populate_allowed_fds     = populate_allowed_fds,
     896             :   .scratch_align            = scratch_align,
     897             :   .scratch_footprint        = scratch_footprint,
     898             :   .privileged_init          = privileged_init,
     899             :   .unprivileged_init        = unprivileged_init,
     900             :   .run                      = stem_run,
     901             : };

Generated by: LCOV version 1.14