Line data Source code
1 : #include "../../shared/fd_config.h"
2 : #include "../../shared/fd_action.h"
3 : #include "../../../disco/metrics/fd_metrics.h"
4 : #include "../../../util/clock/fd_clock.h"
5 : #include "../../../util/net/fd_pcap.h"
6 :
7 : #include <errno.h>
8 : #include <fcntl.h>
9 : #include <signal.h>
10 : #include <stdio.h>
11 : #include <unistd.h>
12 :
13 : struct dump_ctx {
14 : fd_io_buffered_ostream_t ostream;
15 : ulong pending_sz;
16 :
17 : struct link {
18 : fd_topo_link_t const * topo;
19 : void const * dcache_base;
20 : uint link_hash;
21 : } links[ FD_TOPO_MAX_LINKS ];
22 : ulong link_cnt;
23 :
24 : ulong * metrics_base;
25 : long next_stat_log;
26 : ulong frags;
27 : ulong bytes;
28 : ulong last_overrun_frags;
29 :
30 : long warmup_until;
31 :
32 : fd_clock_t clock[ 1 ];
33 : fd_clock_shmem_t clock_mem[ 1 ];
34 : fd_clock_epoch_t clock_epoch[ 1 ];
35 : long clock_recal_next;
36 : };
37 : typedef struct dump_ctx dump_ctx_t;
38 :
39 : void
40 : dump_cmd_args( int * argc,
41 : char * * * argv,
42 0 : args_t * args ) {
43 0 : char const * out_file = fd_env_strip_cmdline_cstr( argc, argv, "--out-file", NULL, "dump.pcap" );
44 0 : char const * link = fd_env_strip_cmdline_cstr( argc, argv, "--link", NULL, "" );
45 0 : args->dump.once = fd_env_strip_cmdline_int( argc, argv, "--once", NULL, 1 );
46 0 : fd_cstr_fini( fd_cstr_append_cstr_safe( fd_cstr_init( args->dump.pcap_path ), out_file, sizeof(args->dump.pcap_path)-1UL ) );
47 0 : fd_cstr_fini( fd_cstr_append_cstr_safe( fd_cstr_init( args->dump.link_name ), link, sizeof(args->dump.link_name)-1UL ) );
48 0 : }
49 :
/* dump_link_once writes a best-effort snapshot of all frags currently
   resident in one link's mcache (plus their payloads from the dcache,
   if the link has one) to the pcap ostream in ctx.  The producer may
   still be running while we read, so the scan is racy by design; torn
   or overwritten lines are detected via seq mismatches and skipped.
   This is acceptable because this path is a diagnostic snapshot, not a
   flow-controlled consumer. */
