LCOV - code coverage report
Current view: top level - discof/restore - fd_snapdc_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 192 0.0 %
Date: 2025-10-13 04:42:14 Functions: 0 11 0.0 %

          Line data    Source code
       1             : #include "utils/fd_ssctrl.h"
       2             : 
       3             : #include "../../disco/topo/fd_topo.h"
       4             : #include "../../disco/metrics/fd_metrics.h"
       5             : 
       6             : #include "generated/fd_snapdc_tile_seccomp.h"
       7             : 
       8             : #define ZSTD_STATIC_LINKING_ONLY
       9             : #include <zstd.h>
      10             : 
#define NAME "snapdc"

/* zstd window size the decompression stream is sized for.  Both the
   scratch footprint and the static DStream in unprivileged_init are
   derived from ZSTD_estimateDStreamSize( ZSTD_WINDOW_SZ ).
   NOTE(review): frames declaring a larger window presumably fail to
   decode with the statically sized context -- confirm against zstd. */
#define ZSTD_WINDOW_SZ (1UL<<25UL) /* 32MiB */

/* The snapdc tile is a state machine that decompresses the full and
   optionally incremental snapshot byte stream that it receives from the
   snaprd tile.

   snaprd may send a reset notification, which causes snapdc to reset
   its decompressor state to waiting for either the full or incremental
   snapshot respectively.

   Transitions (see handle_control_frag / handle_data_frag):
     RESET_FULL / RESET_INCREMENTAL      any state    -> DECOMPRESSING
     zstd reports end of frame           DECOMPRESSING -> FINISHING
     EOF_FULL                            FINISHING    -> DECOMPRESSING
                                                         (incremental)
     DONE                                FINISHING    -> DONE
     SHUTDOWN                            DONE         -> SHUTDOWN
     decode error, trailing data, or
     EOF_FULL / DONE while DECOMPRESSING               -> MALFORMED

   Entering MALFORMED publishes FD_SNAPSHOT_MSG_CTRL_MALFORMED back on
   out link 1; the tile then ignores data until the next reset. */

#define FD_SNAPDC_STATE_DECOMPRESSING (0) /* We are in the process of decompressing a valid stream */
#define FD_SNAPDC_STATE_FINISHING     (1) /* The frame has been fully decompressed, we are waiting to make sure the snapshot has no more data */
#define FD_SNAPDC_STATE_MALFORMED     (2) /* The decompression stream is malformed, we are waiting for a reset notification */
#define FD_SNAPDC_STATE_DONE          (3) /* The decompression stream is done, the tile is waiting for a shutdown message */
#define FD_SNAPDC_STATE_SHUTDOWN      (4) /* The tile is done, has been told to shut down, and has likely already exited */

/* Per-tile state.  Lives at the start of the tile's scratch region and
   is followed there by the zstd decompression stream (see
   unprivileged_init). */
struct fd_snapdc_tile {
  int full;  /* 1 while decompressing the full snapshot, 0 while
                decompressing the incremental snapshot */
  int state; /* one of FD_SNAPDC_STATE_* */

  ZSTD_DCtx * zstd; /* statically initialized stream in this tile's
                       scratch; its session is reset on every control
                       message */

  /* Input dcache: compressed bytes received from snaprd. */
  struct {
    fd_wksp_t * wksp;
    ulong       chunk0;
    ulong       wmark;
    ulong       mtu;
    ulong       frag_pos; /* bytes of the current input fragment already
                             fed to zstd; nonzero only while the same
                             fragment is being processed over several
                             calls to handle_data_frag */
  } in;

  /* Output dcache: decompressed bytes for the downstream consumer. */
  struct {
    fd_wksp_t * wksp;
    ulong       chunk0;
    ulong       wmark;
    ulong       chunk;  /* chunk the next decompressed output is written to */
    ulong       mtu;    /* also the zstd output buffer size per call */
  } out;

  /* Byte counters reported by metrics_write.  compressed_bytes_read
     counts input bytes consumed by zstd, decompressed_bytes_read counts
     output bytes it produced. */
  struct {
    struct {
      ulong compressed_bytes_read;
      ulong decompressed_bytes_read;
    } full;

