Line data Source code
1 : #include "fd_replay.h"
2 :
3 : #include "../../util/net/fd_pcap.h"
4 : #include <stdio.h>
5 : #include <errno.h>
6 :
7 : FD_STATIC_ASSERT( FD_FCTL_ALIGN<=FD_REPLAY_TILE_SCRATCH_ALIGN, packing );
8 :
9 0 : #define FD_CNC_DIAG_IN_BACKP (0UL)
10 0 : #define FD_CNC_DIAG_BACKP_CNT (1UL)
11 :
12 : #define FD_FSEQ_DIAG_PUB_CNT (0UL)
13 : #define FD_FSEQ_DIAG_PUB_SZ (1UL)
14 : #define FD_FSEQ_DIAG_FILT_CNT (2UL)
15 : #define FD_FSEQ_DIAG_FILT_SZ (3UL)
16 : #define FD_FSEQ_DIAG_OVRNP_CNT (4UL)
17 : #define FD_FSEQ_DIAG_OVRNR_CNT (5UL)
18 : #define FD_FSEQ_DIAG_SLOW_CNT (6UL)
19 :
20 : ulong
21 0 : fd_replay_tile_scratch_align( void ) {
22 0 : return FD_REPLAY_TILE_SCRATCH_ALIGN;
23 0 : }
24 :
25 : ulong
26 0 : fd_replay_tile_scratch_footprint( ulong out_cnt ) {
27 0 : if( FD_UNLIKELY( out_cnt>FD_REPLAY_TILE_OUT_MAX ) ) return 0UL;
28 0 : ulong l = FD_LAYOUT_INIT;
29 0 : l = FD_LAYOUT_APPEND( l, fd_fctl_align(), fd_fctl_footprint( out_cnt ) ); /* fctl */
30 0 : return FD_LAYOUT_FINI( l, fd_replay_tile_scratch_align() );
31 0 : }
32 :
33 : int
34 : fd_replay_tile( fd_cnc_t * cnc,
35 : char const * pcap_path,
36 : ulong pkt_max,
37 : ulong orig,
38 : fd_frag_meta_t * mcache,
39 : uchar * dcache,
40 : ulong out_cnt,
41 : ulong ** out_fseq,
42 : ulong cr_max,
43 : long lazy,
44 : fd_rng_t * rng,
45 0 : void * scratch ) {
46 :
47 : /* cnc state */
48 0 : ulong * cnc_diag; /* ==fd_cnc_app_laddr( cnc ), local address of the replay tile cnc diagnostic region */
49 0 : ulong cnc_diag_in_backp; /* is the run loop currently backpressured by one or more of the outs, in [0,1] */
50 0 : ulong cnc_diag_backp_cnt; /* Accumulates number of transitions of tile to backpressured between housekeeping events */
51 0 : ulong cnc_diag_pcap_done; /* is the pcap file stream replay done */
52 0 : ulong cnc_diag_pcap_pub_cnt; /* Accumulates number of pcap packets published between housekeeping events */
53 0 : ulong cnc_diag_pcap_pub_sz; /* Accumulates pcap payload bytes published between housekeeping events */
54 0 : ulong cnc_diag_pcap_filt_cnt; /* Accumulates number of pcap packets filtered between housekeeping events */
55 0 : ulong cnc_diag_pcap_filt_sz; /* Accumulates pcap payload bytes filtered between housekeeping events */
56 :
57 : /* in pcap stream state */
58 0 : FILE * pcap_file; /* handle of pcap file stream */
59 0 : fd_pcap_iter_t * pcap_iter; /* iterator for the pcap file stream */
60 :
61 : /* out frag stream state */
62 0 : ulong depth; /* ==fd_mcache_depth( mcache ), depth of the mcache / positive integer power of 2 */
63 0 : ulong * sync; /* ==fd_mcache_seq_laddr( mcache ), local addr where replay mcache sync info is published */
64 0 : ulong seq; /* seq replay frag sequence number to publish */
65 :
66 0 : void * base; /* ==fd_wksp_containing( dcache ), chunk reference address in the tile's local address space */
67 0 : ulong chunk0; /* ==fd_dcache_compact_chunk0( base, dcache, pkt_max ) */
68 0 : ulong wmark; /* ==fd_dcache_compact_wmark ( base, dcache, _pkt_max ), packets chunks start in [chunk0,wmark] */
69 0 : ulong chunk; /* Chunk where next packet will be written, in [chunk0,wmark] */
70 :
71 : /* flow control state */
72 0 : fd_fctl_t * fctl; /* output flow control */
73 0 : ulong cr_avail; /* number of flow control credits available to publish downstream, in [0,cr_max] */
74 :
75 : /* housekeeping state */
76 0 : ulong async_min; /* minimum number of ticks between processing a housekeeping event, positive integer power of 2 */
77 :
78 0 : do {
79 :
80 0 : FD_LOG_INFO(( "Booting replay (out-cnt %lu)", out_cnt ));
81 0 : if( FD_UNLIKELY( out_cnt>FD_REPLAY_TILE_OUT_MAX ) ) { FD_LOG_WARNING(( "out_cnt too large" )); return 1; }
82 :
83 0 : if( FD_UNLIKELY( !scratch ) ) {
84 0 : FD_LOG_WARNING(( "NULL scratch" ));
85 0 : return 1;
86 0 : }
87 :
88 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)scratch, fd_replay_tile_scratch_align() ) ) ) {
89 0 : FD_LOG_WARNING(( "misaligned scratch" ));
90 0 : return 1;
91 0 : }
92 :
93 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
94 :
95 : /* cnc state init */
96 :
97 0 : if( FD_UNLIKELY( !cnc ) ) { FD_LOG_WARNING(( "NULL cnc" )); return 1; }
98 0 : if( FD_UNLIKELY( fd_cnc_app_sz( cnc )<64UL ) ) { FD_LOG_WARNING(( "cnc app sz must be at least 64" )); return 1; }
99 0 : if( FD_UNLIKELY( fd_cnc_signal_query( cnc )!=FD_CNC_SIGNAL_BOOT ) ) { FD_LOG_WARNING(( "already booted" )); return 1; }
100 :
101 0 : cnc_diag = (ulong *)fd_cnc_app_laddr( cnc );
102 :
103 : /* in_backp==1, backp_cnt==0 indicates waiting for initial credits,
104 : cleared during first housekeeping if credits available */
105 0 : cnc_diag_in_backp = 1UL;
106 0 : cnc_diag_backp_cnt = 0UL;
107 0 : cnc_diag_pcap_done = 0UL;
108 0 : cnc_diag_pcap_pub_cnt = 0UL;
109 0 : cnc_diag_pcap_pub_sz = 0UL;
110 0 : cnc_diag_pcap_filt_cnt = 0UL;
111 0 : cnc_diag_pcap_filt_sz = 0UL;
112 :
113 : /* in pcap stream init */
114 :
115 0 : if( FD_UNLIKELY( !pkt_max ) ) { FD_LOG_WARNING(( "pkt_max must be positive" )); return 1; }
116 0 : if( FD_UNLIKELY( !pcap_path ) ) { FD_LOG_WARNING(( "NULL pcap path" )); return 1; }
117 0 : FD_LOG_INFO(( "Opening pcap %s (pkt_max %lu)", pcap_path, pkt_max ));
118 0 : pcap_file = fopen( pcap_path, "r" );
119 0 : if( FD_UNLIKELY( !pcap_file ) ) { FD_LOG_WARNING(( "fopen failed" )); return 1; }
120 :
121 0 : pcap_iter = fd_pcap_iter_new( pcap_file );
122 0 : if( FD_UNLIKELY( !pcap_iter ) ) { FD_LOG_WARNING(( "fd_pcap_iter_new failed" )); return 1; }
123 0 : FD_COMPILER_MFENCE();
124 0 : cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_DONE ] = 0UL; /* Clear before entering running state */
125 0 : FD_COMPILER_MFENCE();
126 :
127 : /* out frag stream init */
128 :
129 0 : if( FD_UNLIKELY( !mcache ) ) { FD_LOG_WARNING(( "NULL mcache" )); return 1; }
130 0 : depth = fd_mcache_depth ( mcache );
131 0 : sync = fd_mcache_seq_laddr( mcache );
132 :
133 0 : seq = fd_mcache_seq_query( sync ); /* FIXME: ALLOW OPTION FOR MANUAL SPECIFICATION */
134 :
135 0 : if( FD_UNLIKELY( !dcache ) ) { FD_LOG_WARNING(( "NULL dcache" )); return 1; }
136 :
137 0 : base = fd_wksp_containing( dcache );
138 0 : if( FD_UNLIKELY( !base ) ) { FD_LOG_WARNING(( "fd_wksp_containing failed" )); return 1; }
139 :
140 0 : if( FD_UNLIKELY( !fd_dcache_compact_is_safe( base, dcache, pkt_max, depth ) ) ) {
141 0 : FD_LOG_WARNING(( "--dcache not compatible with wksp base, --pkt-max and --mcache depth" ));
142 0 : return 1;
143 0 : }
144 :
145 0 : chunk0 = fd_dcache_compact_chunk0( base, dcache );
146 0 : wmark = fd_dcache_compact_wmark ( base, dcache, pkt_max );
147 0 : chunk = FD_VOLATILE_CONST( cnc_diag[ FD_REPLAY_CNC_DIAG_CHUNK_IDX ] );
148 0 : if( FD_UNLIKELY( !((chunk0<=chunk) & (chunk<=wmark)) ) ) {
149 0 : chunk = chunk0;
150 0 : FD_LOG_INFO(( "out of bounds cnc chunk index; overriding initial chunk to chunk0" ));
151 0 : }
152 0 : FD_LOG_INFO(( "chunk %lu", chunk ));
153 :
154 : /* out flow control init */
155 :
156 0 : if( FD_UNLIKELY( !!out_cnt && !out_fseq ) ) { FD_LOG_WARNING(( "NULL out_fseq" )); return 1; }
157 :
158 0 : fctl = fd_fctl_join( fd_fctl_new( FD_SCRATCH_ALLOC_APPEND( l, fd_fctl_align(), fd_fctl_footprint( out_cnt ) ), out_cnt ) );
159 0 : if( FD_UNLIKELY( !fctl ) ) { FD_LOG_WARNING(( "join failed" )); return 1; }
160 :
161 0 : for( ulong out_idx=0UL; out_idx<out_cnt; out_idx++ ) {
162 :
163 0 : ulong * fseq = out_fseq[ out_idx ];
164 0 : if( FD_UNLIKELY( !fseq ) ) { FD_LOG_WARNING(( "NULL out_fseq[%lu]", out_idx )); return 1; }
165 0 : ulong * fseq_diag = (ulong *)fd_fseq_app_laddr( fseq );
166 :
167 : /* Assumes lag_max==depth */
168 : /* FIXME: CONSIDER ADDING LAG_MAX THIS TO FSEQ AS A FIELD? */
169 0 : if( FD_UNLIKELY( !fd_fctl_cfg_rx_add( fctl, depth, fseq, &fseq_diag[ FD_FSEQ_DIAG_SLOW_CNT ] ) ) ) {
170 0 : FD_LOG_WARNING(( "fd_fctl_cfg_rx_add failed" ));
171 0 : return 1;
172 0 : }
173 0 : }
174 :
175 : /* cr_burst is 1 because we only send at most 1 fragment metadata
176 : between checking cr_avail. We use defaults for cr_resume and
177 : cr_refill (and possible cr_max if the user wanted to use defaults
178 : here too). */
179 :
180 0 : if( FD_UNLIKELY( !fd_fctl_cfg_done( fctl, 1UL, cr_max, 0UL, 0UL ) ) ) {
181 0 : FD_LOG_WARNING(( "fd_fctl_cfg_done failed" ));
182 0 : return 1;
183 0 : }
184 0 : FD_LOG_INFO(( "cr_burst %lu cr_max %lu cr_resume %lu cr_refill %lu",
185 0 : fd_fctl_cr_burst( fctl ), fd_fctl_cr_max( fctl ), fd_fctl_cr_resume( fctl ), fd_fctl_cr_refill( fctl ) ));
186 :
187 0 : cr_max = fd_fctl_cr_max( fctl );
188 0 : cr_avail = 0UL; /* Will be initialized by run loop */
189 :
190 : /* housekeeping init */
191 :
192 0 : if( lazy<=0L ) lazy = fd_tempo_lazy_default( cr_max );
193 0 : FD_LOG_INFO(( "Configuring housekeeping (lazy %li ns)", lazy ));
194 :
195 0 : async_min = fd_tempo_async_min( lazy, 1UL /*event_cnt*/, (float)fd_tempo_tick_per_ns( NULL ) );
196 0 : if( FD_UNLIKELY( !async_min ) ) { FD_LOG_WARNING(( "bad lazy" )); return 1; }
197 :
198 0 : } while(0);
199 :
200 0 : FD_LOG_INFO(( "Running replay (orig %lu)", orig ));
201 0 : fd_cnc_signal( cnc, FD_CNC_SIGNAL_RUN );
202 0 : long then = fd_tickcount();
203 0 : long now = then;
204 0 : for(;;) {
205 :
206 : /* Do housekeeping at a low rate in the background */
207 0 : if( FD_UNLIKELY( (now-then)>=0L ) ) {
208 :
209 : /* Send synchronization info */
210 0 : fd_mcache_seq_update( sync, seq );
211 :
212 : /* Send diagnostic info */
213 : /* When we drain, we don't do a fully atomic update of the
214 : diagnostics as it is only diagnostic and it will still be
215 : correct the usual case where individual diagnostic counters
216 : aren't used by multiple writers spread over different threads
217 : of execution. */
218 0 : fd_cnc_heartbeat( cnc, now );
219 0 : FD_COMPILER_MFENCE();
220 0 : cnc_diag[ FD_CNC_DIAG_IN_BACKP ] = cnc_diag_in_backp;
221 0 : cnc_diag[ FD_CNC_DIAG_BACKP_CNT ] += cnc_diag_backp_cnt;
222 0 : cnc_diag[ FD_REPLAY_CNC_DIAG_CHUNK_IDX ] = chunk;
223 0 : cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_DONE ] = cnc_diag_pcap_done;
224 0 : cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_PUB_CNT ] += cnc_diag_pcap_pub_cnt;
225 0 : cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_PUB_SZ ] += cnc_diag_pcap_pub_sz;
226 0 : cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_FILT_CNT ] += cnc_diag_pcap_filt_cnt;
227 0 : cnc_diag[ FD_REPLAY_CNC_DIAG_PCAP_FILT_SZ ] += cnc_diag_pcap_filt_sz;
228 0 : FD_COMPILER_MFENCE();
229 0 : cnc_diag_backp_cnt = 0UL;
230 0 : cnc_diag_pcap_pub_cnt = 0UL;
231 0 : cnc_diag_pcap_pub_sz = 0UL;
232 0 : cnc_diag_pcap_filt_cnt = 0UL;
233 0 : cnc_diag_pcap_filt_sz = 0UL;
234 :
235 : /* Receive command-and-control signals */
236 0 : ulong s = fd_cnc_signal_query( cnc );
237 0 : if( FD_UNLIKELY( s!=FD_CNC_SIGNAL_RUN ) ) {
238 0 : if( FD_LIKELY( s==FD_CNC_SIGNAL_HALT ) ) break;
239 0 : if( FD_UNLIKELY( s!=FD_REPLAY_CNC_SIGNAL_ACK ) ) {
240 0 : char buf[ FD_CNC_SIGNAL_CSTR_BUF_MAX ];
241 0 : FD_LOG_WARNING(( "Unexpected signal %s (%lu) received; trying to resume", fd_cnc_signal_cstr( s, buf ), s ));
242 0 : }
243 0 : fd_cnc_signal( cnc, FD_CNC_SIGNAL_RUN );
244 0 : }
245 :
246 : /* Receive flow control credits */
247 0 : cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, seq );
248 :
249 : /* Reload housekeeping timer */
250 0 : then = now + (long)fd_tempo_async_reload( rng, async_min );
251 0 : }
252 :
253 : /* Check if we are backpressured. If so, count any transition into
254 : a backpressured regime and spin to wait for flow control credits
255 : to return. We don't do a fully atomic update here as it is only
256 : diagnostic and it will still be correct the usual case where
257 : individual diagnostic counters aren't used by writers in
258 : different threads of execution. We only count the transition
259 : from not backpressured to backpressured. */
260 :
261 0 : if( FD_UNLIKELY( !cr_avail ) ) {
262 0 : cnc_diag_backp_cnt += (ulong)!cnc_diag_in_backp;
263 0 : cnc_diag_in_backp = 1UL;
264 0 : FD_SPIN_PAUSE();
265 0 : now = fd_tickcount();
266 0 : continue;
267 0 : }
268 0 : cnc_diag_in_backp = 0UL;
269 :
270 : /* Try to load the next packet directly into the dcache at chunk */
271 :
272 0 : if( FD_UNLIKELY( cnc_diag_pcap_done ) ) {
273 0 : FD_SPIN_PAUSE();
274 0 : now = fd_tickcount();
275 0 : continue;
276 0 : }
277 :
278 0 : long ts;
279 0 : ulong sz = fd_pcap_iter_next( pcap_iter, fd_chunk_to_laddr( base, chunk ), pkt_max, &ts );
280 0 : if( FD_UNLIKELY( !sz ) ) {
281 0 : cnc_diag_pcap_done = 1UL;
282 0 : now = fd_tickcount();
283 0 : continue;
284 0 : }
285 :
286 0 : int should_filter = 0; /* FIXME: filter logic goes here */
287 :
288 0 : if( FD_UNLIKELY( should_filter ) ) {
289 0 : cnc_diag_pcap_filt_cnt++;
290 0 : cnc_diag_pcap_filt_sz += sz;
291 0 : now = fd_tickcount();
292 0 : continue;
293 0 : }
294 :
295 0 : ulong sig = (ulong)ts; /* FIXME: TEMPORARY HACK */
296 0 : ulong ctl = fd_frag_meta_ctl( orig, 1 /*som*/, 1 /*eom*/, 0 /*err*/ );
297 :
298 0 : now = fd_tickcount();
299 0 : ulong tsorig = fd_frag_meta_ts_comp( now );
300 0 : ulong tspub = tsorig;
301 0 : fd_mcache_publish( mcache, depth, seq, sig, chunk, sz, ctl, tsorig, tspub );
302 :
303 : /* Windup for the next iteration and accumulate diagnostics */
304 :
305 0 : chunk = fd_dcache_compact_next( chunk, sz, chunk0, wmark );
306 0 : seq = fd_seq_inc( seq, 1UL );
307 0 : cr_avail--;
308 0 : cnc_diag_pcap_pub_cnt++;
309 0 : cnc_diag_pcap_pub_sz += sz;
310 0 : }
311 :
312 0 : do {
313 :
314 0 : FD_LOG_INFO(( "Halting replay" ));
315 :
316 0 : FD_LOG_INFO(( "Destroying fctl" ));
317 0 : fd_fctl_delete( fd_fctl_leave( fctl ) );
318 :
319 0 : FD_LOG_INFO(( "Closing pcap" ));
320 0 : if( FD_UNLIKELY( fclose( fd_pcap_iter_delete( pcap_iter ) ) ) )
321 0 : FD_LOG_WARNING(( "fclose failed (%i-%s)", errno, fd_io_strerror( errno ) ));
322 :
323 0 : FD_LOG_INFO(( "Halted replay" ));
324 0 : fd_cnc_signal( cnc, FD_CNC_SIGNAL_BOOT );
325 :
326 0 : } while(0);
327 :
328 0 : return 0;
329 0 : }
|