static void
dump_link_once( dump_ctx_t * ctx,
                struct link const * link ) {
  fd_frag_meta_t const * mcache = link->topo->mcache;
  ulong seq0 = fd_mcache_seq0( mcache );
  ulong seq_init = fd_mcache_seq_query( fd_mcache_seq_laddr_const( mcache ) );
  ulong depth = fd_mcache_depth( mcache );

  /* Local counters for the summary log at the end (distinct from the
     streaming counters in ctx). */
  ulong frags = 0UL;
  ulong bytes = 0UL;

  /* For links that are published reliably, we can trust the value of
     seq_init. For unreliable links, we'll just publish one whole
     depth. */

  /* We know at this point [seq0, seq_init) were published, but they may
     be long overwritten, and there may be more published than that. */

  /* Phase 1: walk backwards from seq_init-1 toward seq0, dumping lines
     whose stored seq still matches, and stop at the first line that has
     been overwritten (its stored seq will differ). */
  ulong min_seq_seen = seq_init; /* What we actually dumped is [min_seq_seen, seq_init) */
  for( ulong seq=fd_seq_dec( seq_init, 1UL ); fd_seq_ge( seq, seq0 ); seq=fd_seq_dec( seq, 1UL ) ) {
    /* It's not necessary for this to be atomic, since this is a
       post-mortem tool. */
    fd_frag_meta_t const * line = mcache+fd_mcache_line_idx( seq, depth );
    ulong read_seq = fd_frag_meta_seq_query( line );
    if( FD_UNLIKELY( read_seq!=seq ) ) break;

    min_seq_seen = seq;

    if( FD_LIKELY( link->dcache_base ) ) {
      ulong chunk = line->chunk;
      ulong sz = line->sz;
      void const * buffer = fd_chunk_to_laddr_const( link->dcache_base, chunk );
      /* Pcap record: frag meta as the "header" bytes, payload after,
         tagged with the link hash.  seq doubles as the timestamp. */
      fd_pcap_ostream_pkt( &ctx->ostream, (long)seq, line, sizeof(fd_frag_meta_t), buffer, sz, link->link_hash );
      bytes += sz;
    } else {
      /* No dcache (mtu 0 link): record the metadata only. */
      fd_pcap_ostream_pkt( &ctx->ostream, (long)seq, line, sizeof(fd_frag_meta_t), NULL, 0, link->link_hash );
    }
    frags++;
  }

  /* Now check everything after seq_init. This could potentially loop
     forever if the producer is still going, so we cap it at one depth. */
  for( ulong off=0UL; off<depth; off++ ) {
    ulong seq = fd_seq_inc( seq_init, off );

    fd_frag_meta_t const * line = mcache+fd_mcache_line_idx( seq, depth );
    ulong read_seq = fd_frag_meta_seq_query( line );

    /* Skip anything we processed in the first loop, also skipping any
       with the err control bit set. When an mcache is initialized, all
       the frags have err set to true, so this will skip those in
       particular as well. */
    if( FD_UNLIKELY( (fd_seq_le( min_seq_seen, read_seq ) & fd_seq_lt( read_seq, seq_init )) | (line->ctl & (1<<2)) ) ) continue;

    if( FD_LIKELY( link->dcache_base ) ) {
      ulong chunk = line->chunk;
      ulong sz = line->sz;
      void const * buffer = fd_chunk_to_laddr_const( link->dcache_base, chunk );
      fd_pcap_ostream_pkt( &ctx->ostream, (long)seq, line, sizeof(fd_frag_meta_t), buffer, sz, link->link_hash );
      bytes += sz;
    } else {
      fd_pcap_ostream_pkt( &ctx->ostream, (long)seq, line, sizeof(fd_frag_meta_t), NULL, 0, link->link_hash );
    }
    frags++;
  }
  FD_LOG_NOTICE(( "dumped %lu frags, %lu bytes in total from %s:%lu. Link hash: 0x%x",
                  frags, bytes, link->topo->name, link->topo->kind_id, link->link_hash ));
}
118 :
119 : static int running = 1;
120 :
121 : static void
122 0 : exit_signal( int sig FD_PARAM_UNUSED ) {
123 0 : running = 0;
124 0 : }
125 :
126 : static int
127 0 : should_shutdown( dump_ctx_t * ctx FD_PARAM_UNUSED ) {
128 0 : return !running;
129 0 : }
130 :
/* during_frag is the stem callback invoked while a freshly observed
   frag may still be live (and overwritable) in the producer's dcache.
   It stages a complete pcap record for the frag in the ostream's write
   buffer but does not commit it; ctx->pending_sz records the staged
   size (0 means nothing staged).  The commit happens in after_frag,
   which stem only calls if the frag was not overrun meanwhile. */
