LCOV - code coverage report
Current view: top level - disco/replay - fd_replay.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 195 0.0 %
Date: 2024-11-13 11:58:15 Functions: 0 3 0.0 %

          Line data    Source code
       1             : #include "fd_replay.h"
       2             : 
       3             : #include "../../util/net/fd_pcap.h"
       4             : #include <stdio.h>
       5             : #include <errno.h>
       6             : 
       7             : FD_STATIC_ASSERT( FD_FCTL_ALIGN<=FD_REPLAY_TILE_SCRATCH_ALIGN, packing ); /* the fctl is carved from scratch, so the scratch alignment must cover it */
       8             : /* Indices into the cnc app region (cnc_diag[] in fd_replay_tile) */
       9           0 : #define FD_CNC_DIAG_IN_BACKP   (0UL)
      10           0 : #define FD_CNC_DIAG_BACKP_CNT  (1UL)
      11             : /* Indices into an out fseq's app region; fd_replay_tile only references FD_FSEQ_DIAG_SLOW_CNT */
      12             : #define FD_FSEQ_DIAG_PUB_CNT   (0UL)
      13             : #define FD_FSEQ_DIAG_PUB_SZ    (1UL)
      14             : #define FD_FSEQ_DIAG_FILT_CNT  (2UL)
      15             : #define FD_FSEQ_DIAG_FILT_SZ   (3UL)
      16             : #define FD_FSEQ_DIAG_OVRNP_CNT (4UL)
      17             : #define FD_FSEQ_DIAG_OVRNR_CNT (5UL)
      18             : #define FD_FSEQ_DIAG_SLOW_CNT  (6UL)
      19             : 
      20             : ulong
      21           0 : fd_replay_tile_scratch_align( void ) { /* Returns the alignment fd_replay_tile requires of its scratch region */
      22           0 :   return FD_REPLAY_TILE_SCRATCH_ALIGN;
      23           0 : }
      24             : 
      25             : ulong
      26           0 : fd_replay_tile_scratch_footprint( ulong out_cnt ) { /* Returns the scratch footprint needed for out_cnt outs, 0UL if out_cnt exceeds FD_REPLAY_TILE_OUT_MAX */
      27           0 :   if( FD_UNLIKELY( out_cnt>FD_REPLAY_TILE_OUT_MAX ) ) return 0UL;
      28           0 :   ulong l = FD_LAYOUT_INIT;
      29           0 :   l = FD_LAYOUT_APPEND( l, fd_fctl_align(), fd_fctl_footprint( out_cnt ) ); /* fctl, the only object fd_replay_tile allocates from scratch */
      30           0 :   return FD_LAYOUT_FINI( l, fd_replay_tile_scratch_align() );
      31           0 : }
      32             : 
      33             : int
      34             : fd_replay_tile( fd_cnc_t *       cnc,
      35             :                 char const *     pcap_path,
      36             :                 ulong            pkt_max,
      37             :                 ulong            orig,
      38             :                 fd_frag_meta_t * mcache,
      39             :                 uchar *          dcache,
      40             :                 ulong            out_cnt,
      41             :                 ulong **         out_fseq,
      42             :                 ulong            cr_max,
      43             :                 long             lazy,
      44             :                 fd_rng_t *       rng,
      45           0 :                 void *           scratch ) {
      46             :   /* Replays the packets of the pcap file at pcap_path into mcache /
                     :      dcache, one single fragment message (som and eom both set) per
                     :      packet, flow controlled by the out_cnt receivers in out_fseq.
                     :
                     :      The tile boots (validates its arguments, opens the pcap and
                     :      builds an fctl in scratch), signals FD_CNC_SIGNAL_RUN on cnc,
                     :      runs until FD_CNC_SIGNAL_HALT is observed during housekeeping,
                     :      then tears down and signals FD_CNC_SIGNAL_BOOT.  Returns 0
                     :      after a normal halt and 1 (after logging a warning) if boot
                     :      fails.
                     :
                     :      cnc must be in the BOOT state with an app region of at least
                     :      64 bytes.  pkt_max must be positive and is the size passed to
                     :      fd_pcap_iter_next for each packet.  orig is the origin encoded
                     :      in every published frag's ctl.  out_fseq may be NULL only when
                     :      out_cnt is 0.  cr_max is handed to fd_fctl_cfg_done.  lazy<=0
                     :      selects fd_tempo_lazy_default( cr_max ).  rng drives the
                     :      housekeeping timer reload.  scratch must be non-NULL and
                     :      aligned to fd_replay_tile_scratch_align().
                     :
                     :      NOTE(review): the boot failure returns that follow a
                     :      successful fopen do not fclose pcap_file or delete pcap_iter,
                     :      so those paths appear to leak the stream -- confirm whether
                     :      callers treat a boot failure as fatal. */
      47             :   /* cnc state */
      48           0 :   ulong * cnc_diag;               /* ==fd_cnc_app_laddr( cnc ), local address of the replay tile cnc diagnostic region */
      49           0 :   ulong   cnc_diag_in_backp;      /* is the run loop currently backpressured by one or more of the outs, in [0,1] */
      50           0 :   ulong   cnc_diag_backp_cnt;     /* Accumulates number of transitions of tile to backpressured between housekeeping events */
      51           0 :   ulong   cnc_diag_pcap_done;     /* is the pcap file stream replay done */
      52           0 :   ulong   cnc_diag_pcap_pub_cnt;  /* Accumulates number of pcap packets published between housekeeping events */
      53           0 :   ulong   cnc_diag_pcap_pub_sz;   /* Accumulates pcap payload bytes published between housekeeping events */
      54           0 :   ulong   cnc_diag_pcap_filt_cnt; /* Accumulates number of pcap packets filtered between housekeeping events */
      55           0 :   ulong   cnc_diag_pcap_filt_sz;  /* Accumulates pcap payload bytes filtered between housekeeping events */
      56             : 
      57             :   /* in pcap stream state */
      58           0 :   FILE *           pcap_file; /* handle of pcap file stream */
      59           0 :   fd_pcap_iter_t * pcap_iter; /* iterator for the pcap file stream */
      60             : 
      61             :   /* out frag stream state */
      62           0 :   ulong   depth;  /* ==fd_mcache_depth( mcache ), depth of the mcache / positive integer power of 2 */
      63           0 :   ulong * sync;   /* ==fd_mcache_seq_laddr( mcache ), local addr where replay mcache sync info is published */
      64           0 :   ulong   seq;    /* next replay frag sequence number to publish */
      65             : 
      66           0 :   void *  base;   /* ==fd_wksp_containing( dcache ), chunk reference address in the tile's local address space */
      67           0 :   ulong   chunk0; /* ==fd_dcache_compact_chunk0( base, dcache ) */
      68           0 :   ulong   wmark;  /* ==fd_dcache_compact_wmark ( base, dcache, pkt_max ), packet chunks start in [chunk0,wmark] */
      69           0 :   ulong   chunk;  /* Chunk where next packet will be written, in [chunk0,wmark] */
      70             : 
      71             :   /* flow control state */
      72           0 :   fd_fctl_t * fctl;     /* output flow control */
      73           0 :   ulong       cr_avail; /* number of flow control credits available to publish downstream, in [0,cr_max] */
      74             : 
      75             :   /* housekeeping state */
      76           0 :   ulong async_min; /* minimum number of ticks between processing a housekeeping event, positive integer power of 2 */
      77             : 
      78           0 :   do {
      79             : 
      80           0 :     FD_LOG_INFO(( "Booting replay (out-cnt %lu)", out_cnt ));
      81           0 :     if( FD_UNLIKELY( out_cnt>FD_REPLAY_TILE_OUT_MAX ) ) { FD_LOG_WARNING(( "out_cnt too large" )); return 1; }
      82             : 
      83           0 :     if( FD_UNLIKELY( !scratch ) ) {
      84           0 :       FD_LOG_WARNING(( "NULL scratch" ));
      85           0 :       return 1;
      86           0 :     }
      87             : 
      88           0 :     if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)scratch, fd_replay_tile_scratch_align() ) ) ) {
      89           0 :       FD_LOG_WARNING(( "misaligned scratch" ));
      90           0 :       return 1;
      91           0 :     }
      92             : 
      93           0 :     FD_SCRATCH_ALLOC_INIT( l, scratch );
      94             : 
      95             :     /* cnc state init */
      96             : 
      97           0 :     if( FD_UNLIKELY( !cnc ) ) { FD_LOG_WARNING(( "NULL cnc" )); return 1; }
      98           0 :     if( FD_UNLIKELY( fd_cnc_app_sz( cnc )<64UL ) ) { FD_LOG_WARNING(( "cnc app sz must be at least 64" )); return 1; }
      99           0 :     if( FD_UNLIKELY( fd_cnc_signal_query( cnc )!=FD_CNC_SIGNAL_BOOT ) ) { FD_LOG_WARNING(( "already booted" )); return 1; }
     100             : 
     101           0 :     cnc_diag = (ulong *)fd_cnc_app_laddr( cnc );
     102             : 
     103             :     /* in_backp==1, backp_cnt==0 indicates waiting for initial credits,
     104             :        cleared during first housekeeping if credits available */
     105           0 :     cnc_diag_in_backp      = 1UL;
     106           0 :     cnc_diag_backp_cnt     = 0UL;
     107           0 :     cnc_diag_pcap_done     = 0UL;
     108           0 :     cnc_diag_pcap_pub_cnt  = 0UL;
     109           0 :     cnc_diag_pcap_pub_sz   = 0UL;
     110           0 :     cnc_diag_pcap_filt_cnt = 0UL;
     111           0 :     cnc_diag_pcap_filt_sz  = 0UL;
     112             : 
     113             :     /* in pcap stream init */
     114             : 
     115           0 :     if( FD_UNLIKELY( !pkt_max ) ) { FD_LOG_WARNING(( "pkt_max must be positive" )); return 1; }
     116           0 :     if( FD_UNLIKELY( !pcap_path ) ) { FD_LOG_WARNING(( "NULL pcap path" )); return 1; }
     117           0 :     FD_LOG_INFO(( "Opening pcap %s (pkt_max %lu)", pcap_path, pkt_max ));
     118           0 :     pcap_file = fopen( pcap_path, "r" );
     119           0 :     if( FD_UNLIKELY( !pcap_file ) ) { FD_LOG_WARNING(( "fopen failed" )); return 1; }
     120             : 
     121           0 :     pcap_iter = fd_pcap_iter_new( pcap_file );
     122           0 :     if( FD_UNLIKELY( !pcap_iter ) ) { FD_LOG_WARNING(( "fd_pcap_iter_new failed" )); return 1; }
     123           0 :     FD_COMPILER_MFENCE();
     124           0 :     cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_DONE ] = 0UL; /* Clear before entering running state */
     125           0 :     FD_COMPILER_MFENCE();
     126             : 
     127             :     /* out frag stream init */
     128             : 
     129           0 :     if( FD_UNLIKELY( !mcache ) ) { FD_LOG_WARNING(( "NULL mcache" )); return 1; }
     130           0 :     depth = fd_mcache_depth    ( mcache );
     131           0 :     sync  = fd_mcache_seq_laddr( mcache );
     132             : 
     133           0 :     seq = fd_mcache_seq_query( sync ); /* FIXME: ALLOW OPTION FOR MANUAL SPECIFICATION */
     134             : 
     135           0 :     if( FD_UNLIKELY( !dcache ) ) { FD_LOG_WARNING(( "NULL dcache" )); return 1; }
     136             : 
     137           0 :     base = fd_wksp_containing( dcache );
     138           0 :     if( FD_UNLIKELY( !base ) ) { FD_LOG_WARNING(( "fd_wksp_containing failed" )); return 1; }
     139             : 
     140           0 :     if( FD_UNLIKELY( !fd_dcache_compact_is_safe( base, dcache, pkt_max, depth ) ) ) {
     141           0 :       FD_LOG_WARNING(( "--dcache not compatible with wksp base, --pkt-max and --mcache depth" ));
     142           0 :       return 1;
     143           0 :     }
     144             : 
     145           0 :     chunk0 = fd_dcache_compact_chunk0( base, dcache );
     146           0 :     wmark  = fd_dcache_compact_wmark ( base, dcache, pkt_max );
     147           0 :     chunk  = FD_VOLATILE_CONST( cnc_diag[ FD_REPLAY_CNC_DIAG_CHUNK_IDX ] ); /* resume from the chunk index left in the cnc app region, if it is in bounds */
     148           0 :     if( FD_UNLIKELY( !((chunk0<=chunk) & (chunk<=wmark)) ) ) {
     149           0 :       chunk = chunk0;
     150           0 :       FD_LOG_INFO(( "out of bounds cnc chunk index; overriding initial chunk to chunk0" ));
     151           0 :     }
     152           0 :     FD_LOG_INFO(( "chunk %lu", chunk ));
     153             : 
     154             :     /* out flow control init */
     155             : 
     156           0 :     if( FD_UNLIKELY( !!out_cnt && !out_fseq ) ) { FD_LOG_WARNING(( "NULL out_fseq" )); return 1; }
     157             : 
     158           0 :     fctl = fd_fctl_join( fd_fctl_new( FD_SCRATCH_ALLOC_APPEND( l, fd_fctl_align(), fd_fctl_footprint( out_cnt ) ), out_cnt ) );
     159           0 :     if( FD_UNLIKELY( !fctl ) ) { FD_LOG_WARNING(( "join failed" )); return 1; }
     160             : 
     161           0 :     for( ulong out_idx=0UL; out_idx<out_cnt; out_idx++ ) {
     162             : 
     163           0 :       ulong * fseq = out_fseq[ out_idx ];
     164           0 :       if( FD_UNLIKELY( !fseq ) ) { FD_LOG_WARNING(( "NULL out_fseq[%lu]", out_idx )); return 1; }
     165           0 :       ulong * fseq_diag = (ulong *)fd_fseq_app_laddr( fseq );
     166             : 
     167             :       /* Assumes lag_max==depth */
     168             :       /* FIXME: CONSIDER ADDING LAG_MAX TO FSEQ AS A FIELD? */
     169           0 :       if( FD_UNLIKELY( !fd_fctl_cfg_rx_add( fctl, depth, fseq, &fseq_diag[ FD_FSEQ_DIAG_SLOW_CNT ] ) ) ) {
     170           0 :         FD_LOG_WARNING(( "fd_fctl_cfg_rx_add failed" ));
     171           0 :         return 1;
     172           0 :       }
     173           0 :     }
     174             : 
     175             :     /* cr_burst is 1 because we only send at most 1 fragment metadata
     176             :        between checking cr_avail.  We use defaults for cr_resume and
     177             :        cr_refill (and possibly cr_max if the user wanted to use defaults
     178             :        here too). */
     179             : 
     180           0 :     if( FD_UNLIKELY( !fd_fctl_cfg_done( fctl, 1UL, cr_max, 0UL, 0UL ) ) ) {
     181           0 :       FD_LOG_WARNING(( "fd_fctl_cfg_done failed" ));
     182           0 :       return 1;
     183           0 :     }
     184           0 :     FD_LOG_INFO(( "cr_burst %lu cr_max %lu cr_resume %lu cr_refill %lu",
     185           0 :                   fd_fctl_cr_burst( fctl ), fd_fctl_cr_max( fctl ), fd_fctl_cr_resume( fctl ), fd_fctl_cr_refill( fctl ) ));
     186             : 
     187           0 :     cr_max   = fd_fctl_cr_max( fctl );
     188           0 :     cr_avail = 0UL; /* Will be initialized by run loop */
     189             : 
     190             :     /* housekeeping init */
     191             : 
     192           0 :     if( lazy<=0L ) lazy = fd_tempo_lazy_default( cr_max );
     193           0 :     FD_LOG_INFO(( "Configuring housekeeping (lazy %li ns)", lazy ));
     194             : 
     195           0 :     async_min = fd_tempo_async_min( lazy, 1UL /*event_cnt*/, (float)fd_tempo_tick_per_ns( NULL ) );
     196           0 :     if( FD_UNLIKELY( !async_min ) ) { FD_LOG_WARNING(( "bad lazy" )); return 1; }
     197             : 
     198           0 :   } while(0);
     199             : 
     200           0 :   FD_LOG_INFO(( "Running replay (orig %lu)", orig ));
     201           0 :   fd_cnc_signal( cnc, FD_CNC_SIGNAL_RUN );
     202           0 :   long then = fd_tickcount();
     203           0 :   long now  = then;
     204           0 :   for(;;) {
     205             : 
     206             :     /* Do housekeeping at a low rate in the background */
     207           0 :     if( FD_UNLIKELY( (now-then)>=0L ) ) {
     208             : 
     209             :       /* Send synchronization info */
     210           0 :       fd_mcache_seq_update( sync, seq );
     211             : 
     212             :       /* Send diagnostic info */
     213             :       /* When we drain, we don't do a fully atomic update of the
     214             :          diagnostics as it is only diagnostic and it will still be
     215             :          correct in the usual case where individual diagnostic counters
     216             :          aren't used by multiple writers spread over different threads
     217             :          of execution. */
     218           0 :       fd_cnc_heartbeat( cnc, now );
     219           0 :       FD_COMPILER_MFENCE();
     220           0 :       cnc_diag[ FD_CNC_DIAG_IN_BACKP             ]  = cnc_diag_in_backp;
     221           0 :       cnc_diag[ FD_CNC_DIAG_BACKP_CNT            ] += cnc_diag_backp_cnt;
     222           0 :       cnc_diag[ FD_REPLAY_CNC_DIAG_CHUNK_IDX     ]  = chunk;
     223           0 :       cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_DONE     ]  = cnc_diag_pcap_done;
     224           0 :       cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_PUB_CNT  ] += cnc_diag_pcap_pub_cnt;
     225           0 :       cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_PUB_SZ   ] += cnc_diag_pcap_pub_sz;
     226           0 :       cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_FILT_CNT ] += cnc_diag_pcap_filt_cnt;
     227           0 :       cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_FILT_SZ  ] += cnc_diag_pcap_filt_sz;
     228           0 :       FD_COMPILER_MFENCE();
     229           0 :       cnc_diag_backp_cnt     = 0UL;
     230           0 :       cnc_diag_pcap_pub_cnt  = 0UL;
     231           0 :       cnc_diag_pcap_pub_sz   = 0UL;
     232           0 :       cnc_diag_pcap_filt_cnt = 0UL;
     233           0 :       cnc_diag_pcap_filt_sz  = 0UL;
     234             : 
     235             :       /* Receive command-and-control signals */
     236           0 :       ulong s = fd_cnc_signal_query( cnc );
     237           0 :       if( FD_UNLIKELY( s!=FD_CNC_SIGNAL_RUN ) ) {
     238           0 :         if( FD_LIKELY( s==FD_CNC_SIGNAL_HALT ) ) break;
     239           0 :         if( FD_UNLIKELY( s!=FD_REPLAY_CNC_SIGNAL_ACK ) ) {
     240           0 :           char buf[ FD_CNC_SIGNAL_CSTR_BUF_MAX ];
     241           0 :           FD_LOG_WARNING(( "Unexpected signal %s (%lu) received; trying to resume", fd_cnc_signal_cstr( s, buf ), s ));
     242           0 :         }
     243           0 :         fd_cnc_signal( cnc, FD_CNC_SIGNAL_RUN );
     244           0 :       }
     245             : 
     246             :       /* Receive flow control credits */
     247           0 :       cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, seq );
     248             : 
     249             :       /* Reload housekeeping timer */
     250           0 :       then = now + (long)fd_tempo_async_reload( rng, async_min );
     251           0 :     }
     252             : 
     253             :     /* Check if we are backpressured.  If so, count any transition into
     254             :        a backpressured regime and spin to wait for flow control credits
     255             :        to return.  We don't do a fully atomic update here as it is only
     256             :        diagnostic and it will still be correct in the usual case where
     257             :        individual diagnostic counters aren't used by writers in
     258             :        different threads of execution.  We only count the transition
     259             :        from not backpressured to backpressured. */
     260             : 
     261           0 :     if( FD_UNLIKELY( !cr_avail ) ) {
     262           0 :       cnc_diag_backp_cnt += (ulong)!cnc_diag_in_backp;
     263           0 :       cnc_diag_in_backp   = 1UL;
     264           0 :       FD_SPIN_PAUSE();
     265           0 :       now = fd_tickcount();
     266           0 :       continue;
     267           0 :     }
     268           0 :     cnc_diag_in_backp = 0UL;
     269             : 
     270             :     /* Try to load the next packet directly into the dcache at chunk */
     271             : 
     272           0 :     if( FD_UNLIKELY( cnc_diag_pcap_done ) ) { /* pcap exhausted; idle (housekeeping still runs) until halted */
     273           0 :       FD_SPIN_PAUSE();
     274           0 :       now = fd_tickcount();
     275           0 :       continue;
     276           0 :     }
     277             : 
     278           0 :     long  ts;
     279           0 :     ulong sz = fd_pcap_iter_next( pcap_iter, fd_chunk_to_laddr( base, chunk ), pkt_max, &ts );
     280           0 :     if( FD_UNLIKELY( !sz ) ) { /* a zero size is treated as the end of the pcap stream */
     281           0 :       cnc_diag_pcap_done = 1UL;
     282           0 :       now = fd_tickcount();
     283           0 :       continue;
     284           0 :     }
     285             : 
     286           0 :     int should_filter = 0; /* FIXME: filter logic goes here */
     287             : 
     288           0 :     if( FD_UNLIKELY( should_filter ) ) {
     289           0 :       cnc_diag_pcap_filt_cnt++;
     290           0 :       cnc_diag_pcap_filt_sz += sz;
     291           0 :       now = fd_tickcount();
     292           0 :       continue;
     293           0 :     }
     294             : 
     295           0 :     ulong sig = (ulong)ts; /* FIXME: TEMPORARY HACK */
     296           0 :     ulong ctl = fd_frag_meta_ctl( orig, 1 /*som*/, 1 /*eom*/, 0 /*err*/ );
     297             : 
     298           0 :     now = fd_tickcount();
     299           0 :     ulong tsorig = fd_frag_meta_ts_comp( now );
     300           0 :     ulong tspub  = tsorig;
     301           0 :     fd_mcache_publish( mcache, depth, seq, sig, chunk, sz, ctl, tsorig, tspub );
     302             : 
     303             :     /* Windup for the next iteration and accumulate diagnostics */
     304             : 
     305           0 :     chunk = fd_dcache_compact_next( chunk, sz, chunk0, wmark );
     306           0 :     seq   = fd_seq_inc( seq, 1UL );
     307           0 :     cr_avail--; /* one credit consumed per published frag (cr_burst is 1) */
     308           0 :     cnc_diag_pcap_pub_cnt++;
     309           0 :     cnc_diag_pcap_pub_sz += sz;
     310           0 :   }
     311             : 
     312           0 :   do {
     313             : 
     314           0 :     FD_LOG_INFO(( "Halting replay" ));
     315             : 
     316           0 :     FD_LOG_INFO(( "Destroying fctl" ));
     317           0 :     fd_fctl_delete( fd_fctl_leave( fctl ) );
     318             : 
     319           0 :     FD_LOG_INFO(( "Closing pcap" ));
     320           0 :     if( FD_UNLIKELY( fclose( fd_pcap_iter_delete( pcap_iter ) ) ) )
     321           0 :       FD_LOG_WARNING(( "fclose failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     322             : 
     323           0 :     FD_LOG_INFO(( "Halted replay" ));
     324           0 :     fd_cnc_signal( cnc, FD_CNC_SIGNAL_BOOT ); /* back to BOOT, the state the boot checks above require */
     325             : 
     326           0 :   } while(0);
     327             : 
     328           0 :   return 0;
     329           0 : }

Generated by: LCOV version 1.14