LCOV - code coverage report
Current view: top level - discof/restore - fd_snapmk_para.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 474 0.0 %
Date: 2025-12-07 04:58:33 Functions: 0 4 0.0 %

          Line data    Source code
       1             :  #include "../../util/fd_util.h"
       2             : #include "../../tango/fd_tango.h"
       3             : #include "../../util/archive/fd_tar.h"
       4             : #include <errno.h>
       5             : #include <stdio.h>
       6             : #include <stdlib.h>
       7             : #include <zstd.h>
       8             : 
       9             : #define SNAPMK_MAGIC        (0xf212f209fd944ba2UL)
      10           0 : #define SNAPMK_PARA_ENABLE  (0x72701281047a55b8UL)
      11           0 : #define SNAPMK_PARA_DISABLE (0xd629be3208ad6fb4UL)
      12             : 
      13             : #define WKSP_TAG (1UL)
      14             : 
      15           0 : #define COMP_TILE_MAX (63UL)
      16             : 
      17           0 : #define ORIG_PARA_ENABLE   1  /* start of parallel section */
      18           0 : #define ORIG_PARA_DISABLE  2  /* end of parallel section */
      19           0 : #define ORIG_SHUTDOWN      3  /* shutdown signal */
      20             : 
      21             : struct link {
      22             :   fd_frag_meta_t * mcache;
      23             :   uchar *          dcache;
      24             :   ulong            chunk0;
      25             :   ulong            chunk;
      26             :   ulong            wmark;
      27             : };
      28             : 
      29             : typedef struct link link_t;
      30             : 
      31             : static struct {
      32             :   fd_wksp_t * wksp;
      33             :   ulong       comp_cnt;
      34             :   ulong       comp_depth;
      35             :   ulong       comp_mtu;
      36             :   ulong *     wr_fseqs [ COMP_TILE_MAX ];
      37             :   link_t      tar_links[ COMP_TILE_MAX ];
      38             :   link_t      zst_links[ COMP_TILE_MAX ];
      39             :   ulong *     comp_fseq[ COMP_TILE_MAX ];
      40             :   FILE *      in_file;
      41             :   ulong       in_file_sz;
      42             :   FILE *      out_file;
      43             :   ulong       frame_sz;
      44             : } glob;
      45             : 
      46             : static int
      47             : rd_tile_exec( int     argc,
      48           0 :               char ** argv ) {
      49           0 :   (void)argc; (void)argv;
      50             : 
      51           0 :   fd_wksp_t * wksp       = glob.wksp;
      52           0 :   FILE *      in_file    = glob.in_file;
      53           0 :   ulong       in_file_sz = glob.in_file_sz;
      54           0 :   ulong       comp_cnt   = glob.comp_cnt;
      55           0 :   ulong       comp_depth = glob.comp_depth;
      56           0 :   link_t *    tar_links  = glob.tar_links;
      57           0 :   ulong **    comp_fseqs = glob.comp_fseq;
      58           0 :   ulong       out_seqs [ COMP_TILE_MAX ] = {0UL};
      59           0 :   fd_fctl_t * fctls    [ COMP_TILE_MAX ] = {0UL};
      60           0 :   ulong       cr_avails[ COMP_TILE_MAX ] = {0UL};
      61           0 :   ulong       mtu        = glob.comp_mtu;
      62           0 :   ulong       frame_off  = 0UL;
      63             : 
      64           0 :   ulong slow_diag = 0UL;
      65           0 :   uchar fctl_mem[ COMP_TILE_MAX*FD_FCTL_FOOTPRINT( 1UL ) ] __attribute__((aligned(FD_FCTL_ALIGN)));
      66           0 :   for( ulong i=0UL; i<comp_cnt; i++ ) {
      67           0 :     fd_fctl_t * fctl = fd_fctl_join( fd_fctl_new( fctl_mem+(i*FD_FCTL_FOOTPRINT( 1UL )), COMP_TILE_MAX ) );
      68           0 :     FD_TEST( fctl );
      69           0 :     FD_TEST( fd_fctl_cfg_rx_add( fctl, comp_depth, comp_fseqs[i], &slow_diag ) );
      70           0 :     FD_TEST( fd_fctl_cfg_done( fctl, 1UL, 0UL, 0UL, 0UL ) );
      71           0 :     fctls[ i ] = fctl;
      72           0 :   }
      73             : 
      74             :   /* Enable load-balancing once first accounts/ file was found */
      75           0 :   _Bool enable_lb = 0;
      76             : 
      77           0 :   FD_LOG_NOTICE(( "Reader start" ));
      78             : 
      79           0 :   ulong out_idx       = 0UL;
      80           0 :   ulong last_stat_off = 0UL;
      81           0 :   long  last_stat     = fd_log_wallclock();
      82           0 :   for(;;) {
      83           0 :     long off = ftell( in_file );
      84           0 :     if( FD_LIKELY( off>=0L ) ) {
      85           0 :       ulong since_last_stat = (ulong)off - last_stat_off;
      86           0 :       if( FD_UNLIKELY( since_last_stat>=(1UL<<27) ) ) {
      87           0 :         long now = fd_log_wallclock();
      88           0 :         last_stat_off = (ulong)off;
      89           0 :         FD_LOG_NOTICE(( "%8.3f / %8.3f GB (%4.1f %%)  %8.2f MB/s",
      90           0 :             (double)last_stat_off/1e9,
      91           0 :             (double)in_file_sz   /1e9,
      92           0 :             100.0 * (double)last_stat_off/(double)in_file_sz,
      93           0 :             ((double)since_last_stat*1e3) / (double)(now - last_stat) ));
      94           0 :         last_stat = now;
      95           0 :       }
      96           0 :     }
      97             : 
      98             :     /* Process TAR header */
      99             : 
     100           0 :     ulong   chunk       = tar_links[ out_idx ].chunk;
     101           0 :     void *  chunk_laddr = fd_chunk_to_laddr( wksp, chunk );
     102           0 :     union {
     103           0 :       fd_tar_meta_t hdr;
     104           0 :       uchar         buf[512];
     105           0 :     } * tar = chunk_laddr;
     106           0 :     if( FD_UNLIKELY( fread( tar, sizeof(tar->hdr), 1UL, in_file )!=1UL ) ) {
     107           0 :       int err = ferror( in_file );
     108           0 :       FD_LOG_ERR(( "fread failed (%i-%s)", err, fd_io_strerror( err ) ));
     109           0 :     }
     110             : 
     111           0 :     if( FD_UNLIKELY( memcmp( tar->hdr.magic, FD_TAR_MAGIC, 5UL ) ) ) {
     112           0 :       int not_zero = 0;
     113           0 :       for( ulong i=0UL; i<512UL; i++ ) not_zero |= tar->buf[i];
     114           0 :       if( FD_UNLIKELY( not_zero ) ) FD_LOG_ERR(( "invalid tar header magic `%s`", tar->hdr.magic ));
     115             : 
     116             :       /* EOF marker reached */
     117             : 
     118             :       /* Broadcast barrier signal, for non-zero tile also shutdown signal */
     119           0 :       for( ulong out2=0UL; out2<comp_cnt; out2++ ) {
     120           0 :         while( cr_avails[ out2 ]<3 ) {
     121           0 :           cr_avails[ out2 ] = fd_fctl_tx_cr_update( fctls[ out2 ], cr_avails[ out2 ], out_seqs[ out2 ] );
     122           0 :         }
     123           0 :         fd_mcache_publish(
     124           0 :             tar_links[ out2 ].mcache,
     125           0 :             comp_depth,
     126           0 :             out_seqs[ out2 ]++,
     127           0 :             0UL,
     128           0 :             0UL,
     129           0 :             0UL,
     130           0 :             fd_frag_meta_ctl( 0UL, 0, 1, 0 ),
     131           0 :             0UL,
     132           0 :             0UL
     133           0 :         );
     134           0 :         fd_mcache_publish(
     135           0 :             tar_links[ out2 ].mcache,
     136           0 :             comp_depth,
     137           0 :             out_seqs[ out2 ]++,
     138           0 :             0UL,
     139           0 :             0UL,
     140           0 :             0UL,
     141           0 :             fd_frag_meta_ctl( ORIG_PARA_DISABLE, 0, 1, 0 ),
     142           0 :             0UL,
     143           0 :             0UL
     144           0 :         );
     145           0 :         cr_avails[ out2 ]--;
     146           0 :         if( out2>0UL ) {
     147           0 :           fd_mcache_publish(
     148           0 :               tar_links[ out2 ].mcache,
     149           0 :               comp_depth,
     150           0 :               out_seqs[ out2 ]++,
     151           0 :               0UL,
     152           0 :               0UL,
     153           0 :               0UL,
     154           0 :               fd_frag_meta_ctl( ORIG_SHUTDOWN, 0, 1, 0 ),
     155           0 :               0UL,
     156           0 :               0UL
     157           0 :           );
     158           0 :           cr_avails[ out2 ]--;
     159           0 :         }
     160           0 :       }
     161             : 
     162             :       /* Seek back since we need to retransmit EOF marker */
     163           0 :       if( FD_UNLIKELY( fseek( in_file, -512L, SEEK_CUR ) ) ) {
     164           0 :         FD_LOG_ERR(( "fseek failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     165           0 :       }
     166             : 
     167           0 :       break;
     168           0 :     }
     169             : 
     170           0 :     ulong const file_sz = fd_tar_meta_get_size( &tar->hdr );
     171           0 :     if( FD_UNLIKELY( file_sz==ULONG_MAX ) ) FD_LOG_ERR(( "invalid tar file size" ));
     172             : 
     173           0 :     if( FD_UNLIKELY( tar->hdr.typeflag!=FD_TAR_TYPE_DIR && !fd_tar_meta_is_reg( &tar->hdr ) ) ) {
     174           0 :       FD_LOG_WARNING(( "invalid tar header type %d", tar->hdr.typeflag ));
     175           0 :     }
     176           0 :     ulong const align_sz = fd_ulong_align_up( file_sz, 512UL );
     177             : 
     178             :     /* See if we can switch to load-balancing */
     179             : 
     180           0 :     if( FD_UNLIKELY( !enable_lb ) ) {
     181           0 :       if( 0==strncmp( tar->hdr.name, "accounts/", 9UL ) ) {
     182             :         /* Send barrier signal */
     183           0 :         while( !cr_avails[ out_idx ] ) {
     184           0 :           cr_avails[ out_idx ] = fd_fctl_tx_cr_update( fctls[ out_idx ], cr_avails[ out_idx ], out_seqs[ out_idx ] );
     185           0 :         }
     186           0 :         fd_mcache_publish(
     187           0 :             tar_links[ out_idx ].mcache,
     188           0 :             comp_depth,
     189           0 :             out_seqs[ out_idx ]++,
     190           0 :             0UL,
     191           0 :             0UL,
     192           0 :             0UL,
     193           0 :             fd_frag_meta_ctl( ORIG_PARA_ENABLE, 0, 1, 0 ),
     194           0 :             0UL,
     195           0 :             0UL
     196           0 :         );
     197             :         /* Poll for barrier receive */
     198           0 :         ulong * wr_fseq = glob.wr_fseqs[0];
     199           0 :         for(;;) {
     200           0 :           FD_COMPILER_MFENCE();
     201           0 :           ulong sig = FD_VOLATILE_CONST( wr_fseq[1] );
     202           0 :           FD_COMPILER_MFENCE();
     203           0 :           if( sig==1UL ) break;
     204           0 :           FD_SPIN_PAUSE();
     205           0 :         }
     206           0 :         FD_LOG_NOTICE(( "Reader enabling load-balancing" ));
     207           0 :         enable_lb = 1;
     208           0 :       }
     209           0 :     }
     210             : 
     211             :     /* Send data frags */
     212             : 
     213           0 :     _Bool   eom      = 0;
     214           0 :     ulong   rem      = align_sz;
     215           0 :     uchar * data     = (uchar *)( tar+1 );
     216           0 :     ulong   data_max = mtu - 512UL;
     217           0 :     do {
     218           0 :       while( !cr_avails[ out_idx ] ) {
     219           0 :         cr_avails[ out_idx ] = fd_fctl_tx_cr_update( fctls[ out_idx ], cr_avails[ out_idx ], out_seqs[ out_idx ] );
     220           0 :       }
     221             : 
     222           0 :       ulong data_sz = fd_ulong_min( rem, data_max );
     223           0 :       if( data_sz ) {
     224           0 :         if( FD_UNLIKELY( fread( data, data_sz, 1UL, in_file )!=1UL ) ) {
     225           0 :           FD_LOG_ERR(( "fread failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     226           0 :         }
     227           0 :       }
     228           0 :       rem -= data_sz;
     229           0 :       if( !rem ) {
     230           0 :         frame_off += 512UL + align_sz;
     231           0 :         eom = frame_off>=glob.frame_sz;
     232           0 :       }
     233             : 
     234             :       //if( eom ) FD_LOG_NOTICE(( "finished burst out_idx=%lu out_seq=%lu sz=%lu", out_idx, out_seqs[ out_idx ], frame_off ));
     235           0 :       ulong frag_sz = (ulong)data+data_sz-(ulong)chunk_laddr;
     236           0 :       fd_mcache_publish(
     237           0 :           tar_links[ out_idx ].mcache,
     238           0 :           comp_depth,
     239           0 :           out_seqs[ out_idx ]++,
     240           0 :           frag_sz,
     241           0 :           chunk,
     242           0 :           0UL,
     243           0 :           fd_frag_meta_ctl( 0UL, 0, eom, 0 ),
     244           0 :           0UL,
     245           0 :           0UL
     246           0 :       );
     247           0 :       cr_avails[ out_idx ]--;
     248             : 
     249           0 :       chunk       = fd_dcache_compact_next( chunk, frag_sz, tar_links[ out_idx ].chunk0, tar_links[ out_idx ].wmark );
     250           0 :       chunk_laddr = fd_chunk_to_laddr( wksp, chunk );
     251           0 :       data        = chunk_laddr;
     252           0 :       data_max    = mtu;
     253           0 :       tar_links[ out_idx ].chunk = chunk;
     254           0 :     } while( rem );
     255             : 
     256             :     /* Select next index */
     257             : 
     258           0 :     if( eom && enable_lb ) {
     259           0 :       frame_off = 0UL;
     260           0 :       out_idx++;
     261           0 :       if( out_idx>=comp_cnt ) out_idx = 0UL;
     262           0 :     }
     263           0 :   }
     264             : 
     265             :   /* Send tail end of data to tile 0 */
     266           0 :   for(;;) {
     267           0 :     size_t read_sz = fread( fd_chunk_to_laddr( wksp, tar_links[ 0UL ].chunk ), 1UL, mtu, in_file );
     268           0 :     if( FD_UNLIKELY( read_sz==0UL ) ) {
     269           0 :       if( feof( in_file ) ) break;
     270           0 :       FD_LOG_ERR(( "fread failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     271           0 :     }
     272           0 :     while( !cr_avails[ 0UL ] ) {
     273           0 :       cr_avails[ 0UL ] = fd_fctl_tx_cr_update( fctls[ 0UL ], cr_avails[ 0UL ], out_seqs[ 0UL ] );
     274           0 :     }
     275           0 :     fd_mcache_publish(
     276           0 :         tar_links[ 0UL ].mcache,
     277           0 :         comp_depth,
     278           0 :         out_seqs[ 0UL ]++,
     279           0 :         read_sz,
     280           0 :         tar_links[ 0UL ].chunk,
     281           0 :         0UL,
     282           0 :         fd_frag_meta_ctl( 0UL, 0, 0, 0 ),
     283           0 :         0UL,
     284           0 :         0UL
     285           0 :     );
     286           0 :     cr_avails[ 0UL ]--;
     287           0 :     tar_links[ 0UL ].chunk = fd_dcache_compact_next( tar_links[ 0UL ].chunk, read_sz, tar_links[ 0UL ].chunk0, tar_links[ 0UL ].wmark );
     288           0 :   }
     289             : 
     290             :   /* Write shutdown signal */
     291           0 :   while( cr_avails[ 0 ]<2 ) cr_avails[ 0 ] = fd_fctl_tx_cr_update( fctls[ 0 ], cr_avails[ 0 ], out_seqs[ 0 ] );
     292           0 :   fd_mcache_publish(
     293           0 :       tar_links[ 0 ].mcache,
     294           0 :       comp_depth,
     295           0 :       out_seqs[ 0 ]++,
     296           0 :       0UL,
     297           0 :       0UL,
     298           0 :       0UL,
     299           0 :       fd_frag_meta_ctl( 0UL, 0, 1, 0 ),
     300           0 :       0UL,
     301           0 :       0UL
     302           0 :   );
     303           0 :   fd_mcache_publish(
     304           0 :       tar_links[ 0 ].mcache,
     305           0 :       comp_depth,
     306           0 :       out_seqs[ 0 ]++,
     307           0 :       0UL,
     308           0 :       tar_links[ 0 ].chunk,
     309           0 :       0UL,
     310           0 :       fd_frag_meta_ctl( ORIG_SHUTDOWN, 0, 1, 0 ),
     311           0 :       0UL,
     312           0 :       0UL
     313           0 :   );
     314           0 :   cr_avails[ 0 ]--;
     315             : 
     316           0 :   FD_LOG_NOTICE(( "Reader done" ));
     317             : 
     318           0 :   return 0;
     319           0 : }
     320             : 
     321             : static int
     322             : comp_tile_exec( int     argc,
     323           0 :                 char ** argv ) {
     324           0 :   (void)argc; (void)argv;
     325             : 
     326           0 :   uint rng_seed = (uint)fd_ulong_hash( (uint)fd_tickcount()+fd_tile_idx() );
     327           0 :   fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, rng_seed, 0UL ) );
     328             : 
     329           0 :   fd_wksp_t *      wksp       = glob.wksp;
     330           0 :   ulong            comp_idx   = fd_tile_idx()-2UL; FD_TEST( fd_tile_idx()>=2UL );
     331           0 :   ulong            depth      = glob.comp_depth;
     332           0 :   fd_frag_meta_t * in_mcache  = glob.tar_links[ comp_idx ].mcache;
     333           0 :   ulong            in_seq     = 0UL;
     334           0 :   ulong *          fseq       = glob.comp_fseq[ comp_idx ];
     335           0 :   fd_frag_meta_t * out_mcache = glob.zst_links[ comp_idx ].mcache;
     336           0 :   uchar *          out_dcache = glob.zst_links[ comp_idx ].dcache;
     337           0 :   ulong            out_chunk0 = glob.zst_links[ comp_idx ].chunk0;
     338           0 :   ulong            out_seq    = 0UL;
     339             : 
     340           0 :   uchar fctl_mem[ FD_FCTL_FOOTPRINT( 1UL ) ] __attribute__((aligned(FD_FCTL_ALIGN)));
     341           0 :   fd_fctl_t * fctl = fd_fctl_join( fd_fctl_new( fctl_mem, 1UL ) );
     342           0 :   FD_TEST( fctl );
     343           0 :   ulong slow_diag;
     344           0 :   FD_TEST( fd_fctl_cfg_rx_add( fctl, depth, glob.wr_fseqs[ comp_idx ], &slow_diag ) );
     345           0 :   FD_TEST( fd_fctl_cfg_done( fctl, 1UL, 0UL, 0UL, 0UL ) );
     346             : 
     347           0 :   ulong async_min = 1UL<<7;
     348           0 :   ulong async_rem = 1UL; /* Do housekeeping on first iteration */
     349           0 :   ulong cr_avail  = 0UL;
     350             : 
     351           0 :   ZSTD_CStream * zst = ZSTD_createCStream();
     352           0 :   if( FD_UNLIKELY( !zst ) ) FD_LOG_ERR(( "ZSTD_createCStream() failed" ));
     353           0 :   ZSTD_initCStream( zst, 3 );
     354             : 
     355           0 :   ulong out_chunk = out_chunk0;
     356           0 :   ulong out_mtu   = ZSTD_COMPRESSBOUND( glob.comp_mtu );
     357           0 :   ZSTD_outBuffer zst_out = {
     358           0 :     .dst  = fd_chunk_to_laddr( wksp, out_chunk ),
     359           0 :     .size = out_mtu,
     360           0 :     .pos  = 0UL
     361           0 :   };
     362             : 
     363           0 :   for(;;) {
     364           0 :     fd_frag_meta_t const * mline;
     365           0 :     ulong                  seq_found;
     366           0 :     long                   diff;
     367             : 
     368           0 :     ulong in_sig;
     369           0 :     ulong in_chunk;
     370           0 :     ulong in_sz;
     371           0 :     ulong in_ctl;
     372           0 :     ulong in_tsorig;
     373           0 :     ulong in_tspub;
     374           0 :     FD_MCACHE_WAIT_REG( in_sig, in_chunk, in_sz, in_ctl, in_tsorig, in_tspub, mline, seq_found, diff, async_rem, in_mcache, depth, in_seq );
     375           0 :     (void)mline; (void)seq_found; (void)in_sz; (void)in_tsorig; (void)in_tspub;
     376             : 
     377           0 :     if( FD_UNLIKELY( !async_rem ) ) {
     378           0 :       fd_fctl_rx_cr_return( fseq, in_seq );
     379           0 :       cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
     380           0 :       async_rem = fd_tempo_async_reload( rng, async_min );
     381           0 :       continue;
     382           0 :     }
     383             : 
     384           0 :     if( FD_UNLIKELY( diff>0 ) ) {
     385           0 :       FD_LOG_ERR(( "Overrun while polling" ));
     386           0 :     }
     387           0 :     FD_TEST( diff==0 );
     388             : 
     389           0 :     ulong in_orig = fd_frag_meta_ctl_orig( in_ctl );
     390           0 :     if( FD_UNLIKELY( in_orig ) ) {
     391           0 :       FD_TEST( zst_out.pos==0 );
     392             : 
     393             :       /* Forward control signal */
     394           0 :       while( !cr_avail ) cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
     395           0 :       fd_mcache_publish( out_mcache, depth, out_seq++, 0UL, 0UL, 0UL, fd_frag_meta_ctl( in_orig, 0, 0, 0 ), 0UL, 0UL );
     396           0 :       in_seq = fd_seq_inc( in_seq, 1UL );
     397             : 
     398           0 :       if( in_orig==ORIG_SHUTDOWN ) break;
     399           0 :       continue;
     400           0 :     }
     401             : 
     402           0 :     ZSTD_inBuffer zst_in = {
     403           0 :       .src    = fd_chunk_to_laddr( wksp, in_chunk ),
     404           0 :       .size   = in_sig,
     405           0 :       .pos    = 0UL
     406           0 :     };
     407           0 :     for(;;) {
     408           0 :       size_t const ret = ZSTD_compressStream( zst, &zst_out, &zst_in );
     409           0 :       if( FD_UNLIKELY( ZSTD_isError( ret ) ) ) {
     410           0 :         FD_LOG_ERR(( "ZSTD_compressStream() failed: %s", ZSTD_getErrorName( ret ) ));
     411           0 :       }
     412           0 :       if( FD_LIKELY( zst_in.pos==zst_in.size ) ) break;
     413             : 
     414             :       /* Flush */
     415           0 :       ulong chunk_sz = zst_out.pos;
     416           0 :       while( !cr_avail ) cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
     417           0 :       fd_mcache_publish( out_mcache, depth, out_seq++, chunk_sz, out_chunk, 0UL, fd_frag_meta_ctl( 0UL, 0, 0, 0 ), 0UL, 0UL );
     418           0 :       out_chunk = fd_dcache_compact_next( out_chunk, chunk_sz, out_chunk0, glob.zst_links[ comp_idx ].wmark );
     419           0 :       cr_avail--;
     420           0 :       zst_out.dst = fd_chunk_to_laddr( wksp, out_chunk );
     421           0 :       zst_out.pos = 0UL;
     422           0 :     }
     423             : 
     424           0 :     if( fd_frag_meta_ctl_eom( in_ctl ) ) {
     425           0 :       for(;;) {
     426           0 :         ulong ret = ZSTD_endStream( zst, &zst_out );
     427           0 :         if( FD_UNLIKELY( ZSTD_isError( ret ) ) ) {
     428           0 :           FD_LOG_ERR(( "ZSTD_endStream() failed: %s", ZSTD_getErrorName( ret ) ));
     429           0 :         }
     430             : 
     431             :         /* Flush */
     432           0 :         int eom = !ret;
     433           0 :         ulong chunk_sz = zst_out.pos;
     434           0 :         while( !cr_avail ) cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
     435           0 :         fd_mcache_publish( out_mcache, depth, out_seq++, chunk_sz, out_chunk, 0UL, fd_frag_meta_ctl( 0UL, 0, eom, 0 ), 0UL, 0UL );
     436           0 :         out_chunk = fd_dcache_compact_next( out_chunk, chunk_sz, out_chunk0, glob.zst_links[ comp_idx ].wmark );
     437           0 :         cr_avail--;
     438           0 :         zst_out.dst = fd_chunk_to_laddr( wksp, out_chunk );
     439           0 :         zst_out.pos = 0UL;
     440             :         // if( eom ) FD_LOG_NOTICE(( "finished burst comp_idx=%lu in_seq=%lu out_seq=%lu", comp_idx, in_seq, out_seq-1UL ));
     441           0 :         if( eom ) break;
     442           0 :       }
     443           0 :     }
     444             : 
     445           0 :     in_seq = fd_seq_inc( in_seq, 1UL );
     446           0 :   }
     447             : 
     448           0 :   if( zst_out.pos < zst_out.size ) {
     449             :     /* Flush */
     450           0 :     ulong chunk_sz = zst_out.pos;
     451           0 :     while( !cr_avail ) cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
     452           0 :     fd_mcache_publish( out_mcache, depth, out_seq++, chunk_sz, out_chunk, 0UL, fd_frag_meta_ctl( 0UL, 0, 1, 0 ), 0UL, 0UL );
     453           0 :     out_chunk = fd_dcache_compact_next( out_chunk, chunk_sz, out_chunk0, glob.zst_links[ comp_idx ].wmark );
     454           0 :     cr_avail--;
     455           0 :     zst_out.dst = fd_chunk_to_laddr( out_dcache, out_chunk );
     456           0 :     zst_out.pos = 0UL;
     457           0 :   }
     458             : 
     459           0 :   fd_mcache_seq_update( fd_mcache_seq_laddr( out_mcache ), out_seq );
     460             : 
     461           0 :   fd_rng_delete( fd_rng_leave( rng ) );
     462             : 
     463           0 :   return 0;
     464           0 : }
     465             : 
     466             : static int
     467             : wr_tile_exec( int     argc,
     468           0 :               char ** argv ) {
     469           0 :   (void)argc; (void)argv;
     470             : 
     471           0 :   uint rng_seed = (uint)fd_ulong_hash( (uint)fd_tickcount()+fd_tile_idx() );
     472           0 :   fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, rng_seed, 0UL ) );
     473             : 
     474           0 :   fd_wksp_t * wksp    = glob.wksp;
     475           0 :   ulong      comp_cnt = glob.comp_cnt;
     476           0 :   ulong      depth    = glob.comp_depth;
     477           0 :   FILE *     out_file = glob.out_file;
     478           0 :   ulong **   fseqs    = glob.wr_fseqs;
     479           0 :   ulong      in_seqs[ COMP_TILE_MAX ] = {0UL};
     480             : 
     481           0 :   ulong active_set = (1UL<<comp_cnt)-1UL;
     482           0 :   ulong drain_set  = (1UL<<comp_cnt)-1UL;
     483           0 :   ulong dirty_set  = 0UL;
     484             : 
     485           0 :   uchar fctl_mem[ FD_FCTL_FOOTPRINT( COMP_TILE_MAX ) ] __attribute__((aligned(FD_FCTL_ALIGN)));
     486           0 :   fd_fctl_t * fctl = fd_fctl_join( fd_fctl_new( fctl_mem, COMP_TILE_MAX ) );
     487           0 :   FD_TEST( fctl );
     488           0 :   ulong slow_diag;
     489           0 :   for( ulong i=0UL; i<comp_cnt; i++ ) {
     490           0 :     FD_TEST( fd_fctl_cfg_rx_add( fctl, glob.comp_depth, glob.comp_fseq[i], &slow_diag ) );
     491           0 :   }
     492           0 :   FD_TEST( fd_fctl_cfg_done( fctl, 1UL, 0UL, 0UL, 0UL ) );
     493             : 
     494           0 :   ulong async_min = 1UL<<7;
     495           0 :   ulong async_rem = 1UL; /* Do housekeeping on first iteration */
     496             : 
     497           0 :   ulong in_idx = 0UL;
     498             : 
     499           0 :   _Bool sent_enable = 0;
     500             : 
     501           0 :   FD_LOG_NOTICE(( "Writer running" ));
     502             : 
     503           0 :   for(;;) {
     504             : 
     505           0 :     fd_frag_meta_t * in_mcache = glob.zst_links[ in_idx ].mcache;
     506             : 
     507           0 :     fd_frag_meta_t const * mline;
     508           0 :     ulong                  seq_found;
     509           0 :     long                   diff;
     510             : 
     511           0 :     ulong in_sig;
     512           0 :     ulong in_chunk;
     513           0 :     ulong in_sz;
     514           0 :     ulong in_ctl;
     515           0 :     ulong in_tsorig;
     516           0 :     ulong in_tspub;
     517           0 :     FD_MCACHE_WAIT_REG( in_sig, in_chunk, in_sz, in_ctl, in_tsorig, in_tspub, mline, seq_found, diff, async_rem, in_mcache, depth, in_seqs[ in_idx ] );
     518           0 :     (void)mline; (void)seq_found; (void)in_sz; (void)in_tsorig; (void)in_tspub;
     519             : 
     520           0 :     if( FD_UNLIKELY( !async_rem ) ) {
     521           0 :       if( FD_UNLIKELY( !active_set ) ) break;
     522             : 
     523           0 :       if( !fd_ulong_extract_bit( dirty_set, (int)in_idx ) ) {
     524           0 :         in_idx++;
     525           0 :         if( in_idx>=comp_cnt ) in_idx = 0UL;
     526           0 :       }
     527             : 
     528           0 :       for( ulong i=0UL; i<comp_cnt; i++ ) {
     529           0 :         fd_fctl_rx_cr_return( fseqs[i], in_seqs[i] );
     530           0 :       }
     531           0 :       async_rem = fd_tempo_async_reload( rng, async_min );
     532           0 :       continue;
     533           0 :     }
     534             : 
     535           0 :     if( FD_UNLIKELY( diff>0 ) ) {
     536           0 :       FD_LOG_ERR(( "Overrun while polling" ));
     537           0 :     }
     538           0 :     FD_TEST( diff==0 );
     539             : 
     540           0 :     ulong in_orig = fd_frag_meta_ctl_orig( in_ctl );
     541           0 :     if( FD_UNLIKELY( in_orig ) ) {
     542           0 :       if( in_orig==ORIG_PARA_ENABLE && !sent_enable ) {
     543           0 :         struct __attribute__((packed)) {
     544           0 :           uint  magic;
     545           0 :           uint  frame_sz;
     546           0 :           ulong user;
     547           0 :         } header = {
     548           0 :           .magic    = 0x184D2A50U,
     549           0 :           .frame_sz = 8U,
     550           0 :           .user     = SNAPMK_PARA_ENABLE
     551           0 :         };
     552           0 :         if( FD_UNLIKELY( fwrite( &header, sizeof(header), 1UL, out_file )!=1UL ) ) {
     553           0 :           FD_LOG_ERR(( "fwrite failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     554           0 :         }
     555           0 :         FD_VOLATILE( fseqs[ in_idx ][1] ) = 1UL;
     556           0 :         sent_enable = 1;
     557           0 :       } else if( in_orig==ORIG_PARA_DISABLE ) {
     558           0 :         drain_set = fd_ulong_clear_bit( drain_set, (int)in_idx );
     559           0 :         if( drain_set ) continue;
     560             : 
     561           0 :         struct __attribute__((packed)) {
     562           0 :           uint  magic;
     563           0 :           uint  frame_sz;
     564           0 :           ulong user;
     565           0 :         } header = {
     566           0 :           .magic    = 0x184D2A50U,
     567           0 :           .frame_sz = 8U,
     568           0 :           .user     = SNAPMK_PARA_DISABLE
     569           0 :         };
     570           0 :         if( FD_UNLIKELY( fwrite( &header, sizeof(header), 1UL, out_file )!=1UL ) ) {
     571           0 :           FD_LOG_ERR(( "fwrite failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     572           0 :         }
     573             : 
     574           0 :       } else if( in_orig==ORIG_SHUTDOWN ) {
     575           0 :         FD_TEST( !fd_ulong_extract_bit( dirty_set, (int)in_idx ) );
     576           0 :         active_set = fd_ulong_clear_bit( active_set, (int)in_idx );
     577           0 :       }
     578           0 :       in_seqs[ in_idx ] = fd_seq_inc( in_seqs[ in_idx ], 1UL );
     579           0 :       if( in_orig==ORIG_SHUTDOWN || in_orig==ORIG_PARA_DISABLE ) {
     580           0 :         in_idx++;
     581           0 :         if( in_idx>=comp_cnt ) in_idx = 0UL;
     582           0 :       }
     583           0 :       continue;
     584           0 :     }
     585             : 
     586           0 :     if( in_sig ) {
     587           0 :       void const * in_frag = fd_chunk_to_laddr( wksp, in_chunk );
     588           0 :       ulong wr_sz = fwrite( in_frag, in_sig, 1UL, out_file );
     589           0 :       if( FD_UNLIKELY( wr_sz!=1UL ) ) {
     590           0 :         FD_LOG_ERR(( "fwrite failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     591           0 :       }
     592           0 :     }
     593             : 
     594           0 :     in_seqs[ in_idx ] = fd_seq_inc( in_seqs[ in_idx ], 1UL );
     595             : 
     596           0 :     int eom = fd_frag_meta_ctl_eom( in_ctl );
     597           0 :     if( eom ) {
     598             :       // FD_LOG_NOTICE(( "finished write comp_idx=%lu in_seq=%lu", in_idx, in_seqs[ in_idx ]-1UL ));
     599           0 :       dirty_set = fd_ulong_clear_bit( dirty_set, (int)in_idx );
     600           0 :       in_idx++;
     601           0 :       if( in_idx>=comp_cnt ) in_idx = 0UL;
     602             :       // FD_LOG_NOTICE(( "switching to comp_idx=%lu", in_idx ));
     603           0 :     } else {
     604           0 :       dirty_set = fd_ulong_set_bit( dirty_set, (int)in_idx );
     605           0 :     }
     606           0 :   }
     607             : 
     608           0 :   FD_LOG_NOTICE(( "Writer done" ));
     609             : 
     610           0 :   return 0;
     611           0 : }
     612             : /* usage prints the command-line synopsis to stderr and terminates the process with exit code rc.  Does not return. */
     613             : __attribute__((noreturn)) static void
     614           0 : usage( int rc ) {
     615             :   fputs( "Usage: fd_snapmk_para --in FILE.tar --out FILE.tar.zst\n", stderr );
     616           0 :   exit( rc );
     617           0 : }
     618             : /* main: parse flags, open --in/--out, build per-compressor mcache/dcache/fseq objects in an anonymous wksp, run the writer, compressor and reader tiles, then log size/throughput stats.  Returns 0; failures abort via FD_LOG_ERR / FD_TEST. */
     619             : int
     620             : main( int     argc,
     621             :       char ** argv ) {
     622             :   if( fd_env_strip_cmdline_contains( &argc, &argv, "--help" ) ) {
     623             :     fputs( "fd_snapmk creates a backwards-compatible Firedancer-optimized Solana snapshot\n", stderr );
     624             :     usage( EXIT_SUCCESS );
     625             :   }
     626             : 
     627             :   fd_boot( &argc, &argv );
     628             :   /* Command-line options (defaults: gigantic pages, 1 page, 32 MiB frame, depth 32, 1 MiB mtu) */
     629             :   char const * _page_sz  = fd_env_strip_cmdline_cstr  ( &argc, &argv, "--page-sz",    NULL,      "gigantic" );
     630             :   ulong        page_cnt  = fd_env_strip_cmdline_ulong ( &argc, &argv, "--page-cnt",   NULL,             1UL );
     631             :   ulong        near_cpu  = fd_env_strip_cmdline_ulong ( &argc, &argv, "--near-cpu",   NULL, fd_log_cpu_id() );
     632             :   char const * in_path    = fd_env_strip_cmdline_cstr ( &argc, &argv, "--in",         NULL,            NULL );
     633             :   char const * out_path   = fd_env_strip_cmdline_cstr ( &argc, &argv, "--out",        NULL,            NULL );
     634             :   ulong        frame_sz   = fd_env_strip_cmdline_ulong( &argc, &argv, "--frame-sz",   NULL,      33554432UL );
     635             :   ulong        depth      = fd_env_strip_cmdline_ulong( &argc, &argv, "--depth",      NULL,            32UL );
     636             :   ulong        mtu        = fd_env_strip_cmdline_ulong( &argc, &argv, "--mtu",        NULL,         1UL<<20 );
     637             :   /* --in is mandatory; --out defaults to the input path with ".zst" appended */
     638             :   if( FD_UNLIKELY( !in_path ) ) usage( EXIT_FAILURE );
     639             :   if( !out_path ) {
     640             :     ulong in_len = strlen( in_path );
     641             :     if( FD_UNLIKELY( in_len+strlen( ".zst" )+1UL>PATH_MAX ) ) FD_LOG_ERR(( "--in argument is too long" ));
     642             :     static char output_path[ PATH_MAX ];
     643             :     fd_cstr_fini( fd_cstr_append_cstr( fd_cstr_append_text( fd_cstr_init( output_path ), in_path, in_len ), ".zst" ) );
     644             :     out_path = output_path;
     645             :   }
     646             :   /* Two tiles are set aside (the calling thread runs rd_tile_exec, tile 1 runs the writer); the rest compress, capped at COMP_TILE_MAX */
     647             :   if( FD_UNLIKELY( fd_tile_cnt()<3 ) ) FD_LOG_ERR(( "This program requires at least 3 tiles" ));
     648             :   ulong comp_cnt = fd_tile_cnt() - 2UL;
     649             :   comp_cnt = fd_ulong_min( comp_cnt, COMP_TILE_MAX );
     650             :   /* Open the input and measure its size with fseek/ftell, then rewind */
     651             :   FILE * in_file = fopen( in_path, "rb" );
     652             :   if( FD_UNLIKELY( !in_file ) ) {
     653             :     FD_LOG_ERR(( "fopen(%s,\"rb\") failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
     654             :   }
     655             :   ulong in_file_sz;
     656             :   if( FD_UNLIKELY( fseek( in_file, 0L, SEEK_END )!=0 ) ) {
     657             :     FD_LOG_ERR(( "fseek(%s,0,SEEK_END) failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
     658             :   }
     659             :   long ftell_res = ftell( in_file );
     660             :   if( FD_UNLIKELY( ftell_res<0L ) ) {
     661             :     FD_LOG_ERR(( "ftell(%s) failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
     662             :   }
     663             :   in_file_sz = (ulong)ftell_res;
     664             :   if( FD_UNLIKELY( fseek( in_file, 0L, SEEK_SET )!=0 ) ) {
     665             :     FD_LOG_ERR(( "fseek(%s,0,SEEK_SET) failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
     666             :   }
     667             :   glob.in_file    = in_file;
     668             :   glob.in_file_sz = in_file_sz;
     669             : 
     670             :   FILE * out_file = fopen( out_path, "wb" );
     671             :   if( FD_UNLIKELY( !out_file ) ) {
     672             :     FD_LOG_ERR(( "fopen(%s,\"wb\") failed (%i-%s)", out_path, errno, fd_io_strerror( errno ) ));
     673             :   }
     674             :   glob.out_file = out_file;
     675             :   /* Leading 16-byte record: zstd skippable-frame magic 0x184D2A50, payload size 8, payload SNAPMK_MAGIC */
     676             :   struct __attribute__((packed)) {
     677             :     uint  magic;
     678             :     uint  frame_sz;
     679             :     ulong user;
     680             :   } header = {
     681             :     .magic    = 0x184D2A50U,
     682             :     .frame_sz = 8U,
     683             :     .user     = SNAPMK_MAGIC
     684             :   };
     685             :   if( FD_UNLIKELY( fwrite( &header, sizeof(header), 1UL, out_file )!=1UL ) ) {
     686             :     FD_LOG_ERR(( "fwrite header to %s failed (%i-%s)", out_path, errno, fd_io_strerror( errno ) ));
     687             :   }
     688             : 
     689             :   FD_LOG_NOTICE(( "--wksp not specified, using an anonymous local workspace, --page-sz %s, --page-cnt %lu, --near-cpu %lu",
     690             :                   _page_sz, page_cnt, near_cpu ));
     691             :   fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz( _page_sz ), page_cnt, near_cpu, "wksp", 0UL );
     692             : 
     693             :   if( FD_UNLIKELY( !wksp ) ) FD_LOG_ERR(( "Unable to attach to wksp" ));
     694             :   glob.wksp       = wksp;
     695             :   glob.comp_cnt   = comp_cnt;
     696             :   glob.comp_depth = depth;
     697             :   glob.frame_sz   = frame_sz;
     698             :   glob.comp_mtu   = mtu;
     699             :   /* tar dcaches hold mtu-sized frags; zst dcaches are sized for the worst-case ZSTD_COMPRESSBOUND( mtu ) frag */
     700             :   ulong tar_dcache_sz = fd_dcache_req_data_sz( mtu, depth, 1UL, 1 );
     701             :   ulong zst_dcache_sz = fd_dcache_req_data_sz( ZSTD_COMPRESSBOUND( mtu ), depth, 1UL, 1 );
     702             : 
     703             :   for( ulong i=0UL; i<comp_cnt; i++ ) {
     704             :     link_t * tar = &glob.tar_links[i];
     705             :     tar->mcache = fd_mcache_join( fd_mcache_new( fd_wksp_alloc_laddr( wksp, fd_mcache_align(), fd_mcache_footprint( depth, 0UL ), WKSP_TAG ), depth, 0UL, 0UL ) );
     706             :     FD_TEST( tar->mcache );
     707             :     tar->dcache = fd_dcache_join( fd_dcache_new( fd_wksp_alloc_laddr( wksp, fd_dcache_align(), fd_dcache_footprint( tar_dcache_sz, 0UL ), WKSP_TAG ), tar_dcache_sz, 0UL ) );
     708             :     FD_TEST( tar->dcache );
     709             :     tar->chunk0 = fd_dcache_compact_chunk0( wksp, tar->dcache );
     710             :     tar->chunk  = tar->chunk0;
     711             :     tar->wmark  = fd_dcache_compact_wmark ( wksp, tar->dcache, mtu );
     712             :     /* The zst watermark must use the same frag bound the zst dcache was sized with, else a max-size frag could run past the data region */
     713             :     link_t * zst = &glob.zst_links[i];
     714             :     zst->mcache = fd_mcache_join( fd_mcache_new( fd_wksp_alloc_laddr( wksp, fd_mcache_align(), fd_mcache_footprint( depth, 0UL ), WKSP_TAG ), depth, 0UL, 0UL ) );
     715             :     FD_TEST( zst->mcache );
     716             :     zst->dcache = fd_dcache_join( fd_dcache_new( fd_wksp_alloc_laddr( wksp, fd_dcache_align(), fd_dcache_footprint( zst_dcache_sz, 0UL ), WKSP_TAG ), zst_dcache_sz, 0UL ) );
     717             :     FD_TEST( zst->dcache );
     718             :     zst->chunk0 = fd_dcache_compact_chunk0( wksp, zst->dcache );
     719             :     zst->chunk  = zst->chunk0;
     720             :     zst->wmark  = fd_dcache_compact_wmark ( wksp, zst->dcache, ZSTD_COMPRESSBOUND( mtu ) );
     721             : 
     722             :     glob.wr_fseqs[i] = fd_fseq_join( fd_fseq_new( fd_wksp_alloc_laddr( wksp, fd_fseq_align(), fd_fseq_footprint(), WKSP_TAG ), 0UL ) );
     723             :     FD_TEST( glob.wr_fseqs[i] );
     724             : 
     725             :     glob.comp_fseq[i] = fd_fseq_join( fd_fseq_new( fd_wksp_alloc_laddr( wksp, fd_fseq_align(), fd_fseq_footprint(), WKSP_TAG ), 0UL ) );
     726             :     FD_TEST( glob.comp_fseq[i] );
     727             :   }
     728             : 
     729             :   fd_tile_exec_t * wr_exec = fd_tile_exec_new( 1UL, wr_tile_exec, 0, NULL );
     730             :   FD_TEST( wr_exec );
     731             :   /* Compressors go on tiles 2..comp_cnt+1; fail fast if one cannot be dispatched, as is done for the writer */
     732             :   fd_tile_exec_t * comp_exec[ COMP_TILE_MAX ];
     733             :   for( ulong i=0UL; i<comp_cnt; i++ ) {
     734             :     comp_exec[ i ] = fd_tile_exec_new( 2UL+i, comp_tile_exec, 0, NULL ); FD_TEST( comp_exec[ i ] );
     735             :   }
     736             :   /* rd_tile_exec runs inline on the calling thread; dt accumulates elapsed wallclock in ns */
     737             :   long dt = -fd_log_wallclock();
     738             :   rd_tile_exec( 0, NULL );
     739             : 
     740             :   for( ulong i=0UL; i<comp_cnt; i++ ) {
     741             :     FD_TEST( !fd_tile_exec_delete( comp_exec[ i ], NULL ) );
     742             :   }
     743             :   FD_TEST( !fd_tile_exec_delete( wr_exec, NULL ) );
     744             :   /* All tiles have been joined; close the files and report statistics */
     745             :   if( FD_UNLIKELY( 0!=fclose( in_file ) ) ) {
     746             :     FD_LOG_ERR(( "fclose(%s) failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
     747             :   }
     748             : 
     749             :   ftell_res = ftell( out_file );
     750             :   if( FD_UNLIKELY( ftell_res<0L ) ) {
     751             :     FD_LOG_ERR(( "ftell(%s) failed (%i-%s)", out_path, errno, fd_io_strerror( errno ) ));
     752             :   }
     753             :   ulong out_file_sz = (ulong)ftell_res;
     754             : 
     755             :   if( FD_UNLIKELY( 0!=fclose( out_file ) ) ) {
     756             :     FD_LOG_ERR(( "fclose(%s) failed (%i-%s)", out_path, errno, fd_io_strerror( errno ) ));
     757             :   }
     758             :   dt += fd_log_wallclock();
     759             :   FD_LOG_NOTICE(( "Compressed %.3f GiB to %.3f GiB in %.3f s (%.3f GB/s, ratio %.2f)",
     760             :                   (double)in_file_sz/(double)(1UL<<30),
     761             :                   (double)out_file_sz/(double)(1UL<<30),
     762             :                   (double)dt/1e9,
     763             :                   ((double)in_file_sz)/((double)dt),
     764             :                   (double)in_file_sz/(double)out_file_sz ));
     765             : 
     766             :   fd_wksp_delete_anonymous( wksp );
     767             : 
     768             :   fd_halt();
     769             :   return 0;
     770             : }

Generated by: LCOV version 1.14