static void
during_frag( dump_ctx_t * ctx,
             ulong in_idx,
             ulong seq,
             ulong sig FD_PARAM_UNUSED ,
             ulong chunk,
             ulong sz,
             ulong ctl FD_PARAM_UNUSED ) {
  /* We have a new candidate fragment to copy into the dump file.
     Because we attach read-only and do not backpressure any producers,
     this fragment could be overrun at any point during this function.
     So we copy the relevant data into local memory (in the buffered
     ostream peek space) and only commit it to the file later in
     after_frag. after_frag is only called if the fragment was not
     overrun while we were processing it. */

  ctx->pending_sz = 0UL;

  /* Negative means the frag cannot be represented as a pcap record;
     skip it (pending_sz stays 0 so after_frag is a no-op). */
  long pcap_sz = fd_pcap_pkt_sz( sizeof(fd_frag_meta_t), sz );
  if( FD_UNLIKELY( pcap_sz < 0L ) ) return;

  /* Ensure the write buffer has room to stage the whole record; a
     flush frees previously committed bytes.  If even an empty buffer
     is too small, the buffer size is a hard limit and we abort. */
  if( (ulong)pcap_sz > fd_io_buffered_ostream_peek_sz( &ctx->ostream ) ) {
    fd_io_buffered_ostream_flush( &ctx->ostream );
    if( FD_UNLIKELY( (ulong)pcap_sz > fd_io_buffered_ostream_peek_sz( &ctx->ostream ) ) ) {
      FD_LOG_ERR(( "packet size %ld too large for pcap, increase write buffer size (%lu)",
                   pcap_sz, fd_io_buffered_ostream_peek_sz( &ctx->ostream ) ));
    }
  }

  /* A non-zero payload size requires the link to have a dcache. */
  FD_TEST( (sz==0UL) || (ctx->links[ in_idx ].dcache_base!=NULL) );

  fd_topo_link_t const * link = ctx->links[ in_idx ].topo;
  fd_frag_meta_t const * mline = link->mcache + fd_mcache_line_idx( seq, fd_mcache_depth( link->mcache ) );
  void const * frag_data = fd_chunk_to_laddr_const( ctx->links[ in_idx ].dcache_base, chunk );

  /* Stage the record with a placeholder timestamp (0); after_frag
     patches in the real publish time before committing. */
  fd_pcap_pkt( fd_io_buffered_ostream_peek( &ctx->ostream ),
               0L,
               mline,
               sizeof(fd_frag_meta_t),
               frag_data,
               sz,
               ctx->links[ in_idx ].link_hash );
  ctx->pending_sz = (ulong)pcap_sz;
}
175 :
/* after_frag commits the pcap record staged by during_frag.  stem only
   invokes this callback when the frag was not overrun while we read it,
   so the staged bytes are known to be a consistent copy.  Frags that
   arrive before the warmup deadline are discarded (staged but never
   committed) to avoid capturing stale pre-start frags. */
