Line data Source code
1 : #define _GNU_SOURCE /* Enable GNU and POSIX extensions */
2 : #include "../../disco/tiles.h"
3 : #include "../../disco/topo/fd_topo.h"
4 : #include "../../disco/net/fd_net_tile.h"
5 : #include "../../flamenco/types/fd_types.h"
6 : #include "../../flamenco/fd_flamenco_base.h"
7 : #include "../../disco/fd_disco.h"
8 :
9 : #include <errno.h>
10 : #include <fcntl.h>
11 : #include <sys/mman.h>
12 : #include <sys/stat.h>
13 : #include <string.h>
14 : #include <stdio.h>
15 : #include <unistd.h>
16 : #include <linux/unistd.h>
17 : #include <sys/socket.h>
18 : #include <linux/if_xdp.h>
19 : #include "generated/fd_shredcap_tile_seccomp.h"
20 :
21 :
/* This tile spies on the net_shred, repair_net, shred_repair,
   crds_shred and gossip_repai links and writes what it observes to
   CSV files that can be used to analyze repair performance in post. */
25 :
/* Default size, in bytes, of the write buffer behind each CSV output
   stream (the local filesystem block size).  Used when the tile
   configuration leaves write_buffer_size at zero. */

#define FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ (4096UL)

/* Tag handed to fd_alloc_new for this tile's allocator. */

#define FD_SHREDCAP_ALLOC_TAG (4UL)

/* Capacity, in bytes, of the contact info staging buffer. */

#define MAX_BUFFER_SIZE ( 20000UL * sizeof(fd_shred_dest_wire_t))

/* Input link kinds.  One of these is stored per input link in
   fd_capture_tile_ctx_t::in_kind. */

