LCOV - code coverage report
Current view: top level - discof/batch - fd_batch_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 327 0.0 %
Date: 2025-03-20 12:08:36 Functions: 0 11 0.0 %

          Line data    Source code
       1             : #include "../../disco/topo/fd_topo.h"
       2             : #include "../../disco/topo/fd_pod_format.h"
       3             : #include "../../funk/fd_funk.h"
       4             : #include "../../funk/fd_funk_filemap.h"
       5             : #include "../../flamenco/runtime/fd_hashes.h"
       6             : #include "../../flamenco/runtime/fd_txncache.h"
       7             : #include "../../flamenco/snapshot/fd_snapshot_create.h"
       8             : #include "../../flamenco/runtime/fd_runtime.h"
       9             : 
      10             : #include "generated/fd_batch_tile_seccomp.h"
      11             : 
      12             : #include <errno.h>
      13             : #include <unistd.h>
      14             : 
                     : /* Index into tile->out_link_id[] of this tile's single output link
                     :    (checked in unprivileged_init to be the link named "batch_replay"). */
      15           0 : #define REPLAY_OUT_IDX     (0UL)
                     : /* Value passed as the sig argument of fd_stem_publish when the epoch
                     :    account hash is published in produce_eah. */
      16           0 : #define EAH_REPLAY_OUT_SIG (0UL)
      17             : 
                     : /* Size in bytes (8UL<<30 == 8 GiB) of the spad region appended to the
                     :    tile scratch layout and handed to fd_spad_new. */
      18           0 : #define MEM_FOOTPRINT      (8UL<<30)
      19             : 
                     : /* fd_snapshot_tile_ctx is the per-tile state of the batch tile.  It is
                     :    placed at the start of the tile scratch region and zeroed in
                     :    unprivileged_init.  It holds configuration copied from the topology
                     :    tile, joins to shared objects, the snapshot file descriptors opened
                     :    in privileged_init, the hash thread pool, and the metadata that is
                     :    carried from one snapshot to the next. */
      20             : struct fd_snapshot_tile_ctx {
      21             :   /* User defined parameters. */
      22             :   ulong           full_interval;
      23             :   ulong           incremental_interval;
      24             :   char const    * out_dir;
      25             :   char            funk_file[ PATH_MAX ];
      26             : 
      27             :   /* Shared data structures. */
      28             :   fd_txncache_t * status_cache;
                     :   /* fseq polled in after_credit: 0 means nothing to do, any other value
                     :      is decoded with the fd_batch_fseq_* helpers as a snapshot or epoch
                     :      account hash request.  Reset to 0 when the request is done. */
      29             :   ulong         * is_constipated;
                     :   /* Joined lazily in after_credit (see is_funk_active). */
      30             :   fd_funk_t     * funk;
      31             : 
      32             :   /* File descriptors used for snapshot generation. */
      33             :   int             tmp_fd;
      34             :   int             tmp_inc_fd;
      35             :   int             full_snapshot_fd;
      36             :   int             incremental_snapshot_fd;
      37             : 
      38             :   /* Thread pool used for account hash calculation. */
      39             :   uchar           tpool_mem[ FD_TPOOL_FOOTPRINT( FD_TILE_MAX ) ] __attribute__( ( aligned( FD_TPOOL_ALIGN ) ) );
                     :   /* NULL when hash_tpool_thread_count<=1 (see unprivileged_init). */
      40             :   fd_tpool_t *    tpool;
      41             : 
      42             :   /* Only join funk after tiles start spinning. */
      43             :   int             is_funk_active;
      44             : 
      45             :   /* Metadata from the full snapshot used for incremental snapshots. */
      46             :   ulong           last_full_snap_slot;
      47             :   fd_hash_t       last_hash;
      48             :   ulong           last_capitalization;
      49             : 
      50             :   /* Replay out link fields for epoch account hash. */
      51             :   fd_wksp_t *     replay_out_mem;
      52             :   ulong           replay_out_chunk;
      53             : 
                     :   /* Workspace and join of the "replay_pub" topology object; its
                     :      features field is read when creating snapshots and hashes. */
      54             :   fd_wksp_t  *    replay_public_wksp;
      55             :   fd_runtime_public_t * replay_public;
      56             : 
      57             :   /* Bump allocator */
      58             :   fd_spad_t *     spad;
      59             : };
      60             : typedef struct fd_snapshot_tile_ctx fd_snapshot_tile_ctx_t;
      61             : 
                     : /* tpool_batch_boot builds the thread-index -> cpu-index table for the
                     :    hash thread pool and passes it to fd_tile_private_map_boot.
                     : 
                     :    Entry 0 is the cpu_idx of the topology tile named "batch"; entries
                     :    1, 2, ... are the cpu_idx of the tiles named "btpool", in topology
                     :    order.  total_thread_count is the expected number of entries
                     :    (the "batch" tile plus all "btpool" tiles); a mismatch is reported
                     :    with FD_LOG_ERR. */
      62             : void
      63           0 : tpool_batch_boot( fd_topo_t * topo, ulong total_thread_count ) {
      64           0 :   ushort tile_to_cpu[ FD_TILE_MAX ] = {0};
      65           0 :   ulong thread_count                = 0UL;
      66           0 :   ulong main_thread_seen            = 0UL;
      67             : 
      68           0 :   for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
                     :     /* Worker threads occupy slots 1.. so slot 0 stays free for the
                     :        batch tile itself.
                     :        NOTE(review): 1+thread_count is not checked against FD_TILE_MAX
                     :        here - confirm the topology cannot hold that many btpool tiles. */
      69           0 :     if( strcmp( topo->tiles[i].name, "btpool" ) == 0 ) {
      70           0 :       tile_to_cpu[ 1+thread_count ] = (ushort)topo->tiles[i].cpu_idx;
      71           0 :       thread_count++;
      72           0 :     }
      73           0 :     if( strcmp( topo->tiles[i].name, "batch" ) == 0 ) {
      74           0 :       tile_to_cpu[ 0 ] = (ushort)topo->tiles[i].cpu_idx;
      75           0 :       main_thread_seen = 1;
      76           0 :     }
      77           0 :   }
      78             : 
                     :   /* Count the batch tile once, however many times it was matched. */
      79           0 :   if( main_thread_seen ) {
      80           0 :     thread_count++;
      81           0 :   }
      82             : 
      83           0 :   if( thread_count != total_thread_count )
      84           0 :     FD_LOG_ERR(( "thread count mismatch thread_count=%lu total_thread_count=%lu main_thread_seen=%lu",
      85           0 :                  thread_count,
      86           0 :                  total_thread_count,
      87           0 :                  main_thread_seen ));
      88             : 
      89           0 :   fd_tile_private_map_boot( tile_to_cpu, thread_count );
      90           0 : }
      91             : 
                     : /* scratch_align returns the alignment in bytes (128) used to finalize
                     :    the tile scratch layout in scratch_footprint and unprivileged_init. */
      92             : FD_FN_CONST static inline ulong
      93           0 : scratch_align( void ) {
      94           0 :   return 128UL;
      95           0 : }
      96             : 
                     : /* scratch_footprint returns the size of the tile scratch region: one
                     :    fd_snapshot_tile_ctx_t followed by MEM_FOOTPRINT bytes (rounded up to
                     :    fd_spad_align()) for the spad.  The tile argument is unused.  The
                     :    append order here must match the FD_SCRATCH_ALLOC_APPEND order in
                     :    unprivileged_init. */
      97             : FD_FN_PURE static inline ulong
      98           0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
      99           0 :   ulong l = FD_LAYOUT_INIT;
     100           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapshot_tile_ctx_t), sizeof(fd_snapshot_tile_ctx_t) );
     101           0 :   l = FD_LAYOUT_APPEND( l, fd_spad_align(), fd_ulong_align_up( MEM_FOOTPRINT, fd_spad_align() ) );
     102           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     103           0 : }
     104             : 
     105             : static void
     106             : privileged_init( fd_topo_t      * topo FD_PARAM_UNUSED,
     107           0 :                  fd_topo_tile_t * tile ) {
     108             : 
     109             :   /* First open the relevant files here. TODO: We eventually want to extend
     110             :      this to support multiple files. */
     111             : 
     112           0 :   char tmp_dir_buf[ FD_SNAPSHOT_DIR_MAX ];
     113           0 :   int err = snprintf( tmp_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_ARCHIVE );
     114           0 :   if( FD_UNLIKELY( err<0 ) ) {
     115           0 :     FD_LOG_ERR(( "Failed to format directory string" ));
     116           0 :   }
     117             : 
     118           0 :   char tmp_inc_dir_buf[ FD_SNAPSHOT_DIR_MAX ];
     119           0 :   err = snprintf( tmp_inc_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_INCR_ARCHIVE );
     120           0 :   if( FD_UNLIKELY( err<0 ) ) {
     121           0 :     FD_LOG_ERR(( "Failed to format directory string" ));
     122           0 :   }
     123             : 
     124           0 :   char zstd_dir_buf[ FD_SNAPSHOT_DIR_MAX ];
     125           0 :   err = snprintf( zstd_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_FULL_ARCHIVE_ZSTD );
     126           0 :   if( FD_UNLIKELY( err<0 ) ) {
     127           0 :     FD_LOG_ERR(( "Failed to format directory string" ));
     128           0 :   }
     129             : 
     130           0 :   char zstd_inc_dir_buf[ FD_SNAPSHOT_DIR_MAX ];
     131           0 :   err = snprintf( zstd_inc_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_INCR_ARCHIVE_ZSTD );
     132           0 :   if( FD_UNLIKELY( err<0 ) ) {
     133           0 :     FD_LOG_ERR(( "Failed to format directory string" ));
     134           0 :   }
     135             : 
     136             :   /* Create and open the relevant files for snapshots. */
     137             : 
     138           0 :   tile->batch.tmp_fd = open( tmp_dir_buf, O_CREAT | O_RDWR | O_TRUNC, 0644 );
     139           0 :   if( FD_UNLIKELY( tile->batch.tmp_fd==-1 ) ) {
     140           0 :     FD_LOG_ERR(( "Failed to open and create tarball for file=%s (%i-%s)", tmp_dir_buf, errno, fd_io_strerror( errno ) ));
     141           0 :   }
     142             : 
     143           0 :   tile->batch.tmp_inc_fd = open( tmp_inc_dir_buf, O_CREAT | O_RDWR | O_TRUNC, 0644 );
     144           0 :   if( FD_UNLIKELY( tile->batch.tmp_inc_fd==-1 ) ) {
     145           0 :     FD_LOG_ERR(( "Failed to open and create tarball for file=%s (%i-%s)", tmp_inc_dir_buf, errno, fd_io_strerror( errno ) ));
     146           0 :   }
     147             : 
     148           0 :   tile->batch.full_snapshot_fd = open( zstd_dir_buf, O_RDWR | O_CREAT | O_TRUNC, 0644 );
     149           0 :   if( FD_UNLIKELY( tile->batch.full_snapshot_fd==-1 ) ) {
     150           0 :     FD_LOG_WARNING(( "Failed to open the snapshot file (%i-%s)", errno, fd_io_strerror( errno ) ));
     151           0 :   }
     152             : 
     153           0 :   tile->batch.incremental_snapshot_fd = open( zstd_inc_dir_buf, O_RDWR | O_CREAT | O_TRUNC, 0644 );
     154           0 :   if( FD_UNLIKELY( tile->batch.incremental_snapshot_fd==-1 ) ) {
     155           0 :     FD_LOG_WARNING(( "Failed to open the snapshot file (%i-%s)", errno, fd_io_strerror( errno ) ));
     156           0 :   }
     157           0 : }
     158             : 
                     : /* unprivileged_init lays out and initializes the batch tile context in
                     :    the tile scratch region.  In order it: validates the single output
                     :    link, carves the context and spad out of scratch, copies the
                     :    configuration and file descriptors from tile->batch, boots the hash
                     :    thread pool (only when hash_tpool_thread_count>1), creates the spad,
                     :    records the funk file path for a later join, joins the status cache
                     :    and the constipated fseq (resetting it to 0), zeroes the incremental
                     :    snapshot metadata, and sets up the replay output link and the
                     :    replay_pub object.  Any failure is reported with FD_LOG_ERR or
                     :    FD_TEST. */
     159             : static void
     160             : unprivileged_init( fd_topo_t      * topo,
     161           0 :                    fd_topo_tile_t * tile ) {
     162             : 
     163           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     164             : 
                     :   /* NOTE(review): when out_cnt is 0 the message below still reads
                     :      out_link_id[ REPLAY_OUT_IDX ] - confirm that entry is always
                     :      initialized. */
     165           0 :   if( FD_UNLIKELY( tile->out_cnt!=1UL || strcmp( topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name, "batch_replay" ) ) ) {
     166           0 :     FD_LOG_ERR(( "batch tile has none or unexpected output links %lu %s",
     167           0 :                  tile->out_cnt, topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name ));
     168           0 :   }
     169             : 
     170             :   /**********************************************************************/
     171             :   /* scratch (bump)-allocate memory owned by the batch tile             */
     172             :   /**********************************************************************/
     173             : 
     174             :   /* Do not modify order! This is join-order in unprivileged_init. */
     175             : 
     176           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     177           0 :   fd_snapshot_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapshot_tile_ctx_t), sizeof(fd_snapshot_tile_ctx_t) );
     178           0 :   memset( ctx, 0, sizeof(fd_snapshot_tile_ctx_t) );
     179           0 :   void * spad_mem            = FD_SCRATCH_ALLOC_APPEND( l, fd_spad_align(), fd_ulong_align_up( MEM_FOOTPRINT, fd_spad_align() ) );
     180           0 :   ulong  scratch_alloc_mem   = FD_SCRATCH_ALLOC_FINI  ( l, scratch_align() );
     181             : 
                     :   /* The allocations above must fit in what scratch_footprint reported. */
     182           0 :   if( FD_UNLIKELY( scratch_alloc_mem > (ulong)scratch + scratch_footprint(tile) ) ) {
     183           0 :     FD_LOG_ERR(( "scratch overflow" ));
     184           0 :   }
     185             : 
     186           0 :   ctx->full_interval           = tile->batch.full_interval;
     187           0 :   ctx->incremental_interval    = tile->batch.incremental_interval;
     188           0 :   ctx->out_dir                 = tile->batch.out_dir;
     189           0 :   ctx->tmp_fd                  = tile->batch.tmp_fd;
     190           0 :   ctx->tmp_inc_fd              = tile->batch.tmp_inc_fd;
     191           0 :   ctx->full_snapshot_fd        = tile->batch.full_snapshot_fd;
     192           0 :   ctx->incremental_snapshot_fd = tile->batch.incremental_snapshot_fd;
     193             : 
     194             :   /**********************************************************************/
     195             :   /* tpool                                                              */
     196             :   /**********************************************************************/
     197             : 
     198           0 :   FD_LOG_NOTICE(( "Number of threads in hash tpool: %lu", tile->batch.hash_tpool_thread_count ));
     199             : 
                     :   /* With a single thread no pool is created and ctx->tpool stays NULL. */
     200           0 :   if( FD_LIKELY( tile->batch.hash_tpool_thread_count>1UL ) ) {
     201           0 :     tpool_batch_boot( topo, tile->batch.hash_tpool_thread_count );
     202           0 :     ctx->tpool = fd_tpool_init( ctx->tpool_mem, tile->batch.hash_tpool_thread_count );
     203           0 :   } else {
     204           0 :     ctx->tpool = NULL;
     205           0 :   }
     206             : 
     207           0 :   if( FD_LIKELY( tile->batch.hash_tpool_thread_count>1UL ) ) {
     208             :     /* Start the tpool workers */
     209           0 :     for( ulong i=1UL; i<tile->batch.hash_tpool_thread_count; i++ ) {
     210           0 :       if( FD_UNLIKELY( !fd_tpool_worker_push( ctx->tpool, i, NULL, 0UL ) ) ) {
     211           0 :         FD_LOG_ERR(( "failed to launch worker" ));
     212           0 :       }
     213           0 :     }
     214           0 :   }
     215             : 
     216             :   /**********************************************************************/
     217             :   /* spads                                                              */
     218             :   /**********************************************************************/
     219             :   /* FIXME: Define a bound for the size of the spad. It likely needs to be
     220             :      larger than this. */
     221           0 :   uchar * spad_mem_cur = spad_mem;
     222           0 :   ctx->spad = fd_spad_join( fd_spad_new( spad_mem_cur, MEM_FOOTPRINT ) );
     223             : 
     224             :   /**********************************************************************/
     225             :   /* funk                                                               */
     226             :   /**********************************************************************/
     227             : 
     228             :   /* We only want to join funk after it has been setup and joined in the
     229             :      replay tile.
     230             :      TODO: Eventually funk will be joined via a shared topology object. */
     231           0 :   ctx->is_funk_active = 0;
                     :   /* NOTE(review): the path is read from tile->replay, not tile->batch -
                     :      confirm the topology fills replay.funk_file for the batch tile. */
     232           0 :   memcpy( ctx->funk_file, tile->replay.funk_file, sizeof(tile->replay.funk_file) );
     233             : 
     234             :   /**********************************************************************/
     235             :   /* status cache                                                       */
     236             :   /**********************************************************************/
     237             : 
     238           0 :   ulong status_cache_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "txncache" );
     239           0 :   if( FD_UNLIKELY( status_cache_obj_id==ULONG_MAX ) ) {
     240           0 :     FD_LOG_ERR(( "no status cache object id" ));
     241           0 :   }
     242             : 
     243           0 :   ctx->status_cache = fd_txncache_join( fd_topo_obj_laddr( topo, status_cache_obj_id ) );
     244           0 :   if( FD_UNLIKELY( !ctx->status_cache ) ) {
     245           0 :     FD_LOG_ERR(( "no status cache" ));
     246           0 :   }
     247             : 
     248             :   /**********************************************************************/
     249             :   /* constipated fseq                                                   */
     250             :   /**********************************************************************/
     251             : 
     252           0 :   ulong constipated_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "constipate" );
     253           0 :   if( FD_UNLIKELY( constipated_obj_id==ULONG_MAX ) ) {
     254           0 :     FD_LOG_ERR(( "no constipated object id" ));
     255           0 :   }
     256           0 :   ctx->is_constipated = fd_fseq_join( fd_topo_obj_laddr( topo, constipated_obj_id ) );
     257           0 :   if( FD_UNLIKELY( !ctx->is_constipated ) ) {
     258           0 :     FD_LOG_ERR(( "replay tile has no constipated fseq" ));
     259           0 :   }
                     :   /* Start with no pending request, then read the value back to verify. */
     260           0 :   fd_fseq_update( ctx->is_constipated, 0UL );
     261           0 :   if( FD_UNLIKELY( fd_fseq_query( ctx->is_constipated ) ) ) {
     262           0 :     FD_LOG_ERR(( "constipated fseq is not zero" ));
     263           0 :   }
     264             : 
     265             :   /**********************************************************************/
     266             :   /* snapshot                                                           */
     267             :   /**********************************************************************/
     268             : 
     269             :   /* Zero init all of the fields used for incremental snapshot generation
     270             :      that must be persisted across snapshot creation. */
     271             : 
     272           0 :   ctx->last_full_snap_slot = 0UL;
     273           0 :   ctx->last_capitalization = 0UL;
     274           0 :   fd_memset( &ctx->last_hash, 0, sizeof(fd_hash_t) );
     275             : 
     276             :   /****************************************************************************/
     277             :   /* Replay Tile Link                                                         */
     278             :   /****************************************************************************/
     279             : 
     280             :   /* Set up replay output */
     281           0 :   fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ];
     282           0 :   ctx->replay_out_mem         = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp;
     283           0 :   ctx->replay_out_chunk       = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache );;
     284             : 
     285             :   /* replay public setup */
     286           0 :   ulong replay_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "replay_pub" );
     287           0 :   FD_TEST( replay_obj_id!=ULONG_MAX );
     288           0 :   ctx->replay_public_wksp = topo->workspaces[ topo->objs[ replay_obj_id ].wksp_id ].wksp;
     289             : 
     290           0 :   if( ctx->replay_public_wksp==NULL ) {
     291           0 :     FD_LOG_ERR(( "no replay_public workspace" ));
     292           0 :   }
     293             : 
     294           0 :   ctx->replay_public = fd_runtime_public_join( fd_topo_obj_laddr( topo, replay_obj_id ) );
     295           0 :   FD_TEST( ctx->replay_public!=NULL );
     296           0 : }
     297             : 
     298             : static void
     299           0 : produce_snapshot( fd_snapshot_tile_ctx_t * ctx, ulong batch_fseq ) {
     300             : 
     301           0 :   ulong is_incremental = fd_batch_fseq_is_incremental( batch_fseq );
     302           0 :   ulong snapshot_slot  = fd_batch_fseq_get_slot( batch_fseq );
     303             : 
     304           0 :   if( !is_incremental ) {
     305           0 :     ctx->last_full_snap_slot = snapshot_slot;
     306           0 :   }
     307             : 
     308           0 :   FD_LOG_WARNING(( "Creating snapshot incremental=%lu slot=%lu", is_incremental, snapshot_slot ));
     309             : 
     310           0 :   fd_snapshot_ctx_t snapshot_ctx = {
     311           0 :     .features                 = &ctx->replay_public->features,
     312           0 :     .slot                     = snapshot_slot,
     313           0 :     .out_dir                  = ctx->out_dir,
     314           0 :     .is_incremental           = (uchar)is_incremental,
     315           0 :     .funk                     = ctx->funk,
     316           0 :     .status_cache             = ctx->status_cache,
     317           0 :     .tmp_fd                   = is_incremental ? ctx->tmp_inc_fd              : ctx->tmp_fd,
     318           0 :     .snapshot_fd              = is_incremental ? ctx->incremental_snapshot_fd : ctx->full_snapshot_fd,
     319           0 :     .tpool                    = ctx->tpool,
     320             :     /* These parameters are ignored if the snapshot is not incremental. */
     321           0 :     .last_snap_slot           = ctx->last_full_snap_slot,
     322           0 :     .last_snap_acc_hash       = &ctx->last_hash,
     323           0 :     .last_snap_capitalization = ctx->last_capitalization,
     324           0 :     .spad                     = ctx->spad
     325           0 :   };
     326             : 
     327             :   /* If this isn't the first snapshot that this tile is creating, the
     328             :       permissions should be made to not acessible by users and should be
     329             :       renamed to the constant file that is expected. */
     330             : 
     331           0 :   char proc_filename[ FD_SNAPSHOT_DIR_MAX ];
     332           0 :   char prev_filename[ FD_SNAPSHOT_DIR_MAX ];
     333           0 :   char new_filename [ FD_SNAPSHOT_DIR_MAX ];
     334           0 :   snprintf( proc_filename, FD_SNAPSHOT_DIR_MAX, "/proc/self/fd/%d", is_incremental ? ctx->incremental_snapshot_fd : ctx->full_snapshot_fd );
     335           0 :   long len = readlink( proc_filename, prev_filename, FD_SNAPSHOT_DIR_MAX );
     336           0 :   if( FD_UNLIKELY( len<=0L ) ) {
     337           0 :     FD_LOG_ERR(( "Failed to readlink the snapshot file" ));
     338           0 :   }
     339           0 :   prev_filename[ len ] = '\0';
     340             : 
     341           0 :   int err = snprintf( new_filename, FD_SNAPSHOT_DIR_MAX, "%s/%s", ctx->out_dir, is_incremental ? FD_SNAPSHOT_TMP_INCR_ARCHIVE_ZSTD : FD_SNAPSHOT_TMP_FULL_ARCHIVE_ZSTD );
     342           0 :   if( FD_UNLIKELY( err<0 ) ) {
     343           0 :     FD_LOG_ERR(( "Can't format filename" ));
     344           0 :     return;
     345           0 :   }
     346             : 
     347           0 :   err = rename( prev_filename, new_filename );
     348           0 :   if( FD_UNLIKELY( err ) ) {
     349           0 :     FD_LOG_ERR(( "Failed to rename file from %s to %s", prev_filename, new_filename ));
     350           0 :   }
     351           0 :   FD_LOG_NOTICE(( "Renaming file from %s to %s", prev_filename, new_filename ));
     352             : 
     353           0 :   err = ftruncate( snapshot_ctx.tmp_fd, 0UL );
     354           0 :   if( FD_UNLIKELY( err==-1 ) ) {
     355           0 :     FD_LOG_ERR(( "Failed to truncate the temporary file (%i-%s)", errno, fd_io_strerror( errno ) ));
     356           0 :   }
     357             : 
     358           0 :   err = ftruncate( snapshot_ctx.snapshot_fd, 0UL );
     359           0 :   if( FD_UNLIKELY( err==-1 ) ) {
     360           0 :     FD_LOG_ERR(( "Failed to truncate the snapshot file (%i-%s)", errno, fd_io_strerror( errno ) ));
     361           0 :   }
     362             : 
     363           0 :   long seek = lseek( snapshot_ctx.tmp_fd, 0UL, SEEK_SET );
     364           0 :   if( FD_UNLIKELY( seek!=0L ) ) {
     365           0 :     FD_LOG_ERR(( "Failed to seek to the beginning of the file" ));
     366           0 :   }
     367             : 
     368             :   /* Now that the files are in an expected state, create the snapshot. */
     369           0 :   FD_SPAD_FRAME_BEGIN( snapshot_ctx.spad ) {
     370           0 :     fd_snapshot_create_new_snapshot( &snapshot_ctx, &ctx->last_hash, &ctx->last_capitalization );
     371           0 :   } FD_SPAD_FRAME_END;
     372             : 
     373           0 :   if( is_incremental ) {
     374           0 :     FD_LOG_NOTICE(( "Done creating a snapshot in %s", snapshot_ctx.out_dir ));
     375           0 :     FD_LOG_ERR(("Successful exit" ));
     376           0 :   }
     377             : 
     378           0 :   FD_LOG_NOTICE(( "Done creating a snapshot in %s", snapshot_ctx.out_dir ));
     379             : 
     380             :   /* At this point the snapshot has been succesfully created, so we can
     381             :      unconstipate funk and any related data structures in the replay tile. */
     382             : 
     383           0 :   fd_fseq_update( ctx->is_constipated, 0UL );
     384             : 
     385           0 : }
     386             : 
                     : /* get_eah_txn scans every entry of funk's transaction map and returns
                     :    the first transaction whose xid.ul[0] equals slot.  Returns NULL if
                     :    no entry matches; produce_eah passes that NULL straight to
                     :    fd_funk_rec_query (the log message calls this "from root"). */
     387             : static fd_funk_txn_t*
     388           0 : get_eah_txn( fd_funk_t * funk, ulong slot ) {
     389             : 
     390           0 :   fd_funk_txn_t * txn_map = fd_funk_txn_map( funk, fd_funk_wksp( funk ) );
     391           0 :   for( fd_funk_txn_map_iter_t iter = fd_funk_txn_map_iter_init( txn_map );
     392           0 :          !fd_funk_txn_map_iter_done( txn_map, iter );
     393           0 :          iter = fd_funk_txn_map_iter_next( txn_map, iter ) ) {
     394           0 :     fd_funk_txn_t * txn = fd_funk_txn_map_iter_ele( txn_map, iter );
     395           0 :     if( txn->xid.ul[0]==slot ) {
     396           0 :       FD_LOG_NOTICE(( "Found transaction for eah" ));
     397           0 :       return txn;
     398           0 :     }
     399           0 :   }
     400           0 :   FD_LOG_NOTICE(( "Calculating eah from root" ));
     401           0 :   return NULL;
     402           0 : }
     403             : 
                     : /* produce_eah computes the epoch account hash for the slot encoded in
                     :    batch_fseq and publishes it on the replay output link.
                     : 
                     :    It looks up the slot bank record in funk (in the transaction found
                     :    by get_eah_txn, or with a NULL transaction if none matches), checks
                     :    the record's leading uint magic, decodes the slot bank into spad
                     :    memory, runs fd_accounts_hash, copies the 32-byte result into the
                     :    out link chunk and publishes it through stem.  Finally the
                     :    constipated fseq is reset to 0.  Nothing is done when the
                     :    accounts_lt_hash feature is active for the slot. */
     404             : static void
     405           0 : produce_eah( fd_snapshot_tile_ctx_t * ctx, fd_stem_context_t * stem, ulong batch_fseq ) {
     406           0 :   ulong eah_slot = fd_batch_fseq_get_slot( batch_fseq );
     407             : 
                     :   /* NOTE(review): this early return leaves is_constipated non-zero, so
                     :      after_credit will call back in on every credit - confirm the
                     :      requester never asks for an EAH once the feature is active. */
     408           0 :   if( FD_FEATURE_ACTIVE_( eah_slot, ctx->replay_public->features, accounts_lt_hash ) )
     409           0 :     return;
     410             : 
     411           0 :   ulong tsorig = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
     412             : 
     413           0 :   FD_LOG_WARNING(( "Begining to produce epoch account hash in background for slot=%lu", eah_slot ));
     414             : 
     415             :   /* TODO: Perhaps it makes sense to factor this out into a function in the
     416             :      runtime as this could technically be considered a layering violation. */
     417             : 
     418             :   /* First, we must retrieve the corresponding slot_bank. We have the guarantee
     419             :      that the root record is frozen from the replay tile. */
     420             : 
     421           0 :   fd_funk_t *           funk     = ctx->funk;
     422           0 :   fd_funk_txn_t *       eah_txn  = get_eah_txn( funk, eah_slot );
     423           0 :   fd_funk_rec_key_t     slot_id  = fd_runtime_slot_bank_key();
     424           0 :   fd_funk_rec_t const * slot_rec = fd_funk_rec_query( funk, eah_txn, &slot_id );
     425           0 :   if( FD_UNLIKELY( !slot_rec ) ) {
     426           0 :     FD_LOG_ERR(( "Failed to read slot bank record: missing record" ));
     427           0 :   }
     428           0 :   void * slot_val = fd_funk_val( slot_rec, fd_funk_wksp( funk ) );
     429             : 
                     :   /* The record must at least hold the uint magic read below. */
     430           0 :   if( FD_UNLIKELY( fd_funk_val_sz( slot_rec )<sizeof(uint) ) ) {
     431           0 :     FD_LOG_ERR(( "Failed to read slot bank record: empty record" ));
     432           0 :   }
     433             : 
     434           0 :   uint slot_magic = *(uint*)slot_val;
     435           0 :   FD_SPAD_FRAME_BEGIN( ctx->spad ) {
                     :     /* The encoded slot bank follows the magic and runs to the end of
                     :        the record value. */
     436           0 :     fd_bincode_decode_ctx_t slot_decode_ctx = {
     437           0 :       .data    = (uchar*)slot_val + sizeof(uint),
     438           0 :       .dataend = (uchar*)slot_val + fd_funk_val_sz( slot_rec )
     439           0 :     };
     440             : 
     441           0 :     if( FD_UNLIKELY( slot_magic!=FD_RUNTIME_ENC_BINCODE ) ) {
     442           0 :       FD_LOG_ERR(( "Slot bank record has wrong magic" ));
     443           0 :     }
     444             : 
     445           0 :     ulong total_sz = 0UL;
     446           0 :     int   err      = fd_slot_bank_decode_footprint( &slot_decode_ctx, &total_sz );
     447           0 :     if( FD_UNLIKELY( err ) ) {
     448           0 :       FD_LOG_ERR(( "Failed to read slot bank record: invalid decode" ));
     449           0 :     }
     450             : 
     451           0 :     uchar * mem = fd_spad_alloc( ctx->spad, fd_slot_bank_align(), total_sz );
     452           0 :     if( FD_UNLIKELY( !mem ) ) {
     453           0 :       FD_LOG_ERR(( "Failed to read slot bank record: unable to allocate memory" ));
     454           0 :     }
     455             : 
     456           0 :     fd_slot_bank_t * slot_bank = fd_slot_bank_decode( mem, &slot_decode_ctx );
     457             : 
     458             :     /* At this point, calculate the epoch account hash. */
     459             : 
     460           0 :     fd_hash_t epoch_account_hash = {0};
     461             : 
     462           0 :     fd_accounts_hash( funk, slot_bank, ctx->tpool, &epoch_account_hash, ctx->spad, 0, &ctx->replay_public->features );
     463             : 
     464           0 :     FD_LOG_NOTICE(( "Done computing epoch account hash (%s)", FD_BASE58_ENC_32_ALLOCA( &epoch_account_hash ) ));
     465             : 
     466             :     /* Once the hash is calculated, we are ready to push the computed hash
     467             :        onto the out link to replay. We don't need to add any other information
     468             :        as this is the only type of message that is transmitted. */
     469             : 
     470           0 :     uchar * out_buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
     471           0 :     fd_memcpy( out_buf, epoch_account_hash.uc, sizeof(fd_hash_t) );
     472           0 :     ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
     473           0 :     fd_stem_publish( stem, 0UL, EAH_REPLAY_OUT_SIG, ctx->replay_out_chunk, sizeof(fd_hash_t), 0UL, tsorig, tspub );
     474             : 
     475             :     /* Reset the fseq allowing for the un-constipation of funk and allow for
     476             :        snapshots to be created again. */
     477             : 
     478           0 :     fd_fseq_update( ctx->is_constipated, 0UL );
     479           0 :   } FD_SPAD_FRAME_END;
     480           0 : }
     481             : 
                     : /* after_credit polls the constipated fseq and services at most one
                     :    request per call.  A value of 0 means there is nothing to do.  On
                     :    the first non-zero value the tile opens funk from ctx->funk_file and
                     :    marks it active; after that the request is dispatched to
                     :    produce_snapshot when fd_batch_fseq_is_snapshot says so, otherwise
                     :    to produce_eah.  opt_poll_in and charge_busy are unused. */
     482             : static void
     483             : after_credit( fd_snapshot_tile_ctx_t * ctx,
     484             :               fd_stem_context_t *      stem,
     485             :               int *                    opt_poll_in FD_PARAM_UNUSED,
     486           0 :               int *                    charge_busy FD_PARAM_UNUSED ) {
     487             : 
     488           0 :   ulong batch_fseq = fd_fseq_query( ctx->is_constipated );
     489             : 
     490             :   /* If batch_fseq == 0, this means that we don't want to calculate/produce
     491             :      anything. Keep this tile spinning. */
     492           0 :   if( !batch_fseq ) {
     493           0 :     return;
     494           0 :   }
     495             : 
     496           0 :   if( FD_UNLIKELY( !ctx->is_funk_active ) ) {
     497             :     /* Setting these parameters are not required because we are joining the
     498             :        funk that was setup in the replay tile. */
     499           0 :     ctx->funk = fd_funk_open_file( ctx->funk_file,
     500           0 :                                    1UL,
     501           0 :                                    0UL,
     502           0 :                                    0UL,
     503           0 :                                    0UL,
     504           0 :                                    0UL,
     505           0 :                                    FD_FUNK_READ_WRITE,
     506           0 :                                    NULL );
     507           0 :     if( FD_UNLIKELY( !ctx->funk ) ) {
     508           0 :       FD_LOG_ERR(( "failed to join a funky" ));
     509           0 :     }
     510           0 :     ctx->is_funk_active = 1;
     511             : 
     512           0 :     FD_LOG_WARNING(( "Just joined funk at file=%s", ctx->funk_file ));
     513           0 :   }
     514             : 
     515           0 :   if( fd_batch_fseq_is_snapshot( batch_fseq ) ) {
     516           0 :     produce_snapshot( ctx, batch_fseq );
     517           0 :   } else {
     518             :     // We need features to disable this...
     519           0 :     produce_eah( ctx, stem, batch_fseq );
     520           0 :   }
     521           0 : }
     522             : /* populate_allowed_seccomp: forwards out_cnt/out, the log file fd and the four tile->batch fds to the batch tile policy function; returns its instruction count. */
     523             : static ulong
     524             : populate_allowed_seccomp( fd_topo_t const *      topo,
     525             :                           fd_topo_tile_t const * tile,
     526             :                           ulong                  out_cnt,
     527           0 :                           struct sock_filter *   out ) {
     528           0 :   (void)topo;
     529             :   /* NOTE(review): populate_allowed_fds treats a log fd of -1 as "no log file"; here -1 would be cast to UINT_MAX - confirm the policy tolerates that. */
     530           0 :   populate_sock_filter_policy_fd_batch_tile( out_cnt,
     531           0 :                                              out,
     532           0 :                                              (uint)fd_log_private_logfile_fd(),
     533           0 :                                              (uint)tile->batch.tmp_fd,
     534           0 :                                              (uint)tile->batch.tmp_inc_fd,
     535           0 :                                              (uint)tile->batch.full_snapshot_fd,
     536           0 :                                              (uint)tile->batch.incremental_snapshot_fd );
     537           0 :   return sock_filter_policy_fd_batch_tile_instr_cnt;
     538           0 : }
     539             : /* populate_allowed_fds: fills out_fds with stderr, the log file fd (when one exists) and the four tile->batch fds; returns the number of entries written. */
     540             : static ulong
     541             : populate_allowed_fds( fd_topo_t const *      topo,
     542             :                       fd_topo_tile_t const * tile,
     543             :                       ulong                  out_fds_cnt,
     544           0 :                       int *                  out_fds ) {
     545           0 :   (void)topo;
     546             :   /* Up to 6 entries are written below (stderr, optional log file, four batch fds), so require room for all of them up front. */
     547           0 :   if( FD_UNLIKELY( out_fds_cnt<6UL ) ) {
     548           0 :     FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     549           0 :   }
     550             : 
     551           0 :   ulong out_cnt = 0UL;
     552           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
     553           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     554           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     555             :   /* The four batch fds are appended unconditionally, in the same order the seccomp policy receives them. */
     556           0 :   out_fds[ out_cnt++ ] = tile->batch.tmp_fd;
     557           0 :   out_fds[ out_cnt++ ] = tile->batch.tmp_inc_fd;
     558           0 :   out_fds[ out_cnt++ ] = tile->batch.full_snapshot_fd;
     559           0 :   out_fds[ out_cnt++ ] = tile->batch.incremental_snapshot_fd;
     560           0 :   return out_cnt;
     561           0 : }
     562             : /* Stem configuration. NOTE(review): these macros are presumably consumed by the fd_stem.c include below - confirm. */
     563           0 : #define STEM_BURST (1UL)
     564             :   /* Context type, and its alignment, for the callback(s) named below. */
     565           0 : #define STEM_CALLBACK_CONTEXT_TYPE          fd_snapshot_tile_ctx_t
     566           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapshot_tile_ctx_t)
     567             :   /* after_credit is the only callback registered in this section. */
     568           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
     569             :   /* The stem implementation is included as a .c file, after the macros above are defined. */
     570             : #include "../../disco/stem/fd_stem.c"
     571             :   /* Tile descriptor for the "batch" tile; stem_run is not defined in the visible part of this file - presumably it comes from fd_stem.c. */
     572             : fd_topo_run_tile_t fd_tile_batch = {
     573             :   .name                     = "batch",
     574             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     575             :   .populate_allowed_fds     = populate_allowed_fds,
     576             :   .scratch_align            = scratch_align,
     577             :   .scratch_footprint        = scratch_footprint,
     578             :   .privileged_init          = privileged_init,
     579             :   .unprivileged_init        = unprivileged_init,
     580             :   .run                      = stem_run,
     581             : };

Generated by: LCOV version 1.14