LCOV - code coverage report
Current view: top level - disco/archiver - fd_archiver_playback.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 168 0.0 %
Date: 2025-08-05 05:04:49 Functions: 0 10 0.0 %

          Line data    Source code
       1             : #define _GNU_SOURCE  /* Enable GNU and POSIX extensions */
       2             : 
       3             : #include "../tiles.h"
       4             : 
       5             : #include "fd_archiver.h"
       6             : #include <errno.h>
       7             : #include <fcntl.h>
       8             : #include <string.h>
       9             : #include <sys/mman.h>
      10             : #include <sys/stat.h>
      11             : #include <unistd.h>
      12             : #include <linux/unistd.h>
      13             : #include <sys/socket.h>
      14             : #include <linux/if_xdp.h>
      15             : #include "generated/archiver_playback_seccomp.h"
      16             : #include "../../util/pod/fd_pod_format.h"
      17             : /* The archiver playback tile consumes frags from the archive file, adds artificial delay
      18             : to reproduce exactly the timing from the capture, and forwards these fragments to the
      19             : receiver tiles (shred/quic/gossip/repair; this file currently only routes shred and repair frags).
      20             : 
      21             : There should be a single archiver playback tile, and it should replace the input links to the
      22             : receiver tiles.
      23             : */
      24             : 
      25           0 : #define NET_SHRED_OUT_IDX  (0UL) /* out link index used for frags tagged FD_ARCHIVER_TILE_ID_SHRED */
      26           0 : #define NET_REPAIR_OUT_IDX (1UL) /* out link index used for frags tagged FD_ARCHIVER_TILE_ID_REPAIR */
      27             : 
      28             : #define FD_ARCHIVER_STARTUP_DELAY_SECONDS (1) /* NOTE(review): not referenced anywhere in this listing */
      29           0 : #define FD_ARCHIVE_PLAYBACK_BUFFER_SZ      (FD_SHMEM_GIGANTIC_PAGE_SZ) /* byte size of the istream read buffer placed in tile scratch */
      30             : 
      31             : struct fd_archiver_playback_stats { /* Per-destination output counters. NOTE(review): no code in this listing updates them. */
      32             :   ulong net_shred_out_cnt;
      33             :   ulong net_quic_out_cnt;
      34             :   ulong net_gossip_out_cnt;
      35             :   ulong net_repair_out_cnt;
      36             : 
      37             : };
      38             : typedef struct fd_archiver_playback_stats fd_archiver_playback_stats_t;
      39             : 
      40             : typedef struct { /* Per-out-link publishing state, filled in by unprivileged_init */
      41             :   fd_wksp_t * mem;    /* workspace that holds the link's dcache */
      42             :   ulong       mtu;    /* copied from link->mtu; after_credit errors out if a frag sz exceeds it */
      43             :   ulong       chunk0; /* result of fd_dcache_compact_chunk0 for this link */
      44             :   ulong       wmark;  /* result of fd_dcache_compact_wmark for this link */
      45             :   ulong       chunk;  /* chunk the next frag is written to; starts at chunk0, advanced after each publish */
      46             : } fd_archiver_playback_out_ctx_t;
      47             : 
      48             : struct fd_archiver_playback_tile_ctx { /* Tile state; lives at the start of the tile scratch region */
      49             :   fd_io_buffered_istream_t istream;     /* buffered reader over the archive fd */
      50             :   uchar *                  istream_buf; /* backing buffer of FD_ARCHIVE_PLAYBACK_BUFFER_SZ bytes in scratch */
      51             : 
      52             :   fd_archiver_playback_stats_t stats;
      53             : 
      54             :   double tick_per_ns; /* from fd_tempo_tick_per_ns( NULL ); divides fd_tickcount() to produce now */
      55             : 
      56             :   ulong prev_publish_time; /* value of now at the last publish; 0 until the first frag is published */
      57             :   ulong now;               /* refreshed only in during_housekeeping */
      58             :   ulong need_notify;       /* set to 1 at init; when nonzero a frag on the input link is required between publishes */
      59             :   ulong notified;          /* set to 1 by after_frag, cleared after each publish while need_notify is set */
      60             : 
      61             :   fd_archiver_playback_out_ctx_t out[ 32 ]; /* NOTE(review): unprivileged_init fills tile->out_cnt entries without checking against 32 */
      62             : 
      63             :   ulong playback_done;    /* set to 1 on bad header magic or on a failed header/frag read */
      64             :   ulong done_time;        /* value of now when playback_done was set */
      65             :   ulong playback_started; /* set to 1 once published_wmark is observed to differ from ULONG_MAX */
      66             :   ulong playback_cnt[FD_ARCHIVER_TILE_CNT]; /* frags played back, indexed by archiver tile id */
      67             : 
      68             :   ulong * published_wmark; /* fseq looked up via the "root_slot" property; same as the one in replay tile */
      69             : };
      70             : typedef struct fd_archiver_playback_tile_ctx fd_archiver_playback_tile_ctx_t;
      71             : 
      72             : FD_FN_CONST static inline ulong /* Alignment of the tile scratch region: always 4096 bytes */
      73           0 : scratch_align( void ) {
      74           0 :   return 4096UL;
      75           0 : }
      76             : 
      77             : FD_FN_PURE static inline ulong /* Loose footprint reported for this tile: one gigantic page, independent of tile */
      78           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
      79           0 :   return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
      80           0 : }
      81             : 
      82             : static ulong /* Writes the generated archiver playback seccomp policy into out and returns its instruction count */
      83             : populate_allowed_seccomp( fd_topo_t const *      topo,
      84             :                           fd_topo_tile_t const * tile,
      85             :                           ulong                  out_cnt,
      86           0 :                           struct sock_filter *   out ) {
      87           0 :   (void)topo;
      88             : 
      89           0 :   populate_sock_filter_policy_archiver_playback( out_cnt,
      90           0 :                                                  out,
      91           0 :                                                  (uint)fd_log_private_logfile_fd(), /* the log file fd and ... */
      92           0 :                                                  (uint)tile->archiver.archive_fd ); /* ... the archive fd are the two fds handed to the policy */
      93           0 :   return sock_filter_policy_archiver_playback_instr_cnt;
      94           0 : }
      95             : 
      96             : static ulong /* Lists the fds this tile keeps open (stderr, logfile, archive file) in out_fds and returns how many were written */
      97             : populate_allowed_fds( fd_topo_t const *      topo        FD_PARAM_UNUSED,
      98             :                       fd_topo_tile_t const * tile,
      99             :                       ulong                  out_fds_cnt FD_PARAM_UNUSED, /* NOTE(review): capacity is ignored although up to 3 entries are written */
     100           0 :                       int *                  out_fds ) {
     101           0 :   ulong out_cnt = 0UL;
     102             : 
     103           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
     104           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     105           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     106           0 :   if( FD_LIKELY( -1!=tile->archiver.archive_fd ) )
     107           0 :     out_fds[ out_cnt++ ] = tile->archiver.archive_fd; /* archive file */
     108             : 
     109           0 :   return out_cnt;
     110           0 : }
     111             : 
     112             : FD_FN_PURE static inline ulong /* Scratch bytes needed: the tile ctx followed by the 4096-aligned istream buffer */
     113           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     114           0 :   (void)tile;
     115           0 :   ulong l = FD_LAYOUT_INIT;
     116           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) ); /* same order as the FD_SCRATCH_ALLOC_APPEND calls in both init functions */
     117           0 :   l = FD_LAYOUT_APPEND( l, 4096, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     118           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     119           0 : }
     120             : 
     121             : static void /* First-stage init: zeroes the tile ctx in scratch and opens the archive file read-only */
     122             : privileged_init( fd_topo_t *      topo,
     123           0 :                  fd_topo_tile_t * tile ) {
     124           0 :     void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     125             : 
     126           0 :     FD_SCRATCH_ALLOC_INIT( l, scratch );
     127           0 :     fd_archiver_playback_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
     128           0 :     memset( ctx, 0, sizeof(fd_archiver_playback_tile_ctx_t) ); /* the only place the whole ctx is cleared; unprivileged_init does not repeat it */
     129           0 :     ctx->istream_buf = FD_SCRATCH_ALLOC_APPEND( l, 4096, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     130           0 :     FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     131             : 
     132           0 :     tile->archiver.archive_fd = open( tile->archiver.rocksdb_path, O_RDONLY | O_DIRECT, 0666 ); /* the archive path is taken from the field named rocksdb_path. NOTE(review): the mode argument is only consulted with O_CREAT/O_TMPFILE, so 0666 has no effect here */
     133           0 :     if ( FD_UNLIKELY( tile->archiver.archive_fd == -1 ) ) {
     134           0 :       FD_LOG_ERR(( "failed to open archive file %s %d %d %s", tile->archiver.rocksdb_path, tile->archiver.archive_fd, errno, strerror(errno) ));
     135           0 :     }
     136           0 : }
     137             : 
     138             : static void /* Second-stage init: sets up the istream, the out link state and the replay watermark in the tile ctx */
     139             : unprivileged_init( fd_topo_t *      topo,
     140           0 :                    fd_topo_tile_t * tile ) {
     141             : 
     142           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     143           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch ); /* walk the same scratch layout as privileged_init to find ctx and the buffer again */
     144           0 :   fd_archiver_playback_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
     145           0 :   ctx->istream_buf = FD_SCRATCH_ALLOC_APPEND( l, 4096, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     146           0 :   FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     147             : 
     148           0 :   ctx->tick_per_ns = fd_tempo_tick_per_ns( NULL );
     149             : 
     150             :   /* initialize the file reader */
     151           0 :   fd_io_buffered_istream_init( &ctx->istream, tile->archiver.archive_fd, ctx->istream_buf, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     152             : 
     153             :   /* perform the initial read */
     154           0 :   if( FD_UNLIKELY(( !fd_io_buffered_istream_fetch( &ctx->istream ) )) ) { /* NOTE(review): warns when fetch returns 0, while the fd_io_buffered_istream_read calls in after_credit treat nonzero as failure - confirm fetch's return convention */
     155           0 :     FD_LOG_WARNING(( "failed initial read" )); /* only a warning: initialization continues regardless */
     156           0 :   }
     157             : 
     158             :   /* Setup output links */
     159           0 :   for( ulong i=0; i<tile->out_cnt; i++ ) { /* NOTE(review): out_cnt is not checked against the 32 entries of ctx->out */
     160           0 :     fd_topo_link_t * link      = &topo->links[ tile->out_link_id[ i ] ];
     161           0 :     fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
     162             : 
     163           0 :     ctx->out[ i ].mtu    = link->mtu;
     164           0 :     ctx->out[ i ].mem    = link_wksp->wksp;
     165           0 :     ctx->out[ i ].chunk0 = fd_dcache_compact_chunk0( link_wksp->wksp, link->dcache );
     166           0 :     ctx->out[ i ].wmark  = fd_dcache_compact_wmark( link_wksp->wksp, link->dcache, link->mtu );
     167           0 :     ctx->out[ i ].chunk  = ctx->out[ i ].chunk0;
     168           0 :   }
     169             : 
     170           0 :   ctx->playback_done                            = 0;
     171           0 :   ctx->playback_started                         = 0;
     172           0 :   ctx->now                                      = 0;
     173           0 :   ctx->prev_publish_time                        = 0;
     174             :   /* for now, we require a notification before playing back another frag */
     175           0 :   FD_TEST( tile->in_cnt==1 ); /* exactly one input link: the one whose frags act as notifications in after_frag */
     176           0 :   ctx->need_notify                              = 1;
     177           0 :   ctx->notified                                 = 1; /* starts at 1 so the first frag is not held back waiting for a notification */
     178           0 :   ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED]  = 0;
     179           0 :   ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR] = 0;
     180             : 
     181           0 :   ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" ); /* ULONG_MAX is the default returned when the property is missing, which the next line rejects */
     182           0 :   FD_TEST( root_slot_obj_id!=ULONG_MAX );
     183           0 :   ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
     184           0 :   if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
     185           0 :   FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) ); /* the fseq must still read ULONG_MAX here; after_credit waits for it to change before starting playback */
     186             : 
     187           0 :   FD_LOG_WARNING(( "Playback tile finishes initialization" ));
     188           0 : }
     189             : 
     190             : static void /* Refreshes ctx->now; this is the only place now is updated, so every time comparison in after_credit uses this value */
     191           0 : during_housekeeping( fd_archiver_playback_tile_ctx_t * ctx ) {
     192           0 :   ctx->now =(ulong)((double)(fd_tickcount()) / ctx->tick_per_ns); /* ticks divided by ticks-per-ns, i.e. presumably nanoseconds */
     193           0 : }
     194             : 
     195             : static void /* Treats any frag on input link 0 as the notification that lets after_credit publish the next archived frag */
     196             : after_frag( fd_archiver_playback_tile_ctx_t * ctx,
     197             :             ulong                             in_idx,
     198             :             ulong                             seq    FD_PARAM_UNUSED,
     199             :             ulong                             sig    FD_PARAM_UNUSED,
     200             :             ulong                             sz     FD_PARAM_UNUSED,
     201             :             ulong                             tsorig FD_PARAM_UNUSED,
     202             :             ulong                             tspub  FD_PARAM_UNUSED,
     203           0 :             fd_stem_context_t *               stem   FD_PARAM_UNUSED ) {
     204           0 :   if( FD_UNLIKELY( in_idx!=0 ) ) FD_LOG_ERR(( "Playback seems corrupted." )); /* unprivileged_init requires in_cnt==1, so any other index is unexpected */
     205           0 :   ctx->notified = 1;
     206           0 : }
     207             : 
     208             : static inline void /* Publishes at most one archived frag per call, once playback has started, its delay has elapsed and the previous frag was acknowledged */
     209             : after_credit( fd_archiver_playback_tile_ctx_t *     ctx,
     210             :               fd_stem_context_t *                   stem,
     211             :               int *                                 opt_poll_in FD_PARAM_UNUSED,
     212           0 :               int *                                 charge_busy FD_PARAM_UNUSED ) {
     213           0 :   if( FD_UNLIKELY( ctx->playback_done ) ) {
     214           0 :     if( ctx->now>ctx->done_time+1000000000UL*5UL ) { /* 5e9 units of now after done_time (5 seconds if now is in ns) */
     215           0 :       FD_LOG_ERR(( "Playback is done with %lu shred frags and %lu repair frags.", /* NOTE(review): completion is reported through FD_LOG_ERR, presumably to stop the tile - confirm */
     216           0 :                    ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED],
     217           0 :                    ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR] ));
     218           0 :     }
     219           0 :     return;
     220           0 :   }
     221             : 
     222           0 :   if( FD_UNLIKELY( !ctx->playback_started ) ) {
     223           0 :     ulong wmark = fd_fseq_query( ctx->published_wmark );
     224           0 :     if( wmark!=ULONG_MAX ) {
     225             :       /* Replay tile has updated root_slot (aka. published_wmark), meaning
     226             :        * (1) snapshot has been loaded; (2) blockstore has been initialized */
     227           0 :       ctx->playback_started = 1;
     228           0 :       FD_LOG_WARNING(( "playback starts with wmark=%lu", wmark ));
     229           0 :     } else {
     230           0 :       return;
     231           0 :     }
     232           0 :   }
     233             : 
     234             :   /* Peek the header without consuming anything, to see if we need to wait */
     235           0 :   char const * peek = fd_io_buffered_istream_peek( &ctx->istream );
     236           0 :   if( FD_UNLIKELY(( !peek )) ) {
     237           0 :     FD_LOG_ERR(( "failed to peek" ));
     238           0 :   }
     239             : 
     240             :   /* Interpret the peeked bytes as a frag header (nothing is consumed yet) */
     241           0 :   fd_archiver_frag_header_t * header = fd_type_pun( (char *)peek ); /* NOTE(review): no explicit check that a full header is buffered at peek - confirm against fd_io_buffered_istream_peek */
     242           0 :   if( FD_UNLIKELY( header->magic != FD_ARCHIVER_HEADER_MAGIC ) ) {
     243           0 :     FD_LOG_WARNING(( "bad magic in archive header: %lu", header->magic ));
     244           0 :     ctx->playback_done = 1;
     245           0 :     ctx->done_time     = ctx->now;
     246           0 :     return;
     247           0 :   }
     248             : 
     249             :   /* Determine if we should wait before publishing this frag:
     250             :      we hold it back while now < (when we should publish it)  */
     251           0 :   if( ctx->prev_publish_time != 0UL && /* prev_publish_time==0 means nothing was published yet, so the first frag is never delayed */
     252           0 :     ( ctx->now < ( ctx->prev_publish_time + header->ns_since_prev_fragment ) )) {
     253           0 :     return;
     254           0 :   }
     255             : 
     256             :   /* Determine if playback receives the notification for
     257             :      the previous frag from storei tile. */
     258           0 :   if( FD_LIKELY( ctx->need_notify && !ctx->notified ) ) return;
     259             : 
     260             :   /* Consume the header from the stream */
     261           0 :   fd_archiver_frag_header_t header_tmp;
     262           0 :   if( FD_UNLIKELY( fd_io_buffered_istream_read( &ctx->istream, &header_tmp, FD_ARCHIVER_FRAG_HEADER_FOOTPRINT ) )) { /* a nonzero return is treated as failure and ends playback */
     263           0 :     FD_LOG_WARNING(( "failed to consume header" ));
     264           0 :     ctx->playback_done = 1;
     265           0 :     ctx->done_time     = ctx->now;
     266           0 :     return;
     267           0 :   }
     268             : 
     269             :   /* Determine the output link on which to send the frag */
     270           0 :   ulong out_link_idx = 0UL;
     271           0 :   switch ( header_tmp.tile_id ) { /* the per-tile counter is bumped here, before the frag payload has been read */
     272           0 :     case FD_ARCHIVER_TILE_ID_SHRED:
     273           0 :     out_link_idx = NET_SHRED_OUT_IDX;
     274           0 :     ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED]++;
     275           0 :     break;
     276           0 :     case FD_ARCHIVER_TILE_ID_REPAIR:
     277           0 :     out_link_idx = NET_REPAIR_OUT_IDX;
     278           0 :     ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR]++;
     279           0 :     break;
     280           0 :     default:
     281           0 :     FD_LOG_ERR(( "unsupported tile id" ));
     282           0 :   }
     283             : 
     284             :   /* Consume the fragment from the stream */
     285           0 :   uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->out[ out_link_idx ].mem, ctx->out[ out_link_idx ].chunk ); /* the payload is read straight into the out link's current chunk */
     286           0 :   if( FD_UNLIKELY( fd_io_buffered_istream_read( &ctx->istream, dst, header_tmp.sz ) ) ) {
     287           0 :     FD_LOG_WARNING(( "failed to consume frag" ));
     288           0 :     ctx->playback_done = 1;
     289           0 :     ctx->done_time     = ctx->now;
     290           0 :     return;
     291           0 :   }
     292             : 
     293           0 :   if( FD_LIKELY( ctx->need_notify ) ) ctx->notified=0; /* the next publish now waits for after_frag to set notified again */
     294           0 :   if( FD_UNLIKELY(( ctx->out[ out_link_idx ].mtu<header_tmp.sz )) ) { /* NOTE(review): this check runs only after header_tmp.sz bytes were already written to dst above; validating sz before the read would avoid writing an oversized frag */
     295           0 :     FD_LOG_ERR(( "Try to playback frag with sz=%lu, exceeding mtu=%lu for link%lu",
     296           0 :                  header_tmp.sz, ctx->out[ out_link_idx ].mtu, out_link_idx ));
     297           0 :   }
     298           0 :   fd_stem_publish( stem, out_link_idx, header_tmp.sig, ctx->out[ out_link_idx ].chunk, header_tmp.sz, 0UL, 0UL, 0UL); /* sig and sz come from the archived header; the last three arguments are passed as 0UL */
     299           0 :   ctx->out[ out_link_idx ].chunk = fd_dcache_compact_next( ctx->out[ out_link_idx ].chunk,
     300           0 :                                                            header_tmp.sz,
     301           0 :                                                            ctx->out[ out_link_idx ].chunk0,
     302           0 :                                                            ctx->out[ out_link_idx ].wmark );
     303           0 :   ctx->prev_publish_time = ctx->now; /* reference point for the next frag's ns_since_prev_fragment delay */
     304           0 : }
     305             : 
     306           0 : #define STEM_BURST (1UL) /* These STEM_* macros are defined before including fd_stem.c below, presumably to configure the stem_run loop it provides */
     307           0 : #define STEM_LAZY  (50UL)
     308             : 
     309           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_archiver_playback_tile_ctx_t
     310           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_archiver_playback_tile_ctx_t)
     311             : 
     312           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
     313           0 : #define STEM_CALLBACK_AFTER_FRAG          after_frag
     314           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
     315             : 
     316             : #include "../stem/fd_stem.c" /* stem_run, used in the tile descriptor, is not defined in this listing and presumably comes from this include */
     317             : 
     318             : fd_topo_run_tile_t fd_tile_archiver_playback = { /* Tile descriptor binding the name "arch_p" to the callbacks defined in this file */
     319             :   .name                     = "arch_p",
     320             :   .loose_footprint          = loose_footprint,
     321             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     322             :   .populate_allowed_fds     = populate_allowed_fds,
     323             :   .scratch_align            = scratch_align,
     324             :   .scratch_footprint        = scratch_footprint,
     325             :   .privileged_init          = privileged_init,
     326             :   .unprivileged_init        = unprivileged_init,
     327             :   .run                      = stem_run,
     328             : };

Generated by: LCOV version 1.14