LCOV - code coverage report
Current view: top level - disco/archiver - fd_archiver_playback.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 168 0.0 %
Date: 2025-12-06 04:45:29 Functions: 0 10 0.0 %

          Line data    Source code
       1             : #define _GNU_SOURCE  /* Enable GNU and POSIX extensions */
       2             : 
       3             : #include "../tiles.h"
       4             : 
       5             : #include "fd_archiver.h"
       6             : #include <errno.h>
       7             : #include <fcntl.h>
       8             : #include <string.h>
       9             : #include <sys/mman.h>
      10             : #include <sys/stat.h>
      11             : #include <unistd.h>
      12             : #include <sys/socket.h>
      13             : #include "generated/archiver_playback_seccomp.h"
      14             : #include "../../util/pod/fd_pod_format.h"
      15             : /* The archiver playback tile consumes from the archive file, adds artificial delay
      16             : to reproduce exactly the timing from the capture, and forwards these fragments to the
      17             : receiver tiles (shred/quic/gossip/repair).
      18             : 
      19             : There should be a single archiver playback tile, and it should replace the input links to the
      20             : receiver tiles.
      21             : */
      22             : 
      23           0 : #define NET_SHRED_OUT_IDX  (0UL)
      24           0 : #define NET_REPAIR_OUT_IDX (1UL)
      25             : 
      26             : #define FD_ARCHIVER_STARTUP_DELAY_SECONDS (1)
      27           0 : #define FD_ARCHIVE_PLAYBACK_BUFFER_SZ      (FD_SHMEM_GIGANTIC_PAGE_SZ)
      28             : 
      29             : struct fd_archiver_playback_stats {
      30             :   ulong net_shred_out_cnt;
      31             :   ulong net_quic_out_cnt;
      32             :   ulong net_gossip_out_cnt;
      33             :   ulong net_repair_out_cnt;
      34             : 
      35             : };
      36             : typedef struct fd_archiver_playback_stats fd_archiver_playback_stats_t;
      37             : 
      38             : typedef struct {
      39             :   fd_wksp_t * mem;
      40             :   ulong       mtu;
      41             :   ulong       chunk0;
      42             :   ulong       wmark;
      43             :   ulong       chunk;
      44             : } fd_archiver_playback_out_ctx_t;
      45             : 
      46             : struct fd_archiver_playback_tile_ctx {
      47             :   fd_io_buffered_istream_t istream;
      48             :   uchar *                  istream_buf;
      49             : 
      50             :   fd_archiver_playback_stats_t stats;
      51             : 
      52             :   double tick_per_ns;
      53             : 
      54             :   ulong prev_publish_time;
      55             :   ulong now;
      56             :   ulong need_notify;
      57             :   ulong notified;
      58             : 
      59             :   fd_archiver_playback_out_ctx_t out[ 32 ];
      60             : 
      61             :   ulong playback_done;
      62             :   ulong done_time;
      63             :   ulong playback_started;
      64             :   ulong playback_cnt[FD_ARCHIVER_TILE_CNT];
      65             : 
      66             :   ulong * published_wmark; /* same as the one in replay tile */
      67             : };
      68             : typedef struct fd_archiver_playback_tile_ctx fd_archiver_playback_tile_ctx_t;
      69             : 
      70             : FD_FN_CONST static inline ulong
      71           0 : scratch_align( void ) {
      72           0 :   return 4096UL;
      73           0 : }
      74             : 
      75             : FD_FN_PURE static inline ulong
      76           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
      77           0 :   return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
      78           0 : }
      79             : 
      80             : static ulong
      81             : populate_allowed_seccomp( fd_topo_t const *      topo,
      82             :                           fd_topo_tile_t const * tile,
      83             :                           ulong                  out_cnt,
      84           0 :                           struct sock_filter *   out ) {
      85           0 :   (void)topo;
      86             : 
      87           0 :   populate_sock_filter_policy_archiver_playback( out_cnt,
      88           0 :                                                  out,
      89           0 :                                                  (uint)fd_log_private_logfile_fd(),
      90           0 :                                                  (uint)tile->archiver.archive_fd );
      91           0 :   return sock_filter_policy_archiver_playback_instr_cnt;
      92           0 : }
      93             : 
      94             : static ulong
      95             : populate_allowed_fds( fd_topo_t const *      topo        FD_PARAM_UNUSED,
      96             :                       fd_topo_tile_t const * tile,
      97             :                       ulong                  out_fds_cnt FD_PARAM_UNUSED,
      98           0 :                       int *                  out_fds ) {
      99           0 :   ulong out_cnt = 0UL;
     100             : 
     101           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
     102           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     103           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     104           0 :   if( FD_LIKELY( -1!=tile->archiver.archive_fd ) )
     105           0 :     out_fds[ out_cnt++ ] = tile->archiver.archive_fd; /* archive file */
     106             : 
     107           0 :   return out_cnt;
     108           0 : }
     109             : 
     110             : FD_FN_PURE static inline ulong
     111           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     112           0 :   (void)tile;
     113           0 :   ulong l = FD_LAYOUT_INIT;
     114           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
     115           0 :   l = FD_LAYOUT_APPEND( l, 4096, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     116           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     117           0 : }
     118             : 
     119             : static void
     120             : privileged_init( fd_topo_t *      topo,
     121           0 :                  fd_topo_tile_t * tile ) {
     122           0 :     void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     123             : 
     124           0 :     FD_SCRATCH_ALLOC_INIT( l, scratch );
     125           0 :     fd_archiver_playback_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
     126           0 :     memset( ctx, 0, sizeof(fd_archiver_playback_tile_ctx_t) );
     127           0 :     ctx->istream_buf = FD_SCRATCH_ALLOC_APPEND( l, 4096, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     128           0 :     FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     129             : 
     130           0 :     tile->archiver.archive_fd = open( tile->archiver.rocksdb_path, O_RDONLY | O_DIRECT, 0666 );
     131           0 :     if ( FD_UNLIKELY( tile->archiver.archive_fd == -1 ) ) {
     132           0 :       FD_LOG_ERR(( "failed to open archive file %s %d %d %s", tile->archiver.rocksdb_path, tile->archiver.archive_fd, errno, strerror(errno) ));
     133           0 :     }
     134           0 : }
     135             : 
     136             : static void
     137             : unprivileged_init( fd_topo_t *      topo,
     138           0 :                    fd_topo_tile_t * tile ) {
     139             : 
     140           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     141           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     142           0 :   fd_archiver_playback_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
     143           0 :   ctx->istream_buf = FD_SCRATCH_ALLOC_APPEND( l, 4096, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     144           0 :   FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     145             : 
     146           0 :   ctx->tick_per_ns = fd_tempo_tick_per_ns( NULL );
     147             : 
     148             :   /* initialize the file reader */
     149           0 :   fd_io_buffered_istream_init( &ctx->istream, tile->archiver.archive_fd, ctx->istream_buf, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
     150             : 
     151             :   /* perform the initial read */
     152           0 :   if( FD_UNLIKELY(( !fd_io_buffered_istream_fetch( &ctx->istream ) )) ) {
     153           0 :     FD_LOG_WARNING(( "failed initial read" ));
     154           0 :   }
     155             : 
     156             :   /* Setup output links */
     157           0 :   for( ulong i=0; i<tile->out_cnt; i++ ) {
     158           0 :     fd_topo_link_t * link      = &topo->links[ tile->out_link_id[ i ] ];
     159           0 :     fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
     160             : 
     161           0 :     ctx->out[ i ].mtu    = link->mtu;
     162           0 :     ctx->out[ i ].mem    = link_wksp->wksp;
     163           0 :     ctx->out[ i ].chunk0 = fd_dcache_compact_chunk0( link_wksp->wksp, link->dcache );
     164           0 :     ctx->out[ i ].wmark  = fd_dcache_compact_wmark( link_wksp->wksp, link->dcache, link->mtu );
     165           0 :     ctx->out[ i ].chunk  = ctx->out[ i ].chunk0;
     166           0 :   }
     167             : 
     168           0 :   ctx->playback_done                            = 0;
     169           0 :   ctx->playback_started                         = 0;
     170           0 :   ctx->now                                      = 0;
     171           0 :   ctx->prev_publish_time                        = 0;
     172             :   /* for now, we require a notification before playback another frag */
     173           0 :   FD_TEST( tile->in_cnt==1 );
     174           0 :   ctx->need_notify                              = 1;
     175           0 :   ctx->notified                                 = 1;
     176           0 :   ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED]  = 0;
     177           0 :   ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR] = 0;
     178             : 
     179           0 :   ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
     180           0 :   FD_TEST( root_slot_obj_id!=ULONG_MAX );
     181           0 :   ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
     182           0 :   if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
     183           0 :   FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) );
     184             : 
     185           0 :   FD_LOG_WARNING(( "Playback tile finishes initialization" ));
     186           0 : }
     187             : 
     188             : static void
     189           0 : during_housekeeping( fd_archiver_playback_tile_ctx_t * ctx ) {
     190           0 :   ctx->now =(ulong)((double)(fd_tickcount()) / ctx->tick_per_ns);
     191           0 : }
     192             : 
     193             : static void
     194             : after_frag( fd_archiver_playback_tile_ctx_t * ctx,
     195             :             ulong                             in_idx,
     196             :             ulong                             seq    FD_PARAM_UNUSED,
     197             :             ulong                             sig    FD_PARAM_UNUSED,
     198             :             ulong                             sz     FD_PARAM_UNUSED,
     199             :             ulong                             tsorig FD_PARAM_UNUSED,
     200             :             ulong                             tspub  FD_PARAM_UNUSED,
     201           0 :             fd_stem_context_t *               stem   FD_PARAM_UNUSED ) {
     202           0 :   if( FD_UNLIKELY( in_idx!=0 ) ) FD_LOG_ERR(( "Playback seems corrupted." ));
     203           0 :   ctx->notified = 1;
     204           0 : }
     205             : 
     206             : static inline void
     207             : after_credit( fd_archiver_playback_tile_ctx_t *     ctx,
     208             :               fd_stem_context_t *                   stem,
     209             :               int *                                 opt_poll_in FD_PARAM_UNUSED,
     210           0 :               int *                                 charge_busy FD_PARAM_UNUSED ) {
     211           0 :   if( FD_UNLIKELY( ctx->playback_done ) ) {
     212           0 :     if( ctx->now>ctx->done_time+1000000000UL*5UL ) {
     213           0 :       FD_LOG_ERR(( "Playback is done with %lu shred frags and %lu repair frags.",
     214           0 :                    ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED],
     215           0 :                    ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR] ));
     216           0 :     }
     217           0 :     return;
     218           0 :   }
     219             : 
     220           0 :   if( FD_UNLIKELY( !ctx->playback_started ) ) {
     221           0 :     ulong wmark = fd_fseq_query( ctx->published_wmark );
     222           0 :     if( wmark!=ULONG_MAX ) {
     223             :       /* Replay tile has updated root_slot (aka. published_wmark), meaning
     224             :        * (1) snapshot has been loaded; (2) blockstore has been initialized */
     225           0 :       ctx->playback_started = 1;
     226           0 :       FD_LOG_WARNING(( "playback starts with wmark=%lu", wmark ));
     227           0 :     } else {
     228           0 :       return;
     229           0 :     }
     230           0 :   }
     231             : 
     232             :   /* Peek the header without consuming anything, to see if we need to wait */
     233           0 :   char const * peek = fd_io_buffered_istream_peek( &ctx->istream );
     234           0 :   if( FD_UNLIKELY(( !peek )) ) {
     235           0 :     FD_LOG_ERR(( "failed to peek" ));
     236           0 :   }
     237             : 
     238             :   /* Consume the header */
     239           0 :   fd_archiver_frag_header_t * header = fd_type_pun( (char *)peek );
     240           0 :   if( FD_UNLIKELY( header->magic != FD_ARCHIVER_HEADER_MAGIC ) ) {
     241           0 :     FD_LOG_WARNING(( "bad magic in archive header: %lu", header->magic ));
     242           0 :     ctx->playback_done = 1;
     243           0 :     ctx->done_time     = ctx->now;
     244           0 :     return;
     245           0 :   }
     246             : 
     247             :   /* Determine if we should wait before publishing this
     248             :      need to delay if now > (when we should publish it)  */
     249           0 :   if( ctx->prev_publish_time != 0UL &&
     250           0 :     ( ctx->now < ( ctx->prev_publish_time + header->ns_since_prev_fragment ) )) {
     251           0 :     return;
     252           0 :   }
     253             : 
     254             :   /* Determine if playback receives the notification for
     255             :      the previous frag from storei tile. */
     256           0 :   if( FD_LIKELY( ctx->need_notify && !ctx->notified ) ) return;
     257             : 
     258             :   /* Consume the header from the stream */
     259           0 :   fd_archiver_frag_header_t header_tmp;
     260           0 :   if( FD_UNLIKELY( fd_io_buffered_istream_read( &ctx->istream, &header_tmp, FD_ARCHIVER_FRAG_HEADER_FOOTPRINT ) )) {
     261           0 :     FD_LOG_WARNING(( "failed to consume header" ));
     262           0 :     ctx->playback_done = 1;
     263           0 :     ctx->done_time     = ctx->now;
     264           0 :     return;
     265           0 :   }
     266             : 
     267             :   /* Determine the output link on which to send the frag */
     268           0 :   ulong out_link_idx = 0UL;
     269           0 :   switch ( header_tmp.tile_id ) {
     270           0 :     case FD_ARCHIVER_TILE_ID_SHRED:
     271           0 :     out_link_idx = NET_SHRED_OUT_IDX;
     272           0 :     ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED]++;
     273           0 :     break;
     274           0 :     case FD_ARCHIVER_TILE_ID_REPAIR:
     275           0 :     out_link_idx = NET_REPAIR_OUT_IDX;
     276           0 :     ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR]++;
     277           0 :     break;
     278           0 :     default:
     279           0 :     FD_LOG_ERR(( "unsupported tile id" ));
     280           0 :   }
     281             : 
     282             :   /* Consume the fragment from the stream */
     283           0 :   uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->out[ out_link_idx ].mem, ctx->out[ out_link_idx ].chunk );
     284           0 :   if( FD_UNLIKELY( fd_io_buffered_istream_read( &ctx->istream, dst, header_tmp.sz ) ) ) {
     285           0 :     FD_LOG_WARNING(( "failed to consume frag" ));
     286           0 :     ctx->playback_done = 1;
     287           0 :     ctx->done_time     = ctx->now;
     288           0 :     return;
     289           0 :   }
     290             : 
     291           0 :   if( FD_LIKELY( ctx->need_notify ) ) ctx->notified=0;
     292           0 :   if( FD_UNLIKELY(( ctx->out[ out_link_idx ].mtu<header_tmp.sz )) ) {
     293           0 :     FD_LOG_ERR(( "Try to playback frag with sz=%lu, exceeding mtu=%lu for link%lu",
     294           0 :                  header_tmp.sz, ctx->out[ out_link_idx ].mtu, out_link_idx ));
     295           0 :   }
     296           0 :   fd_stem_publish( stem, out_link_idx, header_tmp.sig, ctx->out[ out_link_idx ].chunk, header_tmp.sz, 0UL, 0UL, 0UL);
     297           0 :   ctx->out[ out_link_idx ].chunk = fd_dcache_compact_next( ctx->out[ out_link_idx ].chunk,
     298           0 :                                                            header_tmp.sz,
     299           0 :                                                            ctx->out[ out_link_idx ].chunk0,
     300           0 :                                                            ctx->out[ out_link_idx ].wmark );
     301           0 :   ctx->prev_publish_time = ctx->now;
     302           0 : }
     303             : 
     304           0 : #define STEM_BURST (1UL)
     305           0 : #define STEM_LAZY  (50UL)
     306             : 
     307           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_archiver_playback_tile_ctx_t
     308           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_archiver_playback_tile_ctx_t)
     309             : 
     310           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
     311           0 : #define STEM_CALLBACK_AFTER_FRAG          after_frag
     312           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
     313             : 
     314             : #include "../stem/fd_stem.c"
     315             : 
     316             : fd_topo_run_tile_t fd_tile_archiver_playback = {
     317             :   .name                     = "arch_p",
     318             :   .loose_footprint          = loose_footprint,
     319             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     320             :   .populate_allowed_fds     = populate_allowed_fds,
     321             :   .scratch_align            = scratch_align,
     322             :   .scratch_footprint        = scratch_footprint,
     323             :   .privileged_init          = privileged_init,
     324             :   .unprivileged_init        = unprivileged_init,
     325             :   .run                      = stem_run,
     326             : };

Generated by: LCOV version 1.14