LCOV - code coverage report
Current view: top level - discof/geyser - fd_geyser.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 180 0.0 %
Date: 2025-05-15 12:12:48 Functions: 0 14 0.0 %

          Line data    Source code
       1             : 
       2             : #include "fd_geyser.h"
       3             : 
       4             : #include "../../funk/fd_funk_filemap.h"
       5             : #include "../../tango/mcache/fd_mcache.h"
       6             : #include "../../flamenco/runtime/fd_acc_mgr.h"
       7             : #include "../../util/wksp/fd_wksp_private.h"
       8             : #include "../../disco/topo/fd_topo.h"
       9             : 
      10             : #include <stdio.h>
      11             : #include <stdlib.h>
      12             : #include <signal.h>
      13             : #include <errno.h>
      14             : #include <unistd.h>
      15             : #include <netdb.h>
      16             : #include <sys/socket.h>
      17             : #include <netinet/in.h>
      18             : #include <arpa/inet.h>
      19             : #include "fd_geyser.h"
      20             : 
      21             : #define SHAM_LINK_CONTEXT fd_geyser_t
      22             : #define SHAM_LINK_STATE   fd_replay_notif_msg_t
      23             : #define SHAM_LINK_NAME    replay_sham_link
      24             : #include "sham_link.h"
      25             : 
      26             : #define SHAM_LINK_CONTEXT fd_geyser_t
      27             : #define SHAM_LINK_STATE   fd_stake_ci_t
      28             : #define SHAM_LINK_NAME    stake_sham_link
      29             : #include "sham_link.h"
      30             : 
      31             : struct fd_geyser {
      32             :   fd_funk_t            funk[1];
      33             :   fd_blockstore_t      blockstore_ljoin;
      34             :   fd_blockstore_t *    blockstore;
      35             :   int                  blockstore_fd;
      36             :   fd_stake_ci_t *      stake_ci;
      37             :   replay_sham_link_t * rep_notify;
      38             :   stake_sham_link_t *  stake_notify;
      39             : 
      40             :   void * fun_arg;
      41             :   fd_geyser_execute_fun execute_fun; /* Slot numbers, bank hash */
      42             :   fd_geyser_block_fun   block_fun;   /* Raw block data, additional metadata */
      43             :   fd_geyser_entry_fun   entry_fun;   /* Every entry/microblock */
      44             :   fd_geyser_txn_fun     txn_fun;     /* executed individual transaction */
      45             :   fd_geyser_block_done_fun block_done_fun;   /* Called after block specific updates are done */
      46             : 
      47             :   fd_geyser_acct_fun    acct_fun;    /* Account written */
      48             : };
      49             : 
      50             : ulong
      51           0 : fd_geyser_footprint( void ) {
      52           0 :   ulong l = FD_LAYOUT_INIT;
      53           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
      54           0 :   l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
      55           0 :   l = FD_LAYOUT_APPEND( l, replay_sham_link_align(), replay_sham_link_footprint() );
      56           0 :   l = FD_LAYOUT_APPEND( l, stake_sham_link_align(), stake_sham_link_footprint() );
      57           0 :   return FD_LAYOUT_FINI( l, 1UL );
      58           0 : }
      59             : 
      60             : ulong
      61           0 : fd_geyser_align( void ) {
      62           0 :   return alignof(fd_geyser_t);
      63           0 : }
      64             : 
      65             : void *
      66           0 : fd_geyser_new( void * mem, fd_geyser_args_t * args ) {
      67           0 :   FD_SCRATCH_ALLOC_INIT( l, mem );
      68           0 :   fd_geyser_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
      69           0 :   void * stake_ci_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
      70           0 :   void * rep_notify_mem = FD_SCRATCH_ALLOC_APPEND( l, replay_sham_link_align(), replay_sham_link_footprint() );
      71           0 :   void * stake_notify_mem = FD_SCRATCH_ALLOC_APPEND( l, stake_sham_link_align(), stake_sham_link_footprint() );
      72           0 :   ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
      73           0 :   FD_TEST( scratch_top <= (ulong)mem + fd_geyser_footprint() );
      74             : 
      75           0 :   fd_funk_t * funk = fd_funk_open_file( self->funk, args->funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
      76           0 :   if( !funk ) {
      77           0 :     FD_LOG_ERR(( "fd_funk_open_file(%s) failed", args->funk_file ));
      78           0 :   }
      79             : 
      80           0 :   fd_wksp_t * wksp = fd_wksp_attach( args->blockstore_wksp );
      81           0 :   if( FD_UNLIKELY( !wksp ) )
      82           0 :     FD_LOG_ERR(( "unable to attach to \"%s\"\n\tprobably does not exist or bad permissions", args->blockstore_wksp ));
      83           0 :   fd_wksp_tag_query_info_t info;
      84           0 :   ulong tag = 1;
      85           0 :   if( fd_wksp_tag_query( wksp, &tag, 1, &info, 1 ) <= 0 ) {
      86           0 :     FD_LOG_ERR(( "workspace \"%s\" does not contain a blockstore", args->blockstore_wksp ));
      87           0 :   }
      88           0 :   void * shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
      89           0 :   self->blockstore = fd_blockstore_join( &self->blockstore_ljoin, shmem );
      90           0 :   if( self->blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) {
      91           0 :     FD_LOG_ERR(( "failed to join a blockstore" ));
      92           0 :   }
      93           0 :   self->blockstore_fd = args->blockstore_fd;
      94           0 :   fd_wksp_mprotect( wksp, 1 );
      95             : 
      96           0 :   fd_pubkey_t identity_key[1]; /* Just the public key */
      97           0 :   memset( identity_key, 0xa5, sizeof(fd_pubkey_t) );
      98           0 :   self->stake_ci = fd_stake_ci_join( fd_stake_ci_new( stake_ci_mem, identity_key ) );
      99             : 
     100           0 :   self->rep_notify = replay_sham_link_new( rep_notify_mem, "fd1_replay_notif.wksp" );
     101           0 :   self->stake_notify = stake_sham_link_new( stake_notify_mem, "fd1_stake_out.wksp" );
     102             : 
     103           0 :   replay_sham_link_start( self->rep_notify );
     104           0 :   stake_sham_link_start( self->stake_notify );
     105             : 
     106           0 :   self->execute_fun = args->execute_fun;
     107           0 :   self->block_fun = args->block_fun;
     108           0 :   self->block_done_fun = args->block_done_fun;
     109           0 :   self->entry_fun = args->entry_fun;
     110           0 :   self->txn_fun = args->txn_fun;
     111           0 :   self->acct_fun = args->acct_fun;
     112           0 :   self->fun_arg = args->fun_arg;
     113             : 
     114           0 :   return mem;
     115           0 : }
     116             : 
     117             : fd_geyser_t *
     118           0 : fd_geyser_join( void * mem ) {
     119           0 :   FD_SCRATCH_ALLOC_INIT( l, mem );
     120           0 :   return FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
     121           0 : }
     122             : 
     123             : void *
     124           0 : fd_geyser_leave( fd_geyser_t * self ) {
     125           0 :   return self;
     126           0 : }
     127             : 
     128             : void *
     129           0 : fd_geyser_delete( void * mem ) {
     130           0 :   return mem;
     131           0 : }
     132             : 
     133             : static void
     134           0 : fd_geyser_scan_txns( fd_geyser_t * ctx, ulong slotn, uchar * data, ulong sz ) {
     135           0 :   ulong blockoff = 0;
     136           0 :   while( blockoff < sz ) {
     137           0 :     if( blockoff + sizeof( ulong ) > sz ) FD_LOG_ERR(( "premature end of block" ));
     138           0 :     ulong mcount = FD_LOAD( ulong, (const uchar *)data + blockoff );
     139           0 :     blockoff += sizeof( ulong );
     140             : 
     141             :     /* Loop across microblocks */
     142           0 :     for( ulong mblk = 0; mblk < mcount; ++mblk ) {
     143           0 :       if( blockoff + sizeof( fd_microblock_hdr_t ) > sz ) {
     144           0 :         FD_LOG_WARNING(( "premature end of block" ));
     145           0 :         return;
     146           0 :       }
     147           0 :       fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( (const uchar *)data + blockoff );
     148           0 :       blockoff += sizeof( fd_microblock_hdr_t );
     149             : 
     150           0 :       if( ctx->entry_fun != NULL ) {
     151           0 :         (*ctx->entry_fun)( slotn, hdr, ctx->fun_arg );
     152           0 :       }
     153             : 
     154             :       /* Loop across transactions */
     155           0 :       for( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
     156           0 :         uchar         txn_out[FD_TXN_MAX_SZ];
     157           0 :         uchar const * raw    = (uchar const *)data + blockoff;
     158           0 :         ulong         pay_sz = 0;
     159           0 :         ulong         txn_sz = fd_txn_parse_core( (uchar const *)raw,
     160           0 :                                                   fd_ulong_min( sz - blockoff, FD_TXN_MTU ),
     161           0 :                                                   txn_out,
     162           0 :                                                   NULL,
     163           0 :                                                   &pay_sz );
     164           0 :         if( txn_sz == 0 || txn_sz > FD_TXN_MTU ) {
     165           0 :           FD_LOG_WARNING(( "failed to parse transaction %lu in microblock %lu in slot %lu. txn size: %lu",
     166           0 :                            txn_idx,
     167           0 :                            mblk,
     168           0 :                            slotn,
     169           0 :                            txn_sz ));
     170           0 :           return;
     171           0 :         }
     172           0 :         fd_txn_t const * txn = (fd_txn_t const *)txn_out;
     173             : 
     174           0 :         if( ctx->txn_fun != NULL ) {
     175           0 :           (*ctx->txn_fun)( slotn, txn, raw, txn_sz, ctx->fun_arg );
     176           0 :         }
     177             : 
     178           0 :         blockoff += pay_sz;
     179           0 :       }
     180           0 :     }
     181           0 :   }
     182           0 : }
     183             : 
     184             : void
     185           0 : fd_geyser_replay_block( fd_geyser_t * ctx, ulong slotn ) {
     186           0 :   if( ctx->block_fun != NULL || ctx->entry_fun != NULL || ctx->txn_fun != NULL ) {
     187           0 :     FD_SCRATCH_SCOPE_BEGIN {
     188           0 :       fd_block_info_t meta[1];
     189           0 :       fd_block_rewards_t rewards[1];
     190           0 :       fd_hash_t parent_hash;
     191           0 :       uchar * blk_data;
     192           0 :       ulong blk_sz;
     193           0 :       if( fd_blockstore_block_data_query_volatile( ctx->blockstore, ctx->blockstore_fd, slotn, fd_scratch_virtual(), &parent_hash, meta, rewards, &blk_data, &blk_sz ) ) {
     194           0 :         FD_LOG_WARNING(( "failed to retrieve block for slot %lu", slotn ));
     195           0 :         return;
     196           0 :       }
     197           0 :       if( ctx->block_fun != NULL ) {
     198           0 :         (*ctx->block_fun)( slotn, meta, &parent_hash, blk_data, blk_sz, ctx->fun_arg );
     199           0 :       }
     200           0 :       if( ctx->entry_fun != NULL || ctx->txn_fun != NULL ) {
     201           0 :         fd_geyser_scan_txns( ctx, slotn, blk_data, blk_sz );
     202           0 :       }
     203           0 :     } FD_SCRATCH_SCOPE_END;
     204           0 :   }
     205           0 :   if( ctx->block_done_fun != NULL ) {
     206           0 :     ( *ctx->block_done_fun ) ( slotn, ctx->fun_arg );
     207           0 :   }
     208           0 : }
     209             : 
     210             : static void
     211           0 : replay_sham_link_during_frag( fd_geyser_t * ctx, fd_replay_notif_msg_t * state, void const * msg, int sz ) {
     212           0 :   (void)ctx;
     213           0 :   FD_TEST( sz == (int)sizeof(fd_replay_notif_msg_t) );
     214           0 :   fd_memcpy(state, msg, sizeof(fd_replay_notif_msg_t));
     215           0 : }
     216             : 
     217             : static const void *
     218           0 : read_account_with_xid( fd_geyser_t * ctx, fd_funk_rec_key_t * recid, fd_funk_txn_xid_t * xid, ulong * result_len ) {
     219           0 :   fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
     220           0 :   fd_funk_txn_t *     txn     = fd_funk_txn_query( xid, txn_map );
     221           0 :   return fd_funk_rec_query_copy( ctx->funk, txn, recid, fd_scratch_virtual(), result_len );
     222           0 : }
     223             : 
     224             : static void
     225           0 : replay_sham_link_after_frag(fd_geyser_t * ctx, fd_replay_notif_msg_t * msg) {
     226           0 :   if( msg->type == FD_REPLAY_SLOT_TYPE ) {
     227           0 :     ulong slotn = msg->slot_exec.slot;
     228           0 :     if( ctx->execute_fun != NULL ) {
     229           0 :       ( *ctx->execute_fun ) ( msg, ctx->fun_arg );
     230           0 :     }
     231           0 :     fd_geyser_replay_block( ctx, slotn );
     232             : 
     233           0 :   } else if( msg->type == FD_REPLAY_ACCTS_TYPE ) {
     234           0 :     if( ctx->acct_fun != NULL ) {
     235           0 :       for( uint i = 0; i < msg->accts.accts_cnt; ++i ) {
     236           0 :         FD_SCRATCH_SCOPE_BEGIN {
     237           0 :           fd_pubkey_t addr;
     238           0 :           fd_memcpy(&addr, msg->accts.accts[i].id, 32U );
     239           0 :           fd_funk_rec_key_t key = fd_funk_acc_key( &addr );
     240           0 :           ulong datalen;
     241           0 :           const void * data = read_account_with_xid( ctx, &key, &msg->accts.funk_xid, &datalen );
     242           0 :           if( data ) {
     243           0 :             fd_account_meta_t const * meta = fd_type_pun_const( data );
     244           0 :             if( datalen >= meta->hlen + meta->dlen ) {
     245           0 :               (*ctx->acct_fun)( msg->accts.funk_xid.ul[0], msg->accts.sig, &addr, meta, (uchar*)data + meta->hlen, meta->dlen, ctx->fun_arg );
     246           0 :             }
     247           0 :           }
     248           0 :         } FD_SCRATCH_SCOPE_END;
     249           0 :       }
     250           0 :     }
     251           0 :   }
     252           0 : }
     253             : 
     254             : static void
     255           0 : stake_sham_link_during_frag( fd_geyser_t * ctx, fd_stake_ci_t * state, void const * msg, int sz ) {
     256           0 :   (void)ctx; (void)sz;
     257           0 :   fd_stake_ci_stake_msg_init( state, msg );
     258           0 : }
     259             : 
     260             : static void
     261           0 : stake_sham_link_after_frag(fd_geyser_t * ctx, fd_stake_ci_t * state) {
     262           0 :   (void)ctx;
     263           0 :   fd_stake_ci_stake_msg_fini( state );
     264           0 : }
     265             : 
     266             : void
     267           0 : fd_geyser_poll( fd_geyser_t * self ) {
     268           0 :   fd_replay_notif_msg_t msg;
     269           0 :   replay_sham_link_poll( self->rep_notify, self, &msg );
     270             : 
     271           0 :   stake_sham_link_poll( self->stake_notify, self, self->stake_ci );
     272           0 : }

Generated by: LCOV version 1.14