LCOV - code coverage report
Current view: top level - disco/archiver - fd_archiver_playback.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 177 0.0 %
Date: 2025-07-01 05:00:49 Functions: 0 10 0.0 %

          Line data    Source code
       1             : #define _GNU_SOURCE  /* Enable GNU and POSIX extensions */
       2             : 
       3             : #include "../tiles.h"
       4             : 
       5             : #include "fd_archiver.h"
       6             : #include <errno.h>
       7             : #include <fcntl.h>
       8             : #include <string.h>
       9             : #include <sys/mman.h>
      10             : #include <sys/stat.h>
      11             : #include <unistd.h>
      12             : #include <linux/unistd.h>
      13             : #include <sys/socket.h>
      14             : #include <linux/if_xdp.h>
      15             : #include "generated/archiver_playback_seccomp.h"
      16             : #include "../../util/pod/fd_pod_format.h"
      17             : /* The archiver playback tile consumes fragments from the archive file, adds artificial delay
      18             : to reproduce exactly the timing from the capture, and forwards these fragments to the
      19             : receiver tiles (shred/quic/gossip/repair).
      20             : 
      21             : There should be a single archiver playback tile, and it should replace the input links to the
      22             : receiver tiles.
      23             : */
      24             : 
      25           0 : #define NET_SHRED_OUT_IDX  (0UL)
      26           0 : #define NET_REPAIR_OUT_IDX (1UL)
      27             : 
      28           0 : #define FD_ARCHIVER_PLAYBACK_ALLOC_TAG   (3UL)
      29             : 
      30             : #define FD_ARCHIVER_STARTUP_DELAY_SECONDS (1)
      31           0 : #define FD_ARCHIVE_PLAYBACK_BUFFER_SZ      (FD_SHMEM_GIGANTIC_PAGE_SZ)
      32             : 
      33             : struct fd_archiver_playback_stats {
      34             :   ulong net_shred_out_cnt;
      35             :   ulong net_quic_out_cnt;
      36             :   ulong net_gossip_out_cnt;
      37             :   ulong net_repair_out_cnt;
      38             : 
      39             : };
      40             : typedef struct fd_archiver_playback_stats fd_archiver_playback_stats_t;
      41             : 
      42             : typedef struct {
      43             :   fd_wksp_t * mem;
      44             :   ulong       mtu;
      45             :   ulong       chunk0;
      46             :   ulong       wmark;
      47             :   ulong       chunk;
      48             : } fd_archiver_playback_out_ctx_t;
      49             : 
      50             : struct fd_archiver_playback_tile_ctx {
      51             :   fd_io_buffered_istream_t istream;
      52             :   uchar *                  istream_buf;
      53             : 
      54             :   fd_archiver_playback_stats_t stats;
      55             : 
      56             :   double tick_per_ns;
      57             : 
      58             :   ulong prev_publish_time;
      59             :   ulong now;
      60             :   ulong need_notify;
      61             :   ulong notified;
      62             : 
      63             :   fd_archiver_playback_out_ctx_t out[ 32 ];
      64             : 
      65             :   fd_alloc_t * alloc;
      66             :   fd_valloc_t  valloc;
      67             : 
      68             :   ulong playback_done;
      69             :   ulong done_time;
      70             :   ulong playback_started;
      71             :   ulong playback_cnt[FD_ARCHIVER_TILE_CNT];
      72             : 
      73             :   ulong * published_wmark; /* same as the one in replay tile */
      74             : };
      75             : typedef struct fd_archiver_playback_tile_ctx fd_archiver_playback_tile_ctx_t;
      76             : 
      77             : FD_FN_CONST static inline ulong
      78           0 : scratch_align( void ) {
      79           0 :   return 4096UL;
      80           0 : }
      81             : 
      82             : FD_FN_PURE static inline ulong
      83           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
      84           0 :   return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
      85           0 : }
      86             : 
      87             : static ulong
      88             : populate_allowed_seccomp( fd_topo_t const *      topo,
      89             :                           fd_topo_tile_t const * tile,
      90             :                           ulong                  out_cnt,
      91           0 :                           struct sock_filter *   out ) {
      92           0 :   (void)topo;
      93             : 
      94           0 :   populate_sock_filter_policy_archiver_playback( out_cnt,
      95           0 :                                                  out,
      96           0 :                                                  (uint)fd_log_private_logfile_fd(),
      97           0 :                                                  (uint)tile->archiver.archive_fd );
      98           0 :   return sock_filter_policy_archiver_playback_instr_cnt;
      99           0 : }
     100             : 
     101             : static ulong
     102             : populate_allowed_fds( fd_topo_t const *      topo        FD_PARAM_UNUSED,
     103             :                       fd_topo_tile_t const * tile,
     104             :                       ulong                  out_fds_cnt FD_PARAM_UNUSED,
     105           0 :                       int *                  out_fds ) {
     106           0 :   ulong out_cnt = 0UL;
     107             : 
     108           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
     109           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     110           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     111           0 :   if( FD_LIKELY( -1!=tile->archiver.archive_fd ) )
     112           0 :     out_fds[ out_cnt++ ] = tile->archiver.archive_fd; /* archive file */
     113             : 
     114           0 :   return out_cnt;
     115           0 : }
     116             : 
     117             : FD_FN_PURE static inline ulong
     118           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     119           0 :   (void)tile;
     120           0 :   ulong l = FD_LAYOUT_INIT;
     121           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
     122           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     123           0 : }
     124             : 
     125             : static void
     126             : privileged_init( fd_topo_t *      topo,
     127           0 :                  fd_topo_tile_t * tile ) {
     128           0 :     void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     129             : 
     130           0 :     FD_SCRATCH_ALLOC_INIT( l, scratch );
     131           0 :     fd_archiver_playback_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
     132           0 :     memset( ctx, 0, sizeof(fd_archiver_playback_tile_ctx_t) );
     133           0 :     FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
     134           0 :     FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     135             : 
     136           0 :     tile->archiver.archive_fd = open( tile->archiver.archiver_path, O_RDONLY | O_DIRECT, 0666 );
     137           0 :     if ( FD_UNLIKELY( tile->archiver.archive_fd == -1 ) ) {
     138           0 :       FD_LOG_ERR(( "failed to open archive file %s %d %d %s", tile->archiver.archiver_path, tile->archiver.archive_fd, errno, strerror(errno) ));
     139           0 :     }
     140           0 : }
     141             : 
     142             : static void
     143             : unprivileged_init( fd_topo_t *      topo,
     144           0 :                    fd_topo_tile_t * tile ) {
     145             : 
     146           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     147           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     148           0 :   fd_archiver_playback_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
     149           0 :   void * alloc_shmem                    = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
     150           0 :   FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     151             : 
     152           0 :   ctx->tick_per_ns = fd_tempo_tick_per_ns( NULL );
     153             : 
     154             :   /* Allocator */
     155           0 :   ctx->alloc = fd_alloc_join( fd_alloc_new( alloc_shmem, FD_ARCHIVER_PLAYBACK_ALLOC_TAG ), fd_tile_idx() );
     156           0 :   if( FD_UNLIKELY( !ctx->alloc ) ) {
     157           0 :     FD_LOG_ERR( ( "fd_alloc_join failed" ) );
     158           0 :   }
     159           0 :   ctx->valloc = fd_alloc_virtual( ctx->alloc );
     160             : 
     161             :   /* Allocate the input stream buffer */
     162           0 :   ctx->istream_buf = fd_valloc_malloc( ctx->valloc, 4096, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     163           0 :   if( FD_UNLIKELY( !ctx->istream_buf ) ) {
     164           0 :     FD_LOG_ERR(( "failed to allocate input buffer" ));
     165           0 :   }
     166             : 
     167             :   /* initialize the file reader */
     168           0 :   fd_io_buffered_istream_init( &ctx->istream, tile->archiver.archive_fd, ctx->istream_buf, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     169             : 
     170             :   /* perform the initial read */
     171           0 :   if( FD_UNLIKELY(( !fd_io_buffered_istream_fetch( &ctx->istream ) )) ) {
     172           0 :     FD_LOG_WARNING(( "failed initial read" ));
     173           0 :   }
     174             : 
     175             :   /* Setup output links */
     176           0 :   for( ulong i=0; i<tile->out_cnt; i++ ) {
     177           0 :     fd_topo_link_t * link      = &topo->links[ tile->out_link_id[ i ] ];
     178           0 :     fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
     179             : 
     180           0 :     ctx->out[ i ].mtu    = link->mtu;
     181           0 :     ctx->out[ i ].mem    = link_wksp->wksp;
     182           0 :     ctx->out[ i ].chunk0 = fd_dcache_compact_chunk0( link_wksp->wksp, link->dcache );
     183           0 :     ctx->out[ i ].wmark  = fd_dcache_compact_wmark( link_wksp->wksp, link->dcache, link->mtu );
     184           0 :     ctx->out[ i ].chunk  = ctx->out[ i ].chunk0;
     185           0 :   }
     186             : 
     187           0 :   ctx->playback_done                            = 0;
     188           0 :   ctx->playback_started                         = 0;
     189           0 :   ctx->now                                      = 0;
     190           0 :   ctx->prev_publish_time                        = 0;
     191             :   /* For now, we require a notification before playing back another frag */
     192           0 :   FD_TEST( tile->in_cnt==1 );
     193           0 :   ctx->need_notify                              = 1;
     194           0 :   ctx->notified                                 = 1;
     195           0 :   ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED]  = 0;
     196           0 :   ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR] = 0;
     197             : 
     198           0 :   ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
     199           0 :   FD_TEST( root_slot_obj_id!=ULONG_MAX );
     200           0 :   ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
     201           0 :   if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
     202           0 :   FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) );
     203             : 
     204           0 :   FD_LOG_WARNING(( "Playback tile finishes initialization" ));
     205           0 : }
     206             : 
     207             : static void
     208           0 : during_housekeeping( fd_archiver_playback_tile_ctx_t * ctx ) {
     209           0 :   ctx->now =(ulong)((double)(fd_tickcount()) / ctx->tick_per_ns);
     210           0 : }
     211             : 
     212             : static void
     213             : after_frag( fd_archiver_playback_tile_ctx_t * ctx,
     214             :             ulong                             in_idx,
     215             :             ulong                             seq    FD_PARAM_UNUSED,
     216             :             ulong                             sig    FD_PARAM_UNUSED,
     217             :             ulong                             sz     FD_PARAM_UNUSED,
     218             :             ulong                             tsorig FD_PARAM_UNUSED,
     219             :             ulong                             tspub  FD_PARAM_UNUSED,
     220           0 :             fd_stem_context_t *               stem   FD_PARAM_UNUSED ) {
     221           0 :   if( FD_UNLIKELY( in_idx!=0 ) ) FD_LOG_ERR(( "Playback seems corrupted." ));
     222           0 :   ctx->notified = 1;
     223           0 : }
     224             : 
     225             : static inline void
     226             : after_credit( fd_archiver_playback_tile_ctx_t *     ctx,
     227             :               fd_stem_context_t *                   stem,
     228             :               int *                                 opt_poll_in FD_PARAM_UNUSED,
     229           0 :               int *                                 charge_busy FD_PARAM_UNUSED ) {
     230           0 :   if( FD_UNLIKELY( ctx->playback_done ) ) {
     231           0 :     if( ctx->now>ctx->done_time+1000000000UL*5UL ) {
     232           0 :       FD_LOG_ERR(( "Playback is done with %lu shred frags and %lu repair frags.",
     233           0 :                    ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED],
     234           0 :                    ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR] ));
     235           0 :     }
     236           0 :     return;
     237           0 :   }
     238             : 
     239           0 :   if( FD_UNLIKELY( !ctx->playback_started ) ) {
     240           0 :     ulong wmark = fd_fseq_query( ctx->published_wmark );
     241           0 :     if( wmark!=ULONG_MAX ) {
     242             :       /* Replay tile has updated root_slot (aka. published_wmark), meaning
     243             :        * (1) snapshot has been loaded; (2) blockstore has been initialized */
     244           0 :       ctx->playback_started = 1;
     245           0 :       FD_LOG_WARNING(( "playback starts with wmark=%lu", wmark ));
     246           0 :     } else {
     247           0 :       return;
     248           0 :     }
     249           0 :   }
     250             : 
     251             :   /* Peek the header without consuming anything, to see if we need to wait */
     252           0 :   char const * peek = fd_io_buffered_istream_peek( &ctx->istream );
     253           0 :   if( FD_UNLIKELY(( !peek )) ) {
     254           0 :     FD_LOG_ERR(( "failed to peek" ));
     255           0 :   }
     256             : 
     257             :   /* Inspect the peeked header (not consumed yet) and validate its magic */
     258           0 :   fd_archiver_frag_header_t * header = fd_type_pun( (char *)peek );
     259           0 :   if( FD_UNLIKELY( header->magic != FD_ARCHIVER_HEADER_MAGIC ) ) {
     260           0 :     FD_LOG_WARNING(( "bad magic in archive header: %lu", header->magic ));
     261           0 :     ctx->playback_done = 1;
     262           0 :     ctx->done_time     = ctx->now;
     263           0 :     return;
     264           0 :   }
     265             : 
     266             :   /* Determine if we should wait before publishing this frag:
     267             :      we need to delay while now < (when we should publish it)  */
     268           0 :   if( ctx->prev_publish_time != 0UL &&
     269           0 :     ( ctx->now < ( ctx->prev_publish_time + header->ns_since_prev_fragment ) )) {
     270           0 :     return;
     271           0 :   }
     272             : 
     273             :   /* Determine whether playback has received the notification
     274             :      for the previous frag from the storei tile. */
     275           0 :   if( FD_LIKELY( ctx->need_notify && !ctx->notified ) ) return;
     276             : 
     277             :   /* Consume the header from the stream */
     278           0 :   fd_archiver_frag_header_t header_tmp;
     279           0 :   if( FD_UNLIKELY( fd_io_buffered_istream_read( &ctx->istream, &header_tmp, FD_ARCHIVER_FRAG_HEADER_FOOTPRINT ) )) {
     280           0 :     FD_LOG_WARNING(( "failed to consume header" ));
     281           0 :     ctx->playback_done = 1;
     282           0 :     ctx->done_time     = ctx->now;
     283           0 :     return;
     284           0 :   }
     285             : 
     286             :   /* Determine the output link on which to send the frag */
     287           0 :   ulong out_link_idx = 0UL;
     288           0 :   switch ( header_tmp.tile_id ) {
     289           0 :     case FD_ARCHIVER_TILE_ID_SHRED:
     290           0 :     out_link_idx = NET_SHRED_OUT_IDX;
     291           0 :     ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED]++;
     292           0 :     break;
     293           0 :     case FD_ARCHIVER_TILE_ID_REPAIR:
     294           0 :     out_link_idx = NET_REPAIR_OUT_IDX;
     295           0 :     ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR]++;
     296           0 :     break;
     297           0 :     default:
     298           0 :     FD_LOG_ERR(( "unsupported tile id" ));
     299           0 :   }
     300             : 
     301             :   /* Consume the fragment from the stream */
     302           0 :   uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->out[ out_link_idx ].mem, ctx->out[ out_link_idx ].chunk );
     303           0 :   if( FD_UNLIKELY( fd_io_buffered_istream_read( &ctx->istream, dst, header_tmp.sz ) ) ) {
     304           0 :     FD_LOG_WARNING(( "failed to consume frag" ));
     305           0 :     ctx->playback_done = 1;
     306           0 :     ctx->done_time     = ctx->now;
     307           0 :     return;
     308           0 :   }
     309             : 
     310           0 :   if( FD_LIKELY( ctx->need_notify ) ) ctx->notified=0;
     311           0 :   if( FD_UNLIKELY(( ctx->out[ out_link_idx ].mtu<header_tmp.sz )) ) {
     312           0 :     FD_LOG_ERR(( "Try to playback frag with sz=%lu, exceeding mtu=%lu for link%lu",
     313           0 :                  header_tmp.sz, ctx->out[ out_link_idx ].mtu, out_link_idx ));
     314           0 :   }
     315           0 :   fd_stem_publish( stem, out_link_idx, header_tmp.sig, ctx->out[ out_link_idx ].chunk, header_tmp.sz, 0UL, 0UL, 0UL);
     316           0 :   ctx->out[ out_link_idx ].chunk = fd_dcache_compact_next( ctx->out[ out_link_idx ].chunk,
     317           0 :                                                            header_tmp.sz,
     318           0 :                                                            ctx->out[ out_link_idx ].chunk0,
     319           0 :                                                            ctx->out[ out_link_idx ].wmark );
     320           0 :   ctx->prev_publish_time = ctx->now;
     321           0 : }
     322             : 
     323           0 : #define STEM_BURST (1UL)
     324           0 : #define STEM_LAZY  (50UL)
     325             : 
     326           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_archiver_playback_tile_ctx_t
     327           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_archiver_playback_tile_ctx_t)
     328             : 
     329           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
     330           0 : #define STEM_CALLBACK_AFTER_FRAG          after_frag
     331           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
     332             : 
     333             : #include "../stem/fd_stem.c"
     334             : 
     335             : fd_topo_run_tile_t fd_tile_archiver_playback = {
     336             :   .name                     = "arch_p",
     337             :   .loose_footprint          = loose_footprint,
     338             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     339             :   .populate_allowed_fds     = populate_allowed_fds,
     340             :   .scratch_align            = scratch_align,
     341             :   .scratch_footprint        = scratch_footprint,
     342             :   .privileged_init          = privileged_init,
     343             :   .unprivileged_init        = unprivileged_init,
     344             :   .run                      = stem_run,
     345             : };

Generated by: LCOV version 1.14