static void
after_frag( dump_ctx_t * ctx,
            ulong in_idx FD_PARAM_UNUSED,
            ulong seq FD_PARAM_UNUSED,
            ulong sig FD_PARAM_UNUSED,
            ulong sz,
            ulong tsorig FD_PARAM_UNUSED,
            ulong tspub,
            fd_stem_context_t * stem FD_PARAM_UNUSED ) {
  if( FD_UNLIKELY( !ctx->pending_sz ) ) return; /* during_frag skipped this frag */
  /* sz must agree with what during_frag staged. */
  long pcap_sz = fd_pcap_pkt_sz( sizeof(fd_frag_meta_t), sz );
  FD_TEST( (ulong)pcap_sz == ctx->pending_sz );

  long tickcount = fd_tickcount();
  long now = fd_clock_epoch_y( ctx->clock_epoch, tickcount );
  if( FD_LIKELY( now>ctx->warmup_until ) ) {
    if( FD_LIKELY( tspub!=0UL ) ) {
      /* Replace the placeholder timestamp written by during_frag with
         the producer's compressed publish timestamp, decompressed and
         converted from tickcount to wallclock ns.  The staged record
         still starts at the peek pointer, so its first two uints are
         the pcap record header's timestamp fields. */
      long tspub_decomp = fd_frag_meta_ts_decomp( tspub, tickcount );
      long tspub_wallclock = fd_clock_epoch_y( ctx->clock_epoch, tspub_decomp );
      uint * pcap_tss = fd_io_buffered_ostream_peek( &ctx->ostream );
      pcap_tss[ 0 ] = (uint)(((ulong)tspub_wallclock) / (ulong)1e9);
      pcap_tss[ 1 ] = (uint)(((ulong)tspub_wallclock) % (ulong)1e9); /* Actually nsec */
    }
    /* Advance past the staged bytes, committing them to the file. */
    fd_io_buffered_ostream_seek( &ctx->ostream, ctx->pending_sz );
    ctx->frags += 1UL;
    ctx->bytes += sz;
  }
  ctx->pending_sz = 0UL;
}
205 :
206 : static void
207 0 : log_metrics( dump_ctx_t * ctx ) {
208 0 : FD_LOG_NOTICE(( "dumped %lu frags, %lu bytes", ctx->frags, ctx->bytes ));
209 0 : ctx->frags = 0UL;
210 0 : ctx->bytes = 0UL;
211 :
212 0 : ulong overrun_frags = 0UL;
213 0 : for( ulong i=0UL; i<ctx->link_cnt; i++) {
214 0 : volatile ulong const * link_metrics = fd_metrics_link_in( ctx->metrics_base, i );
215 0 : overrun_frags += link_metrics[ FD_METRICS_COUNTER_LINK_OVERRUN_POLLING_FRAG_COUNT_OFF ];
216 0 : overrun_frags += link_metrics[ FD_METRICS_COUNTER_LINK_OVERRUN_READING_FRAG_COUNT_OFF ];
217 0 : }
218 0 : if( FD_UNLIKELY( overrun_frags != ctx->last_overrun_frags ) ) {
219 : /* Note: We expect overruns at startup because we have no way to
220 : know the current seq to start polling from, so we start at 0
221 : which has often been overrun. */
222 0 : if( FD_LIKELY( ctx->last_overrun_frags != 0 ) ) {
223 0 : long overrun_diff = fd_seq_diff( overrun_frags, ctx->last_overrun_frags );
224 0 : FD_LOG_WARNING(( "overrun detected, %ld frags dropped", overrun_diff ));
225 0 : }
226 0 : ctx->last_overrun_frags = overrun_frags;
227 0 : }
228 0 : }
229 :
230 : static void
231 0 : during_housekeeping( dump_ctx_t * ctx ) {
232 0 : long now = fd_clock_epoch_y( ctx->clock_epoch, fd_tickcount() );
233 0 : if( FD_UNLIKELY( now > ctx->clock_recal_next ) ) {
234 0 : ctx->clock_recal_next = fd_clock_default_recal( ctx->clock );
235 0 : fd_clock_epoch_refresh( ctx->clock_epoch, ctx->clock_mem );
236 0 : }
237 0 : if( FD_UNLIKELY( now > ctx->next_stat_log ) ) {
238 0 : ctx->next_stat_log = now + (long)1e9;
239 0 : log_metrics( ctx );
240 0 : }
241 0 : }
242 :
/* Wire the callbacks above into the generic stem run loop template.
   STEM_BURST is 0 because this tile is a pure consumer and never
   publishes to an out link. */
#define STEM_BURST (0UL)
#define STEM_CALLBACK_CONTEXT_TYPE dump_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(dump_ctx_t)
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#include "../../../disco/stem/fd_stem.c"
251 :
/* dump_cmd_fn implements the dump command.  It joins the running
   validator's topology read-only, opens the output pcap file, selects
   the links named by --link (or all links), then either snapshots the
   currently resident frags once (--once, the default) or attaches as
   an unreliable consumer and streams new frags to the pcap until
   SIGINT/SIGTERM. */
