LCOV - code coverage report
Current view: top level - discof/shredcap - fd_shredcap_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 302 0.0 %
Date: 2025-07-01 05:00:49 Functions: 0 13 0.0 %

          Line data    Source code
       1             : #define _GNU_SOURCE  /* Enable GNU and POSIX extensions */
       2             : #include "../../disco/tiles.h"
       3             : #include "../../disco/topo/fd_topo.h"
       4             : #include "../../disco/net/fd_net_tile.h"
       5             : #include "../../flamenco/types/fd_types.h"
       6             : #include "../../flamenco/fd_flamenco_base.h"
       7             : #include "../../disco/fd_disco.h"
       8             : 
       9             : #include <errno.h>
      10             : #include <fcntl.h>
      11             : #include <sys/mman.h>
      12             : #include <sys/stat.h>
      13             : #include <string.h>
      14             : #include <stdio.h>
      15             : #include <unistd.h>
      16             : #include <linux/unistd.h>
      17             : #include <sys/socket.h>
      18             : #include <linux/if_xdp.h>
      19             : #include "generated/fd_shredcap_tile_seccomp.h"
      20             : 
      21             : 
      22             : /* This tile spies on the net_shred, repair_net, and shred_repair links
      23             :    and currently outputs to a csv that can analyze repair performance
      24             :    in post. */
      25             : 
      /* Writer defaults.  The buffer size is used when the tile config
         does not supply write_buffer_size; the tag is handed to
         fd_alloc_new for the allocator backing the write buffers. */

      #define FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ  (4096UL)  /* local filesystem block size */
      #define FD_SHREDCAP_ALLOC_TAG              (4UL)

      /* Byte capacity of the contact info staging buffer: room for 20000
         fd_shred_dest_wire_t entries. */

      #define MAX_BUFFER_SIZE  ( 20000UL * sizeof(fd_shred_dest_wire_t))

      /* Input link kinds.  One of these is stored per input link in
         in_kind[] at init and used to dispatch in the frag callbacks. */

      #define NET_SHRED     (0UL)  /* "net_shred" link     */
      #define REPAIR_NET    (1UL)  /* "repair_net" link    */
      #define SHRED_REPAIR  (2UL)  /* "shred_repair" link  */
      #define GOSSIP_SHRED  (3UL)  /* "crds_shred" link    */
      #define GOSSIP_REPAIR (4UL)  /* "gossip_repai" link  */
      35             : 
      36             : typedef union {
      37             :   struct {
      38             :     fd_wksp_t * mem;
      39             :     ulong       chunk0;
      40             :     ulong       wmark;
      41             :   };
      42             :   fd_net_rx_bounds_t net_rx;
      43             : } fd_capture_in_ctx_t;
      44             : 
      45             : struct fd_capture_tile_ctx {
      46             :   uchar               in_kind[ 32 ];
      47             :   fd_capture_in_ctx_t in_links[ 32 ];
      48             : 
      49             :   int skip_frag;
      50             :   ushort repair_intake_listen_port;
      51             : 
      52             :   ulong shred_buffer_sz;
      53             :   uchar shred_buffer[ FD_NET_MTU ];
      54             : 
      55             :   ulong repair_buffer_sz;
      56             :   uchar repair_buffer[ FD_NET_MTU ];
      57             : 
      58             : 
      59             :   fd_ip4_udp_hdrs_t intake_hdr[1];
      60             : 
      61             :   ulong now;
      62             :   ulong  last_packet_ns;
      63             :   double tick_per_ns;
      64             : 
      65             :   fd_io_buffered_ostream_t shred_ostream;
      66             :   fd_io_buffered_ostream_t repair_ostream;
      67             :   fd_io_buffered_ostream_t fecs_ostream;
      68             :   fd_io_buffered_ostream_t peers_ostream;
      69             : 
      70             :   int  shreds_fd;
      71             :   int  requests_fd;
      72             :   int  fecs_fd;
      73             :   int  peers_fd;
      74             : 
      75             :   ulong write_buf_sz;
      76             : 
      77             :   uchar * shreds_buf;
      78             :   uchar * requests_buf;
      79             :   uchar * fecs_buf;
      80             :   uchar * peers_buf;
      81             : 
      82             :   fd_alloc_t * alloc;
      83             :   uchar contact_info_buffer[ MAX_BUFFER_SIZE ];
      84             : };
      85             : typedef struct fd_capture_tile_ctx fd_capture_tile_ctx_t;
      86             : 
      87             : FD_FN_CONST static inline ulong
      88           0 : scratch_align( void ) {
      89           0 :   return 4096UL;
      90           0 : }
      91             : 
      92             : FD_FN_PURE static inline ulong
      93           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
      94           0 :   return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
      95           0 : }
      96             : 
      97             : static ulong
      98             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
      99             :                           fd_topo_tile_t const * tile,
     100             :                           ulong                  out_cnt,
     101           0 :                           struct sock_filter *   out ) {
     102           0 :   populate_sock_filter_policy_fd_shredcap_tile( out_cnt,
     103           0 :                                              out,
     104           0 :                                              (uint)fd_log_private_logfile_fd(),
     105           0 :                                              (uint)tile->shredcap.shreds_fd,
     106           0 :                                              (uint)tile->shredcap.requests_fd,
     107           0 :                                              (uint)tile->shredcap.fecs_fd,
     108           0 :                                              (uint)tile->shredcap.peers_fd );
     109           0 :   return sock_filter_policy_fd_shredcap_tile_instr_cnt;
     110           0 : }
     111             : 
     112             : 
     113             : FD_FN_PURE static inline ulong
     114           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     115           0 :   (void)tile;
     116           0 :   ulong l = FD_LAYOUT_INIT;
     117           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_capture_tile_ctx_t), sizeof(fd_capture_tile_ctx_t) );
     118           0 :   l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
     119           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     120           0 : }
     121             : 
     122             : static inline int
     123             : before_frag( fd_capture_tile_ctx_t * ctx,
     124             :              ulong            in_idx,
     125             :              ulong            seq FD_PARAM_UNUSED,
     126           0 :              ulong            sig ) {
     127           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==NET_SHRED ) ) {
     128           0 :     return (fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
     129           0 :   }
     130           0 :   return 0;
     131           0 : }
     132             : 
     133             : static inline void
     134             : handle_new_turbine_contact_info( fd_capture_tile_ctx_t * ctx,
     135           0 :                                  uchar const *          buf ) {
     136           0 :   ulong const * header = (ulong const *)fd_type_pun_const( buf );
     137           0 :   ulong dest_cnt = header[ 0 ];
     138             : 
     139           0 :   fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header+1UL );
     140             : 
     141           0 :   for( ulong i=0UL; i<dest_cnt; i++ ) {
     142             :     // need to bswap the port
     143             :     //ushort port = fd_ushort_bswap( in_dests[i].udp_port );
     144           0 :     char peers_buf[1024];
     145           0 :     snprintf( peers_buf, sizeof(peers_buf),
     146           0 :               "%u,%u,%s,%d\n",
     147           0 :               in_dests[i].ip4_addr, in_dests[i].udp_port, FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), 1);
     148           0 :     int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, peers_buf, strlen(peers_buf) );
     149           0 :     FD_TEST( err==0 );
     150           0 :   }
     151           0 : }
     152             : 
     153             : 
     154             : static int
     155           0 : is_fec_completes_msg( ulong sz ) {
     156           0 :   return sz == FD_SHRED_DATA_HEADER_SZ + FD_SHRED_MERKLE_ROOT_SZ;
     157           0 : }
     158             : 
     159             : static inline void
     160             : during_frag( fd_capture_tile_ctx_t * ctx,
     161             :              ulong                   in_idx,
     162             :              ulong                   seq     FD_PARAM_UNUSED,
     163             :              ulong                   sig     FD_PARAM_UNUSED,
     164             :              ulong                   chunk,
     165             :              ulong                   sz,
     166           0 :              ulong                   ctl FD_PARAM_UNUSED ) {
     167           0 :   ctx->skip_frag = 0;
     168           0 :   if( ctx->in_kind[ in_idx ]==SHRED_REPAIR ) {
     169           0 :     if( !is_fec_completes_msg( sz ) ) {
     170           0 :       ctx->skip_frag = 1;
     171           0 :       return;
     172           0 :     }
     173           0 :     fd_memcpy( ctx->shred_buffer, fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk ), sz );
     174           0 :     ctx->shred_buffer_sz = sz;
     175           0 :   } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
     176           0 :     uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in_links[ in_idx ].net_rx, chunk, ctl, sz );
     177           0 :     ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
     178           0 :     FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
     179           0 :     fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
     180           0 :     if( FD_UNLIKELY( !shred ) ) {
     181           0 :       ctx->skip_frag = 1;
     182           0 :       return;
     183           0 :     };
     184           0 :     fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
     185           0 :     ctx->shred_buffer_sz = sz-hdr_sz;
     186           0 :   } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
     187             :     /* Repair will have outgoing pings, outgoing repair requests, and
     188             :        outgoing served shreds we want to filter everything but the
     189             :        repair requests.
     190             :        1. We can index into the ip4 udp packet hdr and check if the src
     191             :           port is the intake listen port or serve port
     192             :        2. Then we can filter on the discriminant which luckily does not
     193             :           require decoding! */
     194             : 
     195           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     196           0 :     fd_ip4_udp_hdrs_t const * hdr = (fd_ip4_udp_hdrs_t const *)dcache_entry;
     197           0 :     if( hdr->udp->net_sport != fd_ushort_bswap( ctx->repair_intake_listen_port ) ) {
     198           0 :       ctx->skip_frag = 1;
     199           0 :       return;
     200           0 :     }
     201           0 :     const uchar * encoded_protocol = dcache_entry + sizeof(fd_ip4_udp_hdrs_t);
     202           0 :     uint discriminant = FD_LOAD(uint, encoded_protocol);
     203             : 
     204           0 :     if( FD_UNLIKELY( discriminant <= fd_repair_protocol_enum_pong ) ) {
     205           0 :       ctx->skip_frag = 1;
     206           0 :       return;
     207           0 :     }
     208           0 :     fd_memcpy( ctx->repair_buffer, dcache_entry, sz );
     209           0 :     ctx->repair_buffer_sz = sz;
     210           0 :   } else {
     211             :     // contact infos can be copied into a buffer
     212           0 :     if( FD_UNLIKELY( chunk<ctx->in_links[ in_idx ].chunk0 || chunk>ctx->in_links[ in_idx ].wmark ) ) {
     213           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     214           0 :                    ctx->in_links[ in_idx ].chunk0, ctx->in_links[ in_idx ].wmark ));
     215           0 :     }
     216           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
     217           0 :     fd_memcpy( ctx->contact_info_buffer, dcache_entry, sz );
     218           0 :   }
     219           0 : }
     220             : 
     221             : static inline void
     222             : after_frag( fd_capture_tile_ctx_t * ctx,
     223             :             ulong                   in_idx,
     224             :             ulong                   seq    FD_PARAM_UNUSED,
     225             :             ulong                   sig,
     226             :             ulong                   sz,
     227             :             ulong                   tsorig FD_PARAM_UNUSED,
     228             :             ulong                   tspub  FD_PARAM_UNUSED,
     229           0 :             fd_stem_context_t *     stem   FD_PARAM_UNUSED ) {
     230           0 :   if( FD_UNLIKELY( ctx->skip_frag ) ) return;
     231             : 
     232           0 :   if( ctx->in_kind[ in_idx ] == SHRED_REPAIR ) {
     233             :     /* This is a fec completes message! we can use it to check how long
     234             :        it takes to complete a fec */
     235             : 
     236           0 :     fd_shred_t const * shred = (fd_shred_t *)fd_type_pun( ctx->shred_buffer );
     237           0 :     uint data_cnt = fd_disco_shred_repair_fec_sig_data_cnt( sig );
     238           0 :     uint ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
     239           0 :     char fec_complete[1024];
     240           0 :     snprintf( fec_complete, sizeof(fec_complete),
     241           0 :              "%ld,%lu,%u,%u,%u\n",
     242           0 :               fd_log_wallclock(), shred->slot, ref_tick, shred->fec_set_idx, data_cnt );
     243             : 
     244             :     // Last shred is guaranteed to be a data shred
     245             : 
     246             : 
     247           0 :     int err = fd_io_buffered_ostream_write( &ctx->fecs_ostream, fec_complete, strlen(fec_complete) );
     248           0 :     FD_TEST( err==0 );
     249           0 :   } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
     250             :     /* TODO: leader schedule early exits in shred tile right around
     251             :        startup, which discards some turbine shreds, but there is a
     252             :        chance we capture this shred here. Currently handled in post, but
     253             :        in the future will want to get the leader schedule here so we can
     254             :        also benchmark whether the excepcted sender in the turbine tree
     255             :        matches the actual sender. */
     256             : 
     257           0 :     ulong hdr_sz     = fd_disco_netmux_sig_hdr_sz( sig );
     258           0 :     fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->shred_buffer;
     259           0 :     uint src_ip4_addr = hdr->ip4->saddr;
     260           0 :     ushort src_port   = hdr->udp->net_sport;
     261             : 
     262           0 :     fd_shred_t const * shred = fd_shred_parse( ctx->shred_buffer + hdr_sz, sz - hdr_sz );
     263           0 :     int   is_turbine = fd_disco_netmux_sig_proto( sig ) == DST_PROTO_SHRED;
     264           0 :     uint  nonce      = is_turbine ? 0 : FD_LOAD(uint, ctx->shred_buffer + hdr_sz + fd_shred_sz( shred ) );
     265           0 :     int   is_data    = fd_shred_is_data( fd_shred_type( shred->variant ) );
     266           0 :     ulong slot       = shred->slot;
     267           0 :     uint  idx        = shred->idx;
     268           0 :     uint  fec_idx    = shred->fec_set_idx;
     269           0 :     uint  ref_tick   = 65;
     270           0 :     if( FD_UNLIKELY( is_turbine && is_data ) ) {
     271             :       /* We can then index into the flag and get a REFTICK */
     272           0 :       ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
     273           0 :     }
     274             : 
     275           0 :     char repair_data_buf[1024];
     276           0 :     snprintf( repair_data_buf, sizeof(repair_data_buf),
     277           0 :              "%u,%u,%ld,%lu,%u,%u,%u,%d,%d,%u\n",
     278           0 :               src_ip4_addr, src_port, fd_log_wallclock(), slot, ref_tick, fec_idx, idx, is_turbine, is_data, nonce );
     279             : 
     280           0 :     int err = fd_io_buffered_ostream_write( &ctx->shred_ostream, repair_data_buf, strlen(repair_data_buf) );
     281           0 :     FD_TEST( err==0 );
     282           0 :   } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
     283             :     /* We have a valid repair request that we can finally decode.
     284             :        Unfortunately we actually have to decode because we cant cast
     285             :        directly to the protocol */
     286           0 :     fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->repair_buffer;
     287           0 :     fd_repair_protocol_t protocol;
     288           0 :     fd_bincode_decode_ctx_t bctx = { .data = ctx->repair_buffer + sizeof(fd_ip4_udp_hdrs_t), .dataend = ctx->repair_buffer + ctx->repair_buffer_sz };
     289           0 :     fd_repair_protocol_t * decoded = fd_repair_protocol_decode( &protocol, &bctx );
     290             : 
     291           0 :     FD_TEST( decoded == &protocol );
     292           0 :     FD_TEST( decoded != NULL );
     293             : 
     294           0 :     uint   peer_ip4_addr = hdr->ip4->daddr;
     295           0 :     ushort peer_port     = hdr->udp->net_dport;
     296           0 :     ulong  slot          = 0UL;
     297           0 :     ulong  shred_index   = UINT_MAX;
     298           0 :     uint   nonce         = 0U;
     299             : 
     300           0 :     switch( protocol.discriminant ) {
     301           0 :       case fd_repair_protocol_enum_window_index: {
     302           0 :         slot        = protocol.inner.window_index.slot;
     303           0 :         shred_index = protocol.inner.window_index.shred_index;
     304           0 :         nonce       = protocol.inner.window_index.header.nonce;
     305           0 :         break;
     306           0 :       }
     307           0 :       case fd_repair_protocol_enum_highest_window_index: {
     308           0 :         slot        = protocol.inner.highest_window_index.slot;
     309           0 :         shred_index = protocol.inner.highest_window_index.shred_index;
     310           0 :         nonce       = protocol.inner.highest_window_index.header.nonce;
     311           0 :         break;
     312           0 :       }
     313           0 :       case fd_repair_protocol_enum_orphan: {
     314           0 :         slot  = protocol.inner.orphan.slot;
     315           0 :         nonce = protocol.inner.orphan.header.nonce;
     316           0 :         break;
     317           0 :       }
     318           0 :       default:
     319           0 :         break;
     320           0 :     }
     321             : 
     322           0 :     char repair_data_buf[1024];
     323           0 :     snprintf( repair_data_buf, sizeof(repair_data_buf),
     324           0 :               "%u,%u,%ld,%u,%lu,%lu\n",
     325           0 :               peer_ip4_addr, peer_port, fd_log_wallclock(), nonce, slot, shred_index );
     326           0 :     int err = fd_io_buffered_ostream_write( &ctx->repair_ostream, repair_data_buf, strlen(repair_data_buf) );
     327           0 :     FD_TEST( err==0 );
     328           0 :   } else if( ctx->in_kind[ in_idx ] == GOSSIP_REPAIR ) {
     329           0 :     fd_shred_dest_wire_t const * in_dests = (fd_shred_dest_wire_t const *)fd_type_pun_const( ctx->contact_info_buffer );
     330           0 :     ulong dest_cnt = sz;
     331           0 :     for( ulong i=0UL; i<dest_cnt; i++ ) {
     332           0 :       char peers_buf[1024];
     333           0 :       snprintf( peers_buf, sizeof(peers_buf),
     334           0 :                 "%u,%u,%s,%d\n",
     335           0 :                  in_dests[i].ip4_addr, in_dests[i].udp_port, FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), 0);
     336           0 :       int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, peers_buf, strlen(peers_buf) );
     337           0 :       FD_TEST( err==0 );
     338           0 :     }
     339           0 :   } else { // crds_shred contact infos
     340           0 :     handle_new_turbine_contact_info( ctx, ctx->contact_info_buffer );
     341           0 :   }
     342           0 : }
     343             : 
     344             : static ulong
     345             : populate_allowed_fds( fd_topo_t const      * topo        FD_PARAM_UNUSED,
     346             :                       fd_topo_tile_t const * tile        FD_PARAM_UNUSED,
     347             :                       ulong                  out_fds_cnt FD_PARAM_UNUSED,
     348           0 :                       int *                  out_fds ) {
     349           0 :   ulong out_cnt = 0UL;
     350             : 
     351           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
     352           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     353           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     354           0 :   if( FD_LIKELY( -1!=tile->shredcap.shreds_fd ) )
     355           0 :     out_fds[ out_cnt++ ] = tile->shredcap.shreds_fd; /* shred file */
     356           0 :   if( FD_LIKELY( -1!=tile->shredcap.requests_fd ) )
     357           0 :     out_fds[ out_cnt++ ] = tile->shredcap.requests_fd; /* request file */
     358           0 :   if( FD_LIKELY( -1!=tile->shredcap.fecs_fd ) )
     359           0 :     out_fds[ out_cnt++ ] = tile->shredcap.fecs_fd; /* fec complete file */
     360           0 :   if( FD_LIKELY( -1!=tile->shredcap.peers_fd ) )
     361           0 :     out_fds[ out_cnt++ ] = tile->shredcap.peers_fd; /* peers file */
     362             : 
     363           0 :   return out_cnt;
     364           0 : }
     365             : 
     366             : static void
     367             : privileged_init( fd_topo_t *      topo FD_PARAM_UNUSED,
     368           0 :                  fd_topo_tile_t * tile ) {
     369           0 :   char file_path[PATH_MAX];
     370           0 :   strcpy( file_path, tile->shredcap.folder_path );
     371           0 :   strcat( file_path, "/shred_data.csv" );
     372           0 :   tile->shredcap.shreds_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     373           0 :   if ( FD_UNLIKELY( tile->shredcap.shreds_fd == -1 ) ) {
     374           0 :     FD_LOG_ERR(( "failed to open or create shred csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     375           0 :   }
     376             : 
     377           0 :   strcpy( file_path, tile->shredcap.folder_path );
     378           0 :   strcat( file_path, "/request_data.csv" );
     379           0 :   tile->shredcap.requests_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     380           0 :   if ( FD_UNLIKELY( tile->shredcap.requests_fd == -1 ) ) {
     381           0 :     FD_LOG_ERR(( "failed to open or create request csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     382           0 :   }
     383             : 
     384           0 :   strcpy( file_path, tile->shredcap.folder_path );
     385           0 :   strcat( file_path, "/fec_complete.csv" );
     386           0 :   tile->shredcap.fecs_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     387           0 :   if ( FD_UNLIKELY( tile->shredcap.fecs_fd == -1 ) ) {
     388           0 :     FD_LOG_ERR(( "failed to open or create fec complete csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     389           0 :   }
     390           0 :   FD_LOG_NOTICE(( "Opening shred csv dump file at %s", file_path ));
     391             : 
     392           0 :   strcpy( file_path, tile->shredcap.folder_path );
     393           0 :   strcat( file_path, "/peers.csv" );
     394           0 :   tile->shredcap.peers_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
     395           0 :   if ( FD_UNLIKELY( tile->shredcap.peers_fd == -1 ) ) {
     396           0 :     FD_LOG_ERR(( "failed to open or create peers csv dump file %s %d %s", file_path, errno, strerror(errno) ));
     397           0 :   }
     398           0 : }
     399             : 
     400             : static void
     401             : init_file_handlers( fd_capture_tile_ctx_t    * ctx,
     402             :                     int                      * ctx_file,
     403             :                     int                        tile_file,
     404             :                     uchar                   ** ctx_buf,
     405           0 :                     fd_io_buffered_ostream_t * ctx_ostream ) {
     406           0 :   *ctx_file =  tile_file ;
     407             : 
     408           0 :   int err = ftruncate( *ctx_file, 0UL );
     409           0 :   if( FD_UNLIKELY( err ) ) {
     410           0 :     FD_LOG_ERR(( "failed to truncate file (%i-%s)", errno, fd_io_strerror( errno ) ));
     411           0 :   }
     412           0 :   long seek = lseek( *ctx_file, 0UL, SEEK_SET );
     413           0 :   if( FD_UNLIKELY( seek!=0L ) ) {
     414           0 :     FD_LOG_ERR(( "failed to seek to the beginning of file" ));
     415           0 :   }
     416             : 
     417           0 :   *ctx_buf = fd_alloc_malloc( ctx->alloc, 4096, ctx->write_buf_sz );
     418           0 :   if( FD_UNLIKELY( *ctx_buf == NULL ) ) {
     419           0 :     FD_LOG_ERR(( "failed to allocate ostream buffer" ));
     420           0 :   }
     421             : 
     422           0 :   if( FD_UNLIKELY( !fd_io_buffered_ostream_init(
     423           0 :     ctx_ostream,
     424           0 :     *ctx_file,
     425           0 :     *ctx_buf,
     426           0 :     ctx->write_buf_sz ) ) ) {
     427           0 :     FD_LOG_ERR(( "failed to initialize ostream" ));
     428           0 :   }
     429           0 : }
     430             : 
     431             : 
     432             : static void
     433             : unprivileged_init( fd_topo_t *      topo,
     434           0 :                    fd_topo_tile_t * tile ) {
     435             : 
     436           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     437           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     438           0 :   fd_capture_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_capture_tile_ctx_t), sizeof(fd_capture_tile_ctx_t) );
     439           0 :   void * alloc_mem            = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
     440           0 :   FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     441             : 
     442             :   /* Input links */
     443           0 :   for( ulong i=0; i<tile->in_cnt; i++ ) {
     444           0 :     fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
     445           0 :     fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
     446           0 :     if( 0==strcmp( link->name, "net_shred" ) ) {
     447           0 :       ctx->in_kind[ i ] = NET_SHRED;
     448           0 :       fd_net_rx_bounds_init( &ctx->in_links[ i ].net_rx, link->dcache );
     449           0 :       continue;
     450           0 :     } else if( 0==strcmp( link->name, "repair_net" ) ) {
     451           0 :       ctx->in_kind[ i ] = REPAIR_NET;
     452           0 :     } else if( 0==strcmp( link->name, "shred_repair" ) ) {
     453           0 :       ctx->in_kind[ i ] = SHRED_REPAIR;
     454           0 :     } else if( 0==strcmp( link->name, "crds_shred" ) ) {
     455           0 :       ctx->in_kind[ i ] = GOSSIP_SHRED;
     456           0 :     } else if( 0==strcmp( link->name, "gossip_repai" ) ) {
     457           0 :       ctx->in_kind[ i ] = GOSSIP_REPAIR;
     458           0 :     } else {
     459           0 :       FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name ));
     460           0 :     }
     461             : 
     462           0 :     ctx->in_links[ i ].mem    = link_wksp->wksp;
     463           0 :     ctx->in_links[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ i ].mem, link->dcache );
     464           0 :     ctx->in_links[ i ].wmark  = fd_dcache_compact_wmark ( ctx->in_links[ i ].mem, link->dcache, link->mtu );
     465           0 :   }
     466             : 
     467           0 :   ctx->repair_intake_listen_port = tile->shredcap.repair_intake_listen_port;
     468           0 :   ctx->write_buf_sz = tile->shredcap.write_buffer_size ? tile->shredcap.write_buffer_size : FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ;
     469             : 
     470             :   /* Allocate the write buffers */
     471           0 :   ctx->alloc = fd_alloc_join( fd_alloc_new( alloc_mem, FD_SHREDCAP_ALLOC_TAG ), fd_tile_idx() );
     472           0 :   if( FD_UNLIKELY( !ctx->alloc ) ) {
     473           0 :     FD_LOG_ERR( ( "fd_alloc_join failed" ) );
     474           0 :   }
     475             : 
     476             :   /* Setup the csv files to be in the expected state */
     477             : 
     478           0 :   init_file_handlers( ctx, &ctx->shreds_fd,   tile->shredcap.shreds_fd,   &ctx->shreds_buf,   &ctx->shred_ostream );
     479           0 :   init_file_handlers( ctx, &ctx->requests_fd, tile->shredcap.requests_fd, &ctx->requests_buf, &ctx->repair_ostream );
     480           0 :   init_file_handlers( ctx, &ctx->fecs_fd,     tile->shredcap.fecs_fd,     &ctx->fecs_buf,     &ctx->fecs_ostream );
     481           0 :   init_file_handlers( ctx, &ctx->peers_fd,    tile->shredcap.peers_fd,    &ctx->peers_buf,    &ctx->peers_ostream );
     482             : 
     483           0 :   int err = fd_io_buffered_ostream_write( &ctx->shred_ostream,  "src_ip,src_port,timestamp,slot,ref_tick,fec_set_idx,idx,is_turbine,is_data,nonce\n", 81UL );
     484           0 :   err    |= fd_io_buffered_ostream_write( &ctx->repair_ostream, "dst_ip,dst_port,timestamp,nonce,slot,idx\n", 41UL );
     485           0 :   err    |= fd_io_buffered_ostream_write( &ctx->fecs_ostream,   "timestamp,slot,ref_tick,fec_set_idx,data_cnt\n", 45UL );
     486           0 :   err    |= fd_io_buffered_ostream_write( &ctx->peers_ostream,  "peer_ip4_addr,peer_port,pubkey,turbine\n", 48UL );
     487             : 
     488           0 :   if( FD_UNLIKELY( err ) ) {
     489           0 :     FD_LOG_ERR(( "failed to write header to any of the 4 files (%i-%s)", errno, fd_io_strerror( errno ) ));
     490           0 :   }
     491           0 : }
     492             : 
     /* Configuration consumed by the fd_stem.c template included right
        after these definitions.  NOTE(review): the exact meaning of the
        BURST and LAZY values is defined by fd_stem.c, which is not
        visible here. */

     #define STEM_BURST (1UL)
     #define STEM_LAZY  (50UL)

     /* Type and alignment of the context passed to every callback */

     #define STEM_CALLBACK_CONTEXT_TYPE  fd_capture_tile_ctx_t
     #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_capture_tile_ctx_t)

     /* Callbacks implemented by this tile */

     #define STEM_CALLBACK_BEFORE_FRAG          before_frag
     #define STEM_CALLBACK_DURING_FRAG          during_frag
     #define STEM_CALLBACK_AFTER_FRAG           after_frag
     502             : 
     503             : #include "../../disco/stem/fd_stem.c"
     504             : 
     505             : fd_topo_run_tile_t fd_tile_shredcap = {
     506             :   .name                     = "shrdcp",
     507             :   .loose_footprint          = loose_footprint,
     508             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     509             :   .populate_allowed_fds     = populate_allowed_fds,
     510             :   .scratch_align            = scratch_align,
     511             :   .scratch_footprint        = scratch_footprint,
     512             :   .privileged_init          = privileged_init,
     513             :   .unprivileged_init        = unprivileged_init,
     514             :   .run                      = stem_run,
     515             : };

Generated by: LCOV version 1.14