#define NET_SHRED     (0UL) /* "net_shred"    link */
#define REPAIR_NET    (1UL) /* "repair_net"   link */
#define SHRED_REPAIR  (2UL) /* "shred_repair" link */
#define GOSSIP_SHRED  (3UL) /* "crds_shred"   link */
#define GOSSIP_REPAIR (4UL) /* "gossip_repai" link */
35 :
36 : typedef union {
37 : struct {
38 : fd_wksp_t * mem;
39 : ulong chunk0;
40 : ulong wmark;
41 : };
42 : fd_net_rx_bounds_t net_rx;
43 : } fd_capture_in_ctx_t;
44 :
45 : struct fd_capture_tile_ctx {
46 : uchar in_kind[ 32 ];
47 : fd_capture_in_ctx_t in_links[ 32 ];
48 :
49 : int skip_frag;
50 : ushort repair_intake_listen_port;
51 :
52 : ulong shred_buffer_sz;
53 : uchar shred_buffer[ FD_NET_MTU ];
54 :
55 : ulong repair_buffer_sz;
56 : uchar repair_buffer[ FD_NET_MTU ];
57 :
58 :
59 : fd_ip4_udp_hdrs_t intake_hdr[1];
60 :
61 : ulong now;
62 : ulong last_packet_ns;
63 : double tick_per_ns;
64 :
65 : fd_io_buffered_ostream_t shred_ostream;
66 : fd_io_buffered_ostream_t repair_ostream;
67 : fd_io_buffered_ostream_t fecs_ostream;
68 : fd_io_buffered_ostream_t peers_ostream;
69 :
70 : int shreds_fd;
71 : int requests_fd;
72 : int fecs_fd;
73 : int peers_fd;
74 :
75 : ulong write_buf_sz;
76 :
77 : uchar * shreds_buf;
78 : uchar * requests_buf;
79 : uchar * fecs_buf;
80 : uchar * peers_buf;
81 :
82 : fd_alloc_t * alloc;
83 : uchar contact_info_buffer[ MAX_BUFFER_SIZE ];
84 : };
85 : typedef struct fd_capture_tile_ctx fd_capture_tile_ctx_t;
86 :
87 : FD_FN_CONST static inline ulong
88 0 : scratch_align( void ) {
89 0 : return 4096UL;
90 0 : }
91 :
92 : FD_FN_PURE static inline ulong
93 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
94 0 : return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
95 0 : }
96 :
97 : static ulong
98 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
99 : fd_topo_tile_t const * tile,
100 : ulong out_cnt,
101 0 : struct sock_filter * out ) {
102 0 : populate_sock_filter_policy_fd_shredcap_tile( out_cnt,
103 0 : out,
104 0 : (uint)fd_log_private_logfile_fd(),
105 0 : (uint)tile->shredcap.shreds_fd,
106 0 : (uint)tile->shredcap.requests_fd,
107 0 : (uint)tile->shredcap.fecs_fd,
108 0 : (uint)tile->shredcap.peers_fd );
109 0 : return sock_filter_policy_fd_shredcap_tile_instr_cnt;
110 0 : }
111 :
112 :
113 : FD_FN_PURE static inline ulong
114 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
115 0 : (void)tile;
116 0 : ulong l = FD_LAYOUT_INIT;
117 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_capture_tile_ctx_t), sizeof(fd_capture_tile_ctx_t) );
118 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
119 0 : return FD_LAYOUT_FINI( l, scratch_align() );
120 0 : }
121 :
122 : static inline int
123 : before_frag( fd_capture_tile_ctx_t * ctx,
124 : ulong in_idx,
125 : ulong seq FD_PARAM_UNUSED,
126 0 : ulong sig ) {
127 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==NET_SHRED ) ) {
128 0 : return (fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
129 0 : }
130 0 : return 0;
131 0 : }
132 :
133 : static inline void
134 : handle_new_turbine_contact_info( fd_capture_tile_ctx_t * ctx,
135 0 : uchar const * buf ) {
136 0 : ulong const * header = (ulong const *)fd_type_pun_const( buf );
137 0 : ulong dest_cnt = header[ 0 ];
138 :
139 0 : fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header+1UL );
140 :
141 0 : for( ulong i=0UL; i<dest_cnt; i++ ) {
142 : // need to bswap the port
143 : //ushort port = fd_ushort_bswap( in_dests[i].udp_port );
144 0 : char peers_buf[1024];
145 0 : snprintf( peers_buf, sizeof(peers_buf),
146 0 : "%u,%u,%s,%d\n",
147 0 : in_dests[i].ip4_addr, in_dests[i].udp_port, FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), 1);
148 0 : int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, peers_buf, strlen(peers_buf) );
149 0 : FD_TEST( err==0 );
150 0 : }
151 0 : }
152 :
153 :
154 : static int
155 0 : is_fec_completes_msg( ulong sz ) {
156 0 : return sz == FD_SHRED_DATA_HEADER_SZ + FD_SHRED_MERKLE_ROOT_SZ;
157 0 : }
158 :
159 : static inline void
160 : during_frag( fd_capture_tile_ctx_t * ctx,
161 : ulong in_idx,
162 : ulong seq FD_PARAM_UNUSED,
163 : ulong sig FD_PARAM_UNUSED,
164 : ulong chunk,
165 : ulong sz,
166 0 : ulong ctl FD_PARAM_UNUSED ) {
167 0 : ctx->skip_frag = 0;
168 0 : if( ctx->in_kind[ in_idx ]==SHRED_REPAIR ) {
169 0 : if( !is_fec_completes_msg( sz ) ) {
170 0 : ctx->skip_frag = 1;
171 0 : return;
172 0 : }
173 0 : fd_memcpy( ctx->shred_buffer, fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk ), sz );
174 0 : ctx->shred_buffer_sz = sz;
175 0 : } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
176 0 : uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in_links[ in_idx ].net_rx, chunk, ctl, sz );
177 0 : ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
178 0 : FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
179 0 : fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
180 0 : if( FD_UNLIKELY( !shred ) ) {
181 0 : ctx->skip_frag = 1;
182 0 : return;
183 0 : };
184 0 : fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
185 0 : ctx->shred_buffer_sz = sz-hdr_sz;
186 0 : } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
187 : /* Repair will have outgoing pings, outgoing repair requests, and
188 : outgoing served shreds we want to filter everything but the
189 : repair requests.
190 : 1. We can index into the ip4 udp packet hdr and check if the src
191 : port is the intake listen port or serve port
192 : 2. Then we can filter on the discriminant which luckily does not
193 : require decoding! */
194 :
195 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
196 0 : fd_ip4_udp_hdrs_t const * hdr = (fd_ip4_udp_hdrs_t const *)dcache_entry;
197 0 : if( hdr->udp->net_sport != fd_ushort_bswap( ctx->repair_intake_listen_port ) ) {
198 0 : ctx->skip_frag = 1;
199 0 : return;
200 0 : }
201 0 : const uchar * encoded_protocol = dcache_entry + sizeof(fd_ip4_udp_hdrs_t);
202 0 : uint discriminant = FD_LOAD(uint, encoded_protocol);
203 :
204 0 : if( FD_UNLIKELY( discriminant <= fd_repair_protocol_enum_pong ) ) {
205 0 : ctx->skip_frag = 1;
206 0 : return;
207 0 : }
208 0 : fd_memcpy( ctx->repair_buffer, dcache_entry, sz );
209 0 : ctx->repair_buffer_sz = sz;
210 0 : } else {
211 : // contact infos can be copied into a buffer
212 0 : if( FD_UNLIKELY( chunk<ctx->in_links[ in_idx ].chunk0 || chunk>ctx->in_links[ in_idx ].wmark ) ) {
213 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
214 0 : ctx->in_links[ in_idx ].chunk0, ctx->in_links[ in_idx ].wmark ));
215 0 : }
216 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
217 0 : fd_memcpy( ctx->contact_info_buffer, dcache_entry, sz );
218 0 : }
219 0 : }
220 :
221 : static inline void
222 : after_frag( fd_capture_tile_ctx_t * ctx,
223 : ulong in_idx,
224 : ulong seq FD_PARAM_UNUSED,
225 : ulong sig,
226 : ulong sz,
227 : ulong tsorig FD_PARAM_UNUSED,
228 : ulong tspub FD_PARAM_UNUSED,
229 0 : fd_stem_context_t * stem FD_PARAM_UNUSED ) {
230 0 : if( FD_UNLIKELY( ctx->skip_frag ) ) return;
231 :
232 0 : if( ctx->in_kind[ in_idx ] == SHRED_REPAIR ) {
233 : /* This is a fec completes message! we can use it to check how long
234 : it takes to complete a fec */
235 :
236 0 : fd_shred_t const * shred = (fd_shred_t *)fd_type_pun( ctx->shred_buffer );
237 0 : uint data_cnt = fd_disco_shred_repair_fec_sig_data_cnt( sig );
238 0 : uint ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
239 0 : char fec_complete[1024];
240 0 : snprintf( fec_complete, sizeof(fec_complete),
241 0 : "%ld,%lu,%u,%u,%u\n",
242 0 : fd_log_wallclock(), shred->slot, ref_tick, shred->fec_set_idx, data_cnt );
243 :
244 : // Last shred is guaranteed to be a data shred
245 :
246 :
247 0 : int err = fd_io_buffered_ostream_write( &ctx->fecs_ostream, fec_complete, strlen(fec_complete) );
248 0 : FD_TEST( err==0 );
249 0 : } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
250 : /* TODO: leader schedule early exits in shred tile right around
251 : startup, which discards some turbine shreds, but there is a
252 : chance we capture this shred here. Currently handled in post, but
253 : in the future will want to get the leader schedule here so we can
254 : also benchmark whether the excepcted sender in the turbine tree
255 : matches the actual sender. */
256 :
257 0 : ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
258 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->shred_buffer;
259 0 : uint src_ip4_addr = hdr->ip4->saddr;
260 0 : ushort src_port = hdr->udp->net_sport;
261 :
262 0 : fd_shred_t const * shred = fd_shred_parse( ctx->shred_buffer + hdr_sz, sz - hdr_sz );
263 0 : int is_turbine = fd_disco_netmux_sig_proto( sig ) == DST_PROTO_SHRED;
264 0 : uint nonce = is_turbine ? 0 : FD_LOAD(uint, ctx->shred_buffer + hdr_sz + fd_shred_sz( shred ) );
265 0 : int is_data = fd_shred_is_data( fd_shred_type( shred->variant ) );
266 0 : ulong slot = shred->slot;
267 0 : uint idx = shred->idx;
268 0 : uint fec_idx = shred->fec_set_idx;
269 0 : uint ref_tick = 65;
270 0 : if( FD_UNLIKELY( is_turbine && is_data ) ) {
271 : /* We can then index into the flag and get a REFTICK */
272 0 : ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
273 0 : }
274 :
275 0 : char repair_data_buf[1024];
276 0 : snprintf( repair_data_buf, sizeof(repair_data_buf),
277 0 : "%u,%u,%ld,%lu,%u,%u,%u,%d,%d,%u\n",
278 0 : src_ip4_addr, src_port, fd_log_wallclock(), slot, ref_tick, fec_idx, idx, is_turbine, is_data, nonce );
279 :
280 0 : int err = fd_io_buffered_ostream_write( &ctx->shred_ostream, repair_data_buf, strlen(repair_data_buf) );
281 0 : FD_TEST( err==0 );
282 0 : } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
283 : /* We have a valid repair request that we can finally decode.
284 : Unfortunately we actually have to decode because we cant cast
285 : directly to the protocol */
286 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->repair_buffer;
287 0 : fd_repair_protocol_t protocol;
288 0 : fd_bincode_decode_ctx_t bctx = { .data = ctx->repair_buffer + sizeof(fd_ip4_udp_hdrs_t), .dataend = ctx->repair_buffer + ctx->repair_buffer_sz };
289 0 : fd_repair_protocol_t * decoded = fd_repair_protocol_decode( &protocol, &bctx );
290 :
291 0 : FD_TEST( decoded == &protocol );
292 0 : FD_TEST( decoded != NULL );
293 :
294 0 : uint peer_ip4_addr = hdr->ip4->daddr;
295 0 : ushort peer_port = hdr->udp->net_dport;
296 0 : ulong slot = 0UL;
297 0 : ulong shred_index = UINT_MAX;
298 0 : uint nonce = 0U;
299 :
300 0 : switch( protocol.discriminant ) {
301 0 : case fd_repair_protocol_enum_window_index: {
302 0 : slot = protocol.inner.window_index.slot;
303 0 : shred_index = protocol.inner.window_index.shred_index;
304 0 : nonce = protocol.inner.window_index.header.nonce;
305 0 : break;
306 0 : }
307 0 : case fd_repair_protocol_enum_highest_window_index: {
308 0 : slot = protocol.inner.highest_window_index.slot;
309 0 : shred_index = protocol.inner.highest_window_index.shred_index;
310 0 : nonce = protocol.inner.highest_window_index.header.nonce;
311 0 : break;
312 0 : }
313 0 : case fd_repair_protocol_enum_orphan: {
314 0 : slot = protocol.inner.orphan.slot;
315 0 : nonce = protocol.inner.orphan.header.nonce;
316 0 : break;
317 0 : }
318 0 : default:
319 0 : break;
320 0 : }
321 :
322 0 : char repair_data_buf[1024];
323 0 : snprintf( repair_data_buf, sizeof(repair_data_buf),
324 0 : "%u,%u,%ld,%u,%lu,%lu\n",
325 0 : peer_ip4_addr, peer_port, fd_log_wallclock(), nonce, slot, shred_index );
326 0 : int err = fd_io_buffered_ostream_write( &ctx->repair_ostream, repair_data_buf, strlen(repair_data_buf) );
327 0 : FD_TEST( err==0 );
328 0 : } else if( ctx->in_kind[ in_idx ] == GOSSIP_REPAIR ) {
329 0 : fd_shred_dest_wire_t const * in_dests = (fd_shred_dest_wire_t const *)fd_type_pun_const( ctx->contact_info_buffer );
330 0 : ulong dest_cnt = sz;
331 0 : for( ulong i=0UL; i<dest_cnt; i++ ) {
332 0 : char peers_buf[1024];
333 0 : snprintf( peers_buf, sizeof(peers_buf),
334 0 : "%u,%u,%s,%d\n",
335 0 : in_dests[i].ip4_addr, in_dests[i].udp_port, FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), 0);
336 0 : int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, peers_buf, strlen(peers_buf) );
337 0 : FD_TEST( err==0 );
338 0 : }
339 0 : } else { // crds_shred contact infos
340 0 : handle_new_turbine_contact_info( ctx, ctx->contact_info_buffer );
341 0 : }
342 0 : }
343 :
344 : static ulong
345 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
346 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
347 : ulong out_fds_cnt FD_PARAM_UNUSED,
348 0 : int * out_fds ) {
349 0 : ulong out_cnt = 0UL;
350 :
351 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
352 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
353 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
354 0 : if( FD_LIKELY( -1!=tile->shredcap.shreds_fd ) )
355 0 : out_fds[ out_cnt++ ] = tile->shredcap.shreds_fd; /* shred file */
356 0 : if( FD_LIKELY( -1!=tile->shredcap.requests_fd ) )
357 0 : out_fds[ out_cnt++ ] = tile->shredcap.requests_fd; /* request file */
358 0 : if( FD_LIKELY( -1!=tile->shredcap.fecs_fd ) )
359 0 : out_fds[ out_cnt++ ] = tile->shredcap.fecs_fd; /* fec complete file */
360 0 : if( FD_LIKELY( -1!=tile->shredcap.peers_fd ) )
361 0 : out_fds[ out_cnt++ ] = tile->shredcap.peers_fd; /* peers file */
362 :
363 0 : return out_cnt;
364 0 : }
365 :
366 : static void
367 : privileged_init( fd_topo_t * topo FD_PARAM_UNUSED,
368 0 : fd_topo_tile_t * tile ) {
369 0 : char file_path[PATH_MAX];
370 0 : strcpy( file_path, tile->shredcap.folder_path );
371 0 : strcat( file_path, "/shred_data.csv" );
372 0 : tile->shredcap.shreds_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
373 0 : if ( FD_UNLIKELY( tile->shredcap.shreds_fd == -1 ) ) {
374 0 : FD_LOG_ERR(( "failed to open or create shred csv dump file %s %d %s", file_path, errno, strerror(errno) ));
375 0 : }
376 :
377 0 : strcpy( file_path, tile->shredcap.folder_path );
378 0 : strcat( file_path, "/request_data.csv" );
379 0 : tile->shredcap.requests_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
380 0 : if ( FD_UNLIKELY( tile->shredcap.requests_fd == -1 ) ) {
381 0 : FD_LOG_ERR(( "failed to open or create request csv dump file %s %d %s", file_path, errno, strerror(errno) ));
382 0 : }
383 :
384 0 : strcpy( file_path, tile->shredcap.folder_path );
385 0 : strcat( file_path, "/fec_complete.csv" );
386 0 : tile->shredcap.fecs_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
387 0 : if ( FD_UNLIKELY( tile->shredcap.fecs_fd == -1 ) ) {
388 0 : FD_LOG_ERR(( "failed to open or create fec complete csv dump file %s %d %s", file_path, errno, strerror(errno) ));
389 0 : }
390 0 : FD_LOG_NOTICE(( "Opening shred csv dump file at %s", file_path ));
391 :
392 0 : strcpy( file_path, tile->shredcap.folder_path );
393 0 : strcat( file_path, "/peers.csv" );
394 0 : tile->shredcap.peers_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
395 0 : if ( FD_UNLIKELY( tile->shredcap.peers_fd == -1 ) ) {
396 0 : FD_LOG_ERR(( "failed to open or create peers csv dump file %s %d %s", file_path, errno, strerror(errno) ));
397 0 : }
398 0 : }
399 :
400 : static void
401 : init_file_handlers( fd_capture_tile_ctx_t * ctx,
402 : int * ctx_file,
403 : int tile_file,
404 : uchar ** ctx_buf,
405 0 : fd_io_buffered_ostream_t * ctx_ostream ) {
406 0 : *ctx_file = tile_file ;
407 :
408 0 : int err = ftruncate( *ctx_file, 0UL );
409 0 : if( FD_UNLIKELY( err ) ) {
410 0 : FD_LOG_ERR(( "failed to truncate file (%i-%s)", errno, fd_io_strerror( errno ) ));
411 0 : }
412 0 : long seek = lseek( *ctx_file, 0UL, SEEK_SET );
413 0 : if( FD_UNLIKELY( seek!=0L ) ) {
414 0 : FD_LOG_ERR(( "failed to seek to the beginning of file" ));
415 0 : }
416 :
417 0 : *ctx_buf = fd_alloc_malloc( ctx->alloc, 4096, ctx->write_buf_sz );
418 0 : if( FD_UNLIKELY( *ctx_buf == NULL ) ) {
419 0 : FD_LOG_ERR(( "failed to allocate ostream buffer" ));
420 0 : }
421 :
422 0 : if( FD_UNLIKELY( !fd_io_buffered_ostream_init(
423 0 : ctx_ostream,
424 0 : *ctx_file,
425 0 : *ctx_buf,
426 0 : ctx->write_buf_sz ) ) ) {
427 0 : FD_LOG_ERR(( "failed to initialize ostream" ));
428 0 : }
429 0 : }
430 :
431 :
432 : static void
433 : unprivileged_init( fd_topo_t * topo,
434 0 : fd_topo_tile_t * tile ) {
435 :
436 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
437 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
438 0 : fd_capture_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_capture_tile_ctx_t), sizeof(fd_capture_tile_ctx_t) );
439 0 : void * alloc_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
440 0 : FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
441 :
442 : /* Input links */
443 0 : for( ulong i=0; i<tile->in_cnt; i++ ) {
444 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
445 0 : fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
446 0 : if( 0==strcmp( link->name, "net_shred" ) ) {
447 0 : ctx->in_kind[ i ] = NET_SHRED;
448 0 : fd_net_rx_bounds_init( &ctx->in_links[ i ].net_rx, link->dcache );
449 0 : continue;
450 0 : } else if( 0==strcmp( link->name, "repair_net" ) ) {
451 0 : ctx->in_kind[ i ] = REPAIR_NET;
452 0 : } else if( 0==strcmp( link->name, "shred_repair" ) ) {
453 0 : ctx->in_kind[ i ] = SHRED_REPAIR;
454 0 : } else if( 0==strcmp( link->name, "crds_shred" ) ) {
455 0 : ctx->in_kind[ i ] = GOSSIP_SHRED;
456 0 : } else if( 0==strcmp( link->name, "gossip_repai" ) ) {
457 0 : ctx->in_kind[ i ] = GOSSIP_REPAIR;
458 0 : } else {
459 0 : FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name ));
460 0 : }
461 :
462 0 : ctx->in_links[ i ].mem = link_wksp->wksp;
463 0 : ctx->in_links[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ i ].mem, link->dcache );
464 0 : ctx->in_links[ i ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ i ].mem, link->dcache, link->mtu );
465 0 : }
466 :
467 0 : ctx->repair_intake_listen_port = tile->shredcap.repair_intake_listen_port;
468 0 : ctx->write_buf_sz = tile->shredcap.write_buffer_size ? tile->shredcap.write_buffer_size : FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ;
469 :
470 : /* Allocate the write buffers */
471 0 : ctx->alloc = fd_alloc_join( fd_alloc_new( alloc_mem, FD_SHREDCAP_ALLOC_TAG ), fd_tile_idx() );
472 0 : if( FD_UNLIKELY( !ctx->alloc ) ) {
473 0 : FD_LOG_ERR( ( "fd_alloc_join failed" ) );
474 0 : }
475 :
476 : /* Setup the csv files to be in the expected state */
477 :
478 0 : init_file_handlers( ctx, &ctx->shreds_fd, tile->shredcap.shreds_fd, &ctx->shreds_buf, &ctx->shred_ostream );
479 0 : init_file_handlers( ctx, &ctx->requests_fd, tile->shredcap.requests_fd, &ctx->requests_buf, &ctx->repair_ostream );
480 0 : init_file_handlers( ctx, &ctx->fecs_fd, tile->shredcap.fecs_fd, &ctx->fecs_buf, &ctx->fecs_ostream );
481 0 : init_file_handlers( ctx, &ctx->peers_fd, tile->shredcap.peers_fd, &ctx->peers_buf, &ctx->peers_ostream );
482 :
483 0 : int err = fd_io_buffered_ostream_write( &ctx->shred_ostream, "src_ip,src_port,timestamp,slot,ref_tick,fec_set_idx,idx,is_turbine,is_data,nonce\n", 81UL );
484 0 : err |= fd_io_buffered_ostream_write( &ctx->repair_ostream, "dst_ip,dst_port,timestamp,nonce,slot,idx\n", 41UL );
485 0 : err |= fd_io_buffered_ostream_write( &ctx->fecs_ostream, "timestamp,slot,ref_tick,fec_set_idx,data_cnt\n", 45UL );
486 0 : err |= fd_io_buffered_ostream_write( &ctx->peers_ostream, "peer_ip4_addr,peer_port,pubkey,turbine\n", 48UL );
487 :
488 0 : if( FD_UNLIKELY( err ) ) {
489 0 : FD_LOG_ERR(( "failed to write header to any of the 4 files (%i-%s)", errno, fd_io_strerror( errno ) ));
490 0 : }
491 0 : }
492 :
493 0 : #define STEM_BURST (1UL)
494 0 : #define STEM_LAZY (50UL)
495 :
496 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_capture_tile_ctx_t
497 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_capture_tile_ctx_t)
498 :
499 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
500 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
501 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
502 :
503 : #include "../../disco/stem/fd_stem.c"
504 :
505 : fd_topo_run_tile_t fd_tile_shredcap = {
506 : .name = "shrdcp",
507 : .loose_footprint = loose_footprint,
508 : .populate_allowed_seccomp = populate_allowed_seccomp,
509 : .populate_allowed_fds = populate_allowed_fds,
510 : .scratch_align = scratch_align,
511 : .scratch_footprint = scratch_footprint,
512 : .privileged_init = privileged_init,
513 : .unprivileged_init = unprivileged_init,
514 : .run = stem_run,
515 : };
|