    struct {
      ulong compressed_bytes_read;
      ulong decompressed_bytes_read;
    } incremental;
  } metrics;
};
typedef struct fd_snapdc_tile fd_snapdc_tile_t;
      64             : 
      65             : FD_FN_PURE static ulong
      66           0 : scratch_align( void ) {
      67           0 :   return alignof(fd_snapdc_tile_t);
      68           0 : }
      69             : 
      70             : FD_FN_PURE static ulong
      71           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
      72           0 :   (void)tile;
      73           0 :   ulong l = FD_LAYOUT_INIT;
      74           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapdc_tile_t), sizeof(fd_snapdc_tile_t)                   );
      75           0 :   l = FD_LAYOUT_APPEND( l, 32UL,                      ZSTD_estimateDStreamSize( ZSTD_WINDOW_SZ ) );
      76           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
      77           0 : }
      78             : 
      79             : static inline int
      80           0 : should_shutdown( fd_snapdc_tile_t * ctx ) {
      81           0 :   return ctx->state==FD_SNAPDC_STATE_SHUTDOWN;
      82           0 : }
      83             : 
      84             : static void
      85           0 : metrics_write( fd_snapdc_tile_t * ctx ) {
      86           0 :   FD_MGAUGE_SET( SNAPDC, FULL_COMPRESSED_BYTES_READ,    ctx->metrics.full.compressed_bytes_read );
      87           0 :   FD_MGAUGE_SET( SNAPDC, FULL_DECOMPRESSED_BYTES_READ,  ctx->metrics.full.decompressed_bytes_read );
      88             : 
      89           0 :   FD_MGAUGE_SET( SNAPDC, INCREMENTAL_COMPRESSED_BYTES_READ,    ctx->metrics.incremental.compressed_bytes_read );
      90           0 :   FD_MGAUGE_SET( SNAPDC, INCREMENTAL_DECOMPRESSED_BYTES_READ,  ctx->metrics.incremental.decompressed_bytes_read );
      91             : 
      92           0 :   FD_MGAUGE_SET( SNAPDC, STATE, (ulong)(ctx->state) );
      93           0 : }
      94             : 
      95             : static inline void
      96             : transition_malformed( fd_snapdc_tile_t *  ctx,
      97           0 :                       fd_stem_context_t * stem ) {
      98           0 :   ctx->state = FD_SNAPDC_STATE_MALFORMED;
      99           0 :   ctx->in.frag_pos = 0UL;
     100           0 :   fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL );
     101           0 : }
     102             : 
     103             : static inline void
     104             : handle_control_frag( fd_snapdc_tile_t *  ctx,
     105             :                      fd_stem_context_t * stem,
     106           0 :                      ulong               sig ) {
     107             :   /* 1. Pass the control message downstream to the next consumer. */
     108           0 :   fd_stem_publish( stem, 0UL, sig, ctx->out.chunk, 0UL, 0UL, 0UL, 0UL );
     109           0 :   ulong error = ZSTD_DCtx_reset( ctx->zstd, ZSTD_reset_session_only );
     110           0 :   if( FD_UNLIKELY( ZSTD_isError( error ) ) ) FD_LOG_ERR(( "ZSTD_DCtx_reset failed (%lu-%s)", error, ZSTD_getErrorName( error ) ));
     111             : 
     112             :   /* 2. Check if the control message is actually valid given the state
     113             :         machine, and if not, return a malformed message to the sender. */
     114           0 :   switch( sig ) {
     115           0 :     case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
     116           0 :       ctx->state = FD_SNAPDC_STATE_DECOMPRESSING;
     117           0 :       ctx->full = 1;
     118           0 :       ctx->metrics.full.compressed_bytes_read   = 0UL;
     119           0 :       ctx->metrics.full.decompressed_bytes_read = 0UL;
     120           0 :       ctx->metrics.incremental.compressed_bytes_read   = 0UL;
     121           0 :       ctx->metrics.incremental.decompressed_bytes_read = 0UL;
     122           0 :       break;
     123           0 :     case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL:
     124           0 :       ctx->state = FD_SNAPDC_STATE_DECOMPRESSING;
     125           0 :       ctx->full = 0;
     126           0 :       ctx->metrics.full.compressed_bytes_read   = 0UL;
     127           0 :       ctx->metrics.full.decompressed_bytes_read = 0UL;
     128           0 :       ctx->metrics.incremental.compressed_bytes_read   = 0UL;
     129           0 :       ctx->metrics.incremental.decompressed_bytes_read = 0UL;
     130           0 :       break;
     131           0 :     case FD_SNAPSHOT_MSG_CTRL_EOF_FULL:
     132           0 :       FD_TEST( ctx->full );
     133           0 :       if( FD_UNLIKELY( ctx->state==FD_SNAPDC_STATE_MALFORMED ) ) break;
     134           0 :       else if( FD_UNLIKELY( ctx->state==FD_SNAPDC_STATE_DECOMPRESSING ) ) {
     135           0 :         transition_malformed( ctx, stem );
     136           0 :         break;
     137           0 :       }
     138           0 :       ctx->state = FD_SNAPDC_STATE_DECOMPRESSING;
     139           0 :       ctx->full = 0;
     140           0 :       break;
     141           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE:
     142           0 :       if( FD_UNLIKELY( ctx->state==FD_SNAPDC_STATE_MALFORMED ) ) break;
     143           0 :       else if( FD_UNLIKELY( ctx->state==FD_SNAPDC_STATE_DECOMPRESSING ) ) {
     144           0 :         transition_malformed( ctx, stem );
     145           0 :         break;
     146           0 :       }
     147           0 :       ctx->state = FD_SNAPDC_STATE_DONE;
     148           0 :       break;
     149           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
     150           0 :       FD_TEST( ctx->state==FD_SNAPDC_STATE_DONE );
     151           0 :       ctx->state = FD_SNAPDC_STATE_SHUTDOWN;
     152           0 :       metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     153           0 :       break;
     154           0 :     default:
     155           0 :       FD_LOG_ERR(( "unexpected control sig %lu", sig ));
     156           0 :       return;
     157           0 :   }
     158             : 
     159             :   /* 3. Acknowledge the control message, so the sender knows we received
     160             :         it.  We must acknowledge after handling the control frag, because
     161             :         if it causes us to generate a malformed transition, that must be
     162             :         sent back to the snaprd controller before the acknowledgement. */
     163           0 :   fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_ACK, 0UL, 0UL, 0UL, 0UL, 0UL );
     164           0 : }
     165             : 
     166             : static inline int
     167             : handle_data_frag( fd_snapdc_tile_t *  ctx,
     168             :                   fd_stem_context_t * stem,
     169             :                   ulong               chunk,
     170           0 :                   ulong               sz ) {
     171           0 :   FD_TEST( ctx->state!=FD_SNAPDC_STATE_DONE );
     172             : 
     173           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPDC_STATE_MALFORMED ) ) return 0;
     174             : 
     175           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPDC_STATE_FINISHING ) ) {
     176             :     /* We thought the snapshot was finished (we already read the full
     177             :        frame) and then we got another data fragment from the reader.
     178             :        This means the snapshot has extra padding or garbage on the end,
     179             :        which we don't trust so just abandon it completely. */
     180           0 :     transition_malformed( ctx, stem );
     181           0 :     return 0;
     182           0 :   }
     183             : 
     184           0 :   FD_TEST( ctx->state==FD_SNAPDC_STATE_DECOMPRESSING );
     185           0 :   FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu && sz>=ctx->in.frag_pos );
     186             : 
     187           0 :   uchar const * data = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     188             : 
     189           0 :   uchar const * in  = data+ctx->in.frag_pos;
     190           0 :   uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
     191           0 :   ulong in_consumed = 0UL, out_produced = 0UL;
     192           0 :   ulong error = ZSTD_decompressStream_simpleArgs( ctx->zstd,
     193           0 :                                                   out,
     194           0 :                                                   ctx->out.mtu,
     195           0 :                                                   &out_produced,
     196           0 :                                                   in,
     197           0 :                                                   sz-ctx->in.frag_pos,
     198           0 :                                                   &in_consumed );
     199           0 :   if( FD_UNLIKELY( ZSTD_isError( error ) ) ) {
     200           0 :     transition_malformed( ctx, stem );
     201           0 :     return 0;
     202           0 :   }
     203             : 
     204           0 :   if( FD_LIKELY( out_produced ) ) {
     205           0 :     fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, out_produced, 0UL, 0UL, 0UL );
     206           0 :     ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, out_produced, ctx->out.chunk0, ctx->out.wmark );
     207           0 :   }
     208             : 
     209           0 :   ctx->in.frag_pos += in_consumed;
     210           0 :   FD_TEST( ctx->in.frag_pos<=sz );
     211             : 
     212           0 :   if( FD_LIKELY( ctx->full ) ) {
     213           0 :     ctx->metrics.full.compressed_bytes_read   += in_consumed;
     214           0 :     ctx->metrics.full.decompressed_bytes_read += out_produced;
     215           0 :   } else {
     216           0 :     ctx->metrics.incremental.compressed_bytes_read   += in_consumed;
     217           0 :     ctx->metrics.incremental.decompressed_bytes_read += out_produced;
     218           0 :   }
     219             : 
     220           0 :   if( FD_UNLIKELY( !error ) ) {
     221           0 :     if( FD_UNLIKELY( ctx->in.frag_pos!=sz ) ) {
     222             :       /* Zstandard finished decoding the snapshot frame (the whole
     223             :          snapshot is a single frame), but, the fragment we got from
     224             :          the snapshot reader has not been fully consumed, so there is
     225             :          some trailing padding or garbage at the end of the snapshot.
     226             : 
     227             :          This is not valid under the snapshot format and indicates a
     228             :          problem so we abandon the snapshot. */
     229           0 :       transition_malformed( ctx, stem );
     230           0 :       return 0;
     231           0 :     }
     232             : 
     233           0 :     ctx->state = FD_SNAPDC_STATE_FINISHING;
     234           0 :   }
     235             : 
     236           0 :   int maybe_more_output = out_produced==ctx->out.mtu || ctx->in.frag_pos<sz;
     237           0 :   if( FD_LIKELY( !maybe_more_output ) ) ctx->in.frag_pos = 0UL;
     238           0 :   return maybe_more_output;
     239           0 : }
     240             : 
     241             : static inline int
     242             : returnable_frag( fd_snapdc_tile_t *  ctx,
     243             :                  ulong               in_idx,
     244             :                  ulong               seq,
     245             :                  ulong               sig,
     246             :                  ulong               chunk,
     247             :                  ulong               sz,
     248             :                  ulong               ctl,
     249             :                  ulong               tsorig,
     250             :                  ulong               tspub,
     251           0 :                  fd_stem_context_t * stem ) {
     252           0 :   (void)in_idx;
     253           0 :   (void)seq;
     254           0 :   (void)ctl;
     255           0 :   (void)tsorig;
     256           0 :   (void)tspub;
     257             : 
     258           0 :   FD_TEST( ctx->state!=FD_SNAPDC_STATE_SHUTDOWN );
     259             : 
     260           0 :   if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, stem, chunk, sz );
     261           0 :   else                                                handle_control_frag( ctx,stem, sig );
     262             : 
     263           0 :   return 0;
     264           0 : }
     265             : 
     266             : static ulong
     267             : populate_allowed_fds( fd_topo_t      const * topo FD_PARAM_UNUSED,
     268             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     269             :                       ulong                  out_fds_cnt,
     270           0 :                       int *                  out_fds ) {
     271           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     272             : 
     273           0 :   ulong out_cnt = 0;
     274           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     275           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     276           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     277           0 :   }
     278             : 
     279           0 :   return out_cnt;
     280           0 : }
     281             : 
     282             : static ulong
     283             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
     284             :                           fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     285             :                           ulong                  out_cnt,
     286           0 :                           struct sock_filter *   out ) {
     287           0 :   populate_sock_filter_policy_fd_snapdc_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
     288           0 :   return sock_filter_policy_fd_snapdc_tile_instr_cnt;
     289           0 : }
     290             : 
     291             : static void
     292             : unprivileged_init( fd_topo_t *      topo,
     293           0 :                    fd_topo_tile_t * tile ) {
     294           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     295             : 
     296           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     297           0 :   fd_snapdc_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapdc_tile_t), sizeof(fd_snapdc_tile_t) );
     298           0 :   void * _zstd           = FD_SCRATCH_ALLOC_APPEND( l, 32UL,                      ZSTD_estimateDStreamSize( ZSTD_WINDOW_SZ ) );
     299             : 
     300           0 :   ctx->full = 1;
     301           0 :   ctx->state = FD_SNAPDC_STATE_DECOMPRESSING;
     302           0 :   ctx->zstd = ZSTD_initStaticDStream( _zstd, ZSTD_estimateDStreamSize( ZSTD_WINDOW_SZ ) );
     303           0 :   FD_TEST( ctx->zstd );
     304           0 :   FD_TEST( ctx->zstd==_zstd );
     305             : 
     306           0 :   ctx->in.frag_pos = 0UL;
     307           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
     308             : 
     309           0 :   if( FD_UNLIKELY( tile->in_cnt !=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1",  tile->in_cnt  ));
     310           0 :   if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
     311             : 
     312           0 :   fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ];
     313           0 :   ctx->out.wksp   = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
     314           0 :   ctx->out.chunk0 = fd_dcache_compact_chunk0( ctx->out.wksp, writer_link->dcache );
     315           0 :   ctx->out.wmark  = fd_dcache_compact_wmark ( ctx->out.wksp, writer_link->dcache, writer_link->mtu );
     316           0 :   ctx->out.chunk  = ctx->out.chunk0;
     317           0 :   ctx->out.mtu    = writer_link->mtu;
     318             : 
     319           0 :   fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
     320           0 :   fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     321           0 :   ctx->in.wksp                   = in_wksp->wksp;;
     322           0 :   ctx->in.chunk0                 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
     323           0 :   ctx->in.wmark                  = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
     324           0 :   ctx->in.mtu                    = in_link->mtu;
     325             : 
     326           0 :   ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
     327           0 :   if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
     328           0 :     FD_LOG_ERR(( "scratch overflow %lu %lu %lu",
     329           0 :                  scratch_top - (ulong)scratch - scratch_footprint( tile ),
     330           0 :                  scratch_top,
     331           0 :                  (ulong)scratch + scratch_footprint( tile ) ));
     332           0 : }
     333             : 
/* Stem configuration.  STEM_BURST is the worst case number of fragments
   one callback publishes.  A control fragment publishes up to three:
   the downstream clone on out link 0, plus a possible MALFORMED and the
   ACK on out link 1.  A data fragment publishes at most two: one
   decompressed chunk and a possible MALFORMED. */
#define STEM_BURST 3UL /* For control fragments, one downstream clone, one acknowledgement, and one malformed message */
#define STEM_LAZY  1000L

#define STEM_CALLBACK_CONTEXT_TYPE  fd_snapdc_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapdc_tile_t)

#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE   metrics_write
#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag

/* Instantiates the stem run loop (stem_run) specialized with the
   callbacks configured above. */
#include "../../disco/stem/fd_stem.c"
     345             : 
     346             : fd_topo_run_tile_t fd_tile_snapdc = {
     347             :   .name                     = NAME,
     348             :   .populate_allowed_fds     = populate_allowed_fds,
     349             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     350             :   .scratch_align            = scratch_align,
     351             :   .scratch_footprint        = scratch_footprint,
     352             :   .unprivileged_init        = unprivileged_init,
     353             :   .run                      = stem_run,
     354             : };
     355             : 
     356             : #undef NAME

Generated by: LCOV version 1.14