void
dump_cmd_fn( args_t * args,
             fd_config_t * config ) {
  /* --link is a comma separated list; zero tokens selects every link. */
  char * tokens[ 16 ];
  ulong token_count = fd_cstr_tokenize( tokens, 16UL, args->dump.link_name, ',' );

  dump_ctx_t ctx = { 0 };

  uchar write_buf[ 131072 ];
  int fd = open( args->dump.pcap_path, O_WRONLY | O_CREAT | O_TRUNC,
                 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
  if( FD_UNLIKELY( fd < 0 ) )
    FD_LOG_ERR(( "open() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  if( FD_UNLIKELY( &ctx.ostream != fd_io_buffered_ostream_init( &ctx.ostream, fd, write_buf, sizeof(write_buf) ) ) )
    FD_LOG_ERR(( "fd_io_buffered_ostream_init() failed" ));

  /* Write the pcap file header before any records. */
  FD_TEST( 0==fd_pcap_ostream_hdr( &ctx.ostream, FD_PCAP_LINK_LAYER_USER0 ) );

  /* Attach to the validator's shared memory strictly read-only so we
     cannot interfere with it. */
  fd_topo_t * topo = &config->topo;
  fd_topo_join_workspaces( topo, FD_SHMEM_JOIN_MODE_READ_ONLY );
  fd_topo_fill( topo );

  /* Select the links to dump and precompute per-link state. */
  for( ulong i=0UL; i<topo->link_cnt; i++) {
    fd_topo_link_t * link = &topo->links[ i ];
    int found = (token_count==0UL);
    for( ulong j=0UL; (!found)&(j<token_count); j++ ) found |= !strcmp( tokens[ j ], topo->links[ i ].name );
    if( found ) {
      if( FD_UNLIKELY( NULL==link->mcache ) )
        FD_LOG_ERR(( "link %s:%lu mcache is null", link->name, link->kind_id ));
      ctx.links[ ctx.link_cnt ].topo = link;
      /* Zero-mtu links carry no payload and so have no dcache. */
      ctx.links[ ctx.link_cnt ].dcache_base = link->mtu ? fd_topo_obj_wksp_base( topo, link->dcache_obj_id ) : NULL;
      /* Tag every pcap record with a hash of (name, kind_id) so the
         source link can be identified when reading the capture. */
      ctx.links[ ctx.link_cnt ].link_hash = (uint)((fd_hash( 17UL, link->name, strlen( link->name ) ) << 8) | link->kind_id);
      ctx.link_cnt++;
    }
  }

  if( args->dump.once ) {
    /* Snapshot mode: dump whatever is currently resident and exit. */
    for( ulong i=0UL; i<ctx.link_cnt; i++ ) {
      dump_link_once( &ctx, &ctx.links[ i ] );
    }
  } else {
    /* Streaming mode: run a stem consumer loop until signalled. */
    struct sigaction sa = {
      .sa_handler = exit_signal,
      .sa_flags = 0,
    };
    if( FD_UNLIKELY( sigaction( SIGTERM, &sa, NULL ) ) )
      FD_LOG_ERR(( "sigaction(SIGTERM) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    if( FD_UNLIKELY( sigaction( SIGINT, &sa, NULL ) ) )
      FD_LOG_ERR(( "sigaction(SIGINT) failed (%i-%s)", errno, fd_io_strerror( errno ) ));

    fd_frag_meta_t const * mcaches[ FD_TOPO_MAX_LINKS ];
    for( ulong i=0UL; i<ctx.link_cnt; i++ ) {
      mcaches[ i ] = ctx.links[ i ].topo->mcache;
    }

    /* Local (non-shared) fseqs: we consume unreliably and never
       backpressure producers. */
    uchar fseq_mem[ FD_TOPO_MAX_LINKS ][ FD_FSEQ_FOOTPRINT ] __attribute__((aligned((FD_FSEQ_ALIGN))));
    ulong * fseqs[ FD_TOPO_MAX_LINKS ];
    for( ulong i=0UL; i<ctx.link_cnt; i++ ) {
      fseqs[ i ] = fd_fseq_join( fd_fseq_new( fseq_mem[ i ], 0UL ) );
    }

    fd_rng_t _rng[1];
    fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, (uint)fd_tickcount(), 0UL ) );

    uchar * scratch = fd_alloca( FD_STEM_SCRATCH_ALIGN, stem_scratch_footprint( ctx.link_cnt, 0UL, 0UL ) );

    ctx.metrics_base = fd_metrics_join( fd_metrics_new( fd_alloca( FD_METRICS_ALIGN, FD_METRICS_FOOTPRINT( ctx.link_cnt, 0UL ) ), ctx.link_cnt, 0UL ) );
    fd_metrics_register( ctx.metrics_base );

    /* The purpose of the warmup period is to ensure that all frags in
       the pcap were from after the dump was started. fd_stem currently
       has no way to start from the most recently published frags when
       joining links, so at startup we get a full ~depth set of callbacks
       for old frags. This code assumes that we are caught up and only
       processing new frags by the time the warmup timer expires. */
    fd_clock_default_init( ctx.clock, ctx.clock_mem );
    ctx.clock_recal_next = fd_clock_default_recal( ctx.clock );
    fd_clock_epoch_init( ctx.clock_epoch, ctx.clock_mem );
    ctx.warmup_until = fd_clock_epoch_y( ctx.clock_epoch, fd_tickcount() ) + (long)1e9;
    ctx.next_stat_log = ctx.warmup_until + (long)1e9;

    /* Run the consumer loop; returns when should_shutdown fires. */
    stem_run1( ctx.link_cnt, /* in_cnt */
               mcaches,      /* in_mcache */
               fseqs,        /* in_fseq */
               0UL,          /* out_cnt */
               NULL,         /* out_mcache */
               0UL,          /* cons_cnt */
               NULL,         /* _cons_out */
               NULL,         /* _cons_fseq */
               0UL,          /* burst */
               0UL,          /* lazy */
               rng,          /* rng */
               scratch,      /* scratch */
               &ctx );       /* ctx */

    /* Final per-link summary from the stem consumption metrics. */
    for( ulong i=0UL; i<ctx.link_cnt; i++ ) {
      struct link const * link = &ctx.links[ i ];
      volatile ulong const * link_metrics = fd_metrics_link_in( ctx.metrics_base, i );
      ulong frags = link_metrics[ FD_METRICS_COUNTER_LINK_CONSUMED_COUNT_OFF ];
      ulong bytes = link_metrics[ FD_METRICS_COUNTER_LINK_CONSUMED_SIZE_BYTES_OFF ];
      FD_LOG_NOTICE(( "dumped %lu frags, %lu bytes in total from %s:%lu. Link hash: 0x%x",
                      frags, bytes, link->topo->name, link->topo->kind_id, link->link_hash ));
    }

    fd_clock_leave( ctx.clock );
    fd_clock_delete( ctx.clock_mem );

    fd_metrics_delete( fd_metrics_leave( ctx.metrics_base ) );
    ctx.metrics_base = NULL;

    fd_rng_delete( fd_rng_leave( rng ) );

    for( ulong i=0UL; i<ctx.link_cnt; i++ ) {
      fd_fseq_delete( fd_fseq_leave( fseqs[ i ] ) );
    }
  }

  fd_topo_leave_workspaces( topo );

  /* Flush any staged-but-unwritten pcap bytes before closing. */
  fd_io_buffered_ostream_flush( &ctx.ostream );
  fd_io_buffered_ostream_fini( &ctx.ostream );
  close( fd );
}
375 :
/* Action table entry registering the "dump" subcommand with the CLI
   dispatcher.  No special permissions are required (perm is NULL) and
   the command is flagged as a diagnostic. */
action_t fd_action_dump = {
  .name = "dump",
  .args = dump_cmd_args,
  .fn = dump_cmd_fn,
  .perm = NULL,
  .description = "Dump tango links to a packet capture file",
  .is_diagnostic = 1
};
|