Line data Source code
1 : #define _GNU_SOURCE /* Enable GNU and POSIX extensions */
2 : #include "../../disco/topo/fd_topo.h"
3 : #include "../../disco/net/fd_net_tile.h"
4 : #include "../../flamenco/types/fd_types.h"
5 : #include "../../flamenco/fd_flamenco_base.h"
6 : #include "../../util/pod/fd_pod_format.h"
7 : #include "../../flamenco/gossip/fd_gossip_types.h"
8 : #include "../../disco/fd_disco.h"
9 : #include "../../discof/fd_discof.h"
10 : #include "../../discof/repair/fd_repair.h"
11 : #include "../../discof/replay/fd_replay_tile.h"
12 : #include "../../discof/replay/fd_exec.h"
13 : #include "../../discof/restore/utils/fd_ssmsg.h"
14 : #include "../../discof/restore/utils/fd_ssmanifest_parser.h"
15 : #include "../../flamenco/runtime/sysvar/fd_sysvar_epoch_schedule.h"
16 : #include "../../disco/fd_disco.h"
17 : #include "../../util/pod/fd_pod_format.h"
18 :
19 : #include <errno.h>
20 : #include <fcntl.h>
21 : #include <sys/mman.h>
22 : #include <sys/stat.h>
23 : #include <string.h>
24 : #include <stdio.h>
25 : #include <stdlib.h>
26 : #include <unistd.h>
27 : #include <sys/socket.h>
28 : #include "generated/fd_shredcap_tile_seccomp.h"
29 :
30 :
31 : /* This tile currently has two functionalities.
32 :
33 : The first is spying on the net_shred, repair_net, and shred_out
34 : links and currently outputs to a csv that can analyze repair
35 : performance in post.
36 :
37 : The second is to capture the bank hashes from the replay tile and
38 : slices of shreds from the repair tile. These are outputted to binary
39 : files that can be used to reproduce a live replay execution. */
40 :
41 0 : #define FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ (4096UL) /* local filesystem block size */
42 0 : #define FD_SHREDCAP_ALLOC_TAG (4UL)
43 : #define MAX_BUFFER_SIZE (20000UL * sizeof(fd_shred_dest_wire_t))
44 0 : #define MANIFEST_MAX_TOTAL_BANKS (2UL) /* the minimum is 2 */
45 0 : #define MANIFEST_MAX_FORK_WIDTH (1UL) /* banks are only needed during publish_stake_weights() */
46 :
47 0 : #define NET_SHRED (0UL)
48 0 : #define REPAIR_NET (1UL)
49 0 : #define SHRED_OUT (2UL)
50 0 : #define GOSSIP_OUT (3UL)
51 0 : #define REPAIR_SHREDCAP (4UL)
52 0 : #define REPLAY_OUT (5UL)
53 :
/* Per-input-link descriptor.  Most links are dereferenced with the
   classic mem/chunk0/wmark triple; net links instead go through the
   fd_net_rx_bounds_t translation helper, hence the union (only one
   view is active per link, selected by in_kind). */
typedef union {
  struct {
    fd_wksp_t * mem;     /* dcache workspace */
    ulong       chunk0;  /* lowest valid chunk */
    ulong       wmark;   /* highest valid chunk */
  };
  fd_net_rx_bounds_t net_rx; /* used only for NET_SHRED links */
} fd_capture_in_ctx_t;
62 :
/* Output link bookkeeping: everything needed to publish frags on one
   out link (mcache cursor plus dcache chunk cursor). */
struct out_link {
  ulong            idx;    /* tile out-link index, passed to fd_stem_publish */
  fd_frag_meta_t * mcache;
  ulong *          sync;
  ulong            depth;
  ulong            seq;
  fd_wksp_t *      mem;    /* dcache workspace */
  ulong            chunk0; /* first chunk */
  ulong            wmark;  /* last chunk */
  ulong            chunk;  /* next chunk to write */
};
typedef struct out_link out_link_t;
75 :
/* Tile-local state.  Holds per-link metadata, staging buffers filled in
   during_frag and consumed in after_frag, the manifest-loading state
   used by after_credit, and one buffered ostream per capture file. */
struct fd_capture_tile_ctx {
  uchar               in_kind[ 32 ];  /* NET_SHRED / REPAIR_NET / ... per in link */
  fd_capture_in_ctx_t in_links[ 32 ];

  int    skip_frag;                 /* set by during_frag to drop the frag in after_frag */
  ushort repair_intake_listen_port; /* host byte order; used to filter repair_net traffic */

  /* Staging buffer for shreds snooped off net_shred / shred_out. */
  ulong shred_buffer_sz;
  uchar shred_buffer[ FD_NET_MTU ];

  /* Staging buffer for outgoing repair requests snooped off repair_net. */
  ulong repair_buffer_sz;
  uchar repair_buffer[ FD_NET_MTU ];

  /* Manifest loading / stake weight publishing state. */
  out_link_t   stake_out[1];
  out_link_t   snap_out[1];
  int          enable_publish_stake_weights;
  ulong *      manifest_wmark;      /* fseq advanced to the manifest slot once loaded */
  uchar *      manifest_bank_mem;
  fd_banks_t * banks;
  fd_bank_t *  bank;
  char         manifest_path[ PATH_MAX ]; /* empty string disables manifest loading */
  int          manifest_load_done;  /* one-shot latch for after_credit */
  uchar *      manifest_spad_mem;
  fd_spad_t *  manifest_spad;       /* long-lived: holds the processed manifest */
  uchar *      shared_spad_mem;
  fd_spad_t *  shared_spad;         /* transient: raw manifest bytes / banks */

  fd_ip4_udp_hdrs_t intake_hdr[1];

  ulong  now;
  ulong  last_packet_ns;
  double tick_per_ns;

  /* One buffered writer per capture file (see matching *_fd below). */
  fd_io_buffered_ostream_t shred_ostream;
  fd_io_buffered_ostream_t repair_ostream;
  fd_io_buffered_ostream_t fecs_ostream;
  fd_io_buffered_ostream_t peers_ostream;
  fd_io_buffered_ostream_t slices_ostream;
  fd_io_buffered_ostream_t bank_hashes_ostream;

  int shreds_fd;      /* shreds snooped from net_shred */
  int requests_fd;
  int fecs_fd;
  int peers_fd;
  int slices_fd;      /* all shreds in slices from repair tile */
  int bank_hashes_fd; /* bank hashes from replay tile */

  ulong write_buf_sz; /* size of each ostream buffer below */

  uchar * shreds_buf;
  uchar * requests_buf;
  uchar * fecs_buf;
  uchar * peers_buf;
  uchar * slices_buf;
  uchar * bank_hashes_buf;

  fd_alloc_t * alloc; /* backs the ostream buffers */
  uchar contact_info_buffer[ MAX_BUFFER_SIZE ]; /* staged gossip contact info */
};
typedef struct fd_capture_tile_ctx fd_capture_tile_ctx_t;
136 :
137 : FD_FN_CONST static inline ulong
138 0 : scratch_align( void ) {
139 0 : return 4096UL;
140 0 : }
141 :
/* Alignment required for the manifest banks region (delegates to
   fd_banks). */
FD_FN_CONST static inline ulong
manifest_bank_align( void ) {
  return fd_banks_align();
}
146 :
/* Footprint of the manifest banks region, sized for the minimal pool
   this tile needs (see MANIFEST_MAX_TOTAL_BANKS / MANIFEST_MAX_FORK_WIDTH). */
FD_FN_CONST static inline ulong
manifest_bank_footprint( void ) {
  return fd_banks_footprint( MANIFEST_MAX_TOTAL_BANKS, MANIFEST_MAX_FORK_WIDTH );
}
151 :
152 : FD_FN_CONST static inline ulong
153 0 : manifest_load_align( void ) {
154 0 : return 128UL;
155 0 : }
156 :
/* Size of the buffer used to read the raw manifest file from disk. */
FD_FN_CONST static inline ulong
manifest_load_footprint( void ) {
  /* A manifest typically requires 1GB, but closer to 2GB
     have been observed in mainnet. The footprint is then
     set to 2GB. TODO a future adjustment may be needed. */
  return 2UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
}
164 :
/* Alignment of the manifest scratchpad. */
FD_FN_CONST static inline ulong
manifest_spad_max_alloc_align( void ) {
  return FD_SPAD_ALIGN;
}
169 :
/* Maximum total allocation supported by the manifest scratchpad. */
FD_FN_CONST static inline ulong
manifest_spad_max_alloc_footprint( void ) {
  /* The amount of memory required in the manifest load
     scratchpad to process it tends to be slightly larger
     than the manifest load footprint. */
  return manifest_load_footprint() + 128UL * FD_SHMEM_HUGE_PAGE_SZ;
}
177 :
/* Alignment of the shared scratchpad. */
FD_FN_CONST static inline ulong
shared_spad_max_alloc_align( void ) {
  return FD_SPAD_ALIGN;
}
182 :
/* Maximum total allocation supported by the shared scratchpad. */
FD_FN_CONST static inline ulong
shared_spad_max_alloc_footprint( void ) {
  /* The shared scratchpad is used by the manifest banks
     and by the manifest load (but not at the same time).
     The footprint for the banks needs to be equal to
     banks footprint (at least for the current setup with
     MANIFEST_MAX_TOTAL_BANKS==2). */
  return fd_ulong_max( manifest_bank_footprint(), manifest_load_footprint() );
}
192 :
/* loose_footprint returns the amount of loose workspace memory this
   tile requests, rounded up to a gigantic page.  Mirrors the regions
   laid out in scratch_footprint. */
FD_FN_PURE static inline ulong
loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  ulong footprint = sizeof(fd_capture_tile_ctx_t)
    + manifest_bank_footprint()
    + fd_spad_footprint( manifest_spad_max_alloc_footprint() )
    + fd_spad_footprint( shared_spad_max_alloc_footprint() )
    + fd_alloc_footprint();
  return fd_ulong_align_up( footprint, FD_SHMEM_GIGANTIC_PAGE_SZ );
}
202 :
203 :
/* populate_allowed_seccomp fills out the generated seccomp policy for
   this tile, whitelisting the log fd and the csv capture fds.  Returns
   the instruction count of the installed filter.
   NOTE(review): slices_fd and bank_hashes_fd are not passed to the
   policy here even though the tile writes to them -- confirm the
   generated filter permits writes to those fds. */
static ulong
populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
                          fd_topo_tile_t const * tile,
                          ulong                  out_cnt,
                          struct sock_filter *   out ) {
  populate_sock_filter_policy_fd_shredcap_tile( out_cnt,
                                                out,
                                                (uint)fd_log_private_logfile_fd(),
                                                (uint)tile->shredcap.shreds_fd,
                                                (uint)tile->shredcap.requests_fd,
                                                (uint)tile->shredcap.fecs_fd,
                                                (uint)tile->shredcap.peers_fd );
  return sock_filter_policy_fd_shredcap_tile_instr_cnt;
}
218 :
/* scratch_footprint returns the size of the tile's scratch region.
   Keep in sync with the FD_SCRATCH_ALLOC_APPEND sequence in
   unprivileged_init. */
FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile ) {
  (void)tile;
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_capture_tile_ctx_t),      sizeof(fd_capture_tile_ctx_t) );
  l = FD_LAYOUT_APPEND( l, manifest_bank_align(),               manifest_bank_footprint() );
  l = FD_LAYOUT_APPEND( l, manifest_spad_max_alloc_align(),     fd_spad_footprint( manifest_spad_max_alloc_footprint() ) );
  l = FD_LAYOUT_APPEND( l, shared_spad_max_alloc_align(),       fd_spad_footprint( shared_spad_max_alloc_footprint() ) );
  l = FD_LAYOUT_APPEND( l, fd_alloc_align(),                    fd_alloc_footprint() );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
230 :
/* generate_stake_weight_msg_manifest serializes one epoch's stake
   weights from a snapshot manifest into stake_weight_msg_out (laid out
   as an fd_stake_weight_msg_t), sorting the weights in place.  Returns
   the message size in bytes. */
static inline ulong
generate_stake_weight_msg_manifest( ulong                                       epoch,
                                    fd_epoch_schedule_t const *                 epoch_schedule,
                                    fd_snapshot_manifest_epoch_stakes_t const * epoch_stakes,
                                    ulong *                                     stake_weight_msg_out ) {
  fd_stake_weight_msg_t *  stake_weight_msg = (fd_stake_weight_msg_t *)fd_type_pun( stake_weight_msg_out );
  fd_vote_stake_weight_t * stake_weights    = stake_weight_msg->weights;

  stake_weight_msg->epoch             = epoch;
  stake_weight_msg->staked_cnt        = epoch_stakes->vote_stakes_len;
  stake_weight_msg->start_slot        = fd_epoch_slot0( epoch_schedule, epoch );
  stake_weight_msg->slot_cnt          = epoch_schedule->slots_per_epoch;
  stake_weight_msg->excluded_stake    = 0UL;
  stake_weight_msg->vote_keyed_lsched = 1UL;

  /* FIXME: SIMD-0180 - hack to (de)activate in testnet vs mainnet.
     This code can be removed once the feature is active. */
  {
    if( ( 1==epoch_schedule->warmup && epoch<FD_SIMD0180_ACTIVE_EPOCH_TESTNET )
        || ( 0==epoch_schedule->warmup && epoch<FD_SIMD0180_ACTIVE_EPOCH_MAINNET ) ) {
      stake_weight_msg->vote_keyed_lsched = 0UL;
    }
  }

  /* epoch_stakes from manifest are already filtered (stake>0), but not sorted */
  for( ulong i=0UL; i<epoch_stakes->vote_stakes_len; i++ ) {
    stake_weights[ i ].stake = epoch_stakes->vote_stakes[ i ].stake;
    memcpy( stake_weights[ i ].id_key.uc,   epoch_stakes->vote_stakes[ i ].identity, sizeof(fd_pubkey_t) );
    memcpy( stake_weights[ i ].vote_key.uc, epoch_stakes->vote_stakes[ i ].vote,     sizeof(fd_pubkey_t) );
  }
  sort_vote_weights_by_stake_vote_inplace( stake_weights, epoch_stakes->vote_stakes_len );

  return fd_stake_weight_msg_sz( epoch_stakes->vote_stakes_len );
}
265 :
/* publish_stake_weights_manifest publishes stake weights derived from
   the manifest for the manifest's epoch and the following epoch, one
   frag each, on out link 0 (the stake out link).
   NOTE(review): the hard-coded sig value 4UL -- confirm it matches the
   stake-weights message tag expected by consumers. */
static void
publish_stake_weights_manifest( fd_capture_tile_ctx_t *        ctx,
                                fd_stem_context_t *            stem,
                                fd_snapshot_manifest_t const * manifest ) {
  fd_epoch_schedule_t const * schedule = fd_type_pun_const( &manifest->epoch_schedule_params );
  ulong epoch = fd_slot_to_epoch( schedule, manifest->slot, NULL );

  /* current epoch */
  ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
  ulong stake_weights_sz  = generate_stake_weight_msg_manifest( epoch, schedule, &manifest->epoch_stakes[0], stake_weights_msg );
  ulong stake_weights_sig = 4UL;
  fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
  ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
  FD_LOG_NOTICE(("sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));

  /* next current epoch */
  stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
  stake_weights_sz  = generate_stake_weight_msg_manifest( epoch + 1, schedule, &manifest->epoch_stakes[1], stake_weights_msg );
  stake_weights_sig = 4UL;
  fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
  ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
  FD_LOG_NOTICE(("sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
}
289 :
290 : static inline int
291 : before_frag( fd_capture_tile_ctx_t * ctx,
292 : ulong in_idx,
293 : ulong seq FD_PARAM_UNUSED,
294 0 : ulong sig ) {
295 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==NET_SHRED ) ) {
296 0 : return (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
297 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==GOSSIP_OUT)) {
298 0 : return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO;
299 0 : }
300 0 : return 0;
301 0 : }
302 :
/* handle_new_contact_info appends one csv row per advertised socket
   (tvu and/or serve-repair) of a gossip contact-info update to the
   peers file.  Row format: addr,port(kind),pubkey_b58,1.
   NOTE(review): addr/port are written as the raw integers found in the
   message -- presumably network byte order; confirm downstream tooling
   expects that. */
static inline void
handle_new_contact_info( fd_capture_tile_ctx_t * ctx,
                         uchar const *           buf ) {
  fd_gossip_update_message_t const * msg = (fd_gossip_update_message_t const *)fd_type_pun_const( buf );
  char tvu_buf[1024];
  char repair_buf[1024];
  fd_ip4_port_t tvu    = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_TVU ];
  fd_ip4_port_t repair = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_SERVE_REPAIR ];

  FD_BASE58_ENCODE_32_BYTES( msg->contact_info.contact_info->pubkey.uc, pubkey_b58 );
  if( FD_UNLIKELY( tvu.l!=0UL ) ){ /* socket is advertised */
    snprintf( tvu_buf, sizeof(tvu_buf),
              "%u,%u(tvu),%s,%d\n",
              tvu.addr, tvu.port, pubkey_b58, 1);
    int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, tvu_buf, strlen(tvu_buf) );
    FD_TEST( err==0 );
  }
  if( FD_UNLIKELY( repair.l!=0UL ) ){ /* socket is advertised */
    snprintf( repair_buf, sizeof(repair_buf),
              "%u,%u(repair),%s,%d\n",
              repair.addr, repair.port, pubkey_b58, 1);
    int err = fd_io_buffered_ostream_write( &ctx->peers_ostream, repair_buf, strlen(repair_buf) );
    FD_TEST( err==0 );
  }
}
328 :
329 : static int
330 0 : is_fec_completes_msg( ulong sz ) {
331 0 : return sz == FD_SHRED_DATA_HEADER_SZ + 2 * FD_SHRED_MERKLE_ROOT_SZ;
332 0 : }
333 :
/* during_frag stages or filters the incoming frag depending on link
   kind.  SHRED_OUT / NET_SHRED / REPAIR_NET payloads are copied into
   tile-local buffers and processed later in after_frag; REPAIR_SHREDCAP
   and REPLAY_OUT frags are written straight to the capture files here
   (see FIXMEs: that work belongs in after_frag).  Sets ctx->skip_frag
   when after_frag should ignore the frag. */
static inline void
during_frag( fd_capture_tile_ctx_t * ctx,
             ulong                   in_idx,
             ulong                   seq FD_PARAM_UNUSED,
             ulong                   sig,
             ulong                   chunk,
             ulong                   sz,
             ulong                   ctl ) {
  ctx->skip_frag = 0;
  if( ctx->in_kind[ in_idx ]==SHRED_OUT ) {
    /* Only FEC-complete messages are of interest on this link. */
    if( !is_fec_completes_msg( sz ) ) {
      ctx->skip_frag = 1;
      return;
    }
    fd_memcpy( ctx->shred_buffer, fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk ), sz );
    ctx->shred_buffer_sz = sz;
  } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
    uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in_links[ in_idx ].net_rx, chunk, ctl, sz );
    ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
    FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
    /* Validate that the payload parses as a shred before buffering;
       after_frag relies on this check having succeeded. */
    fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
    if( FD_UNLIKELY( !shred ) ) {
      ctx->skip_frag = 1;
      return;
    };
    /* Copy including the ip4/udp headers: the source address is logged
       in after_frag.  Note shred_buffer_sz excludes the headers. */
    fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
    ctx->shred_buffer_sz = sz-hdr_sz;
  } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
    /* Repair will have outgoing pings, outgoing repair requests, and
       outgoing served shreds we want to filter everything but the
       repair requests.
       1. We can index into the ip4 udp packet hdr and check if the src
          port is the intake listen port or serve port
       2. Then we can filter on the discriminant which luckily does not
          require decoding! */

    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
    fd_ip4_udp_hdrs_t const * hdr = (fd_ip4_udp_hdrs_t const *)dcache_entry;
    if( hdr->udp->net_sport != fd_ushort_bswap( ctx->repair_intake_listen_port ) ) {
      ctx->skip_frag = 1;
      return;
    }
    const uchar * encoded_protocol = dcache_entry + sizeof(fd_ip4_udp_hdrs_t);
    uint discriminant = FD_LOAD(uint, encoded_protocol);

    /* Discriminants at or below PONG are ping traffic, not requests. */
    if( FD_UNLIKELY( discriminant <= FD_REPAIR_KIND_PONG ) ) {
      ctx->skip_frag = 1;
      return;
    }
    fd_memcpy( ctx->repair_buffer, dcache_entry, sz );
    ctx->repair_buffer_sz = sz;
  } else if( ctx->in_kind[ in_idx ] == REPAIR_SHREDCAP ) {

    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );

    /* FIXME this should all be happening in after_frag */

    /* We expect to get all of the data shreds in a batch at once. When
       we do we will write the header, the shreds, and a trailer. */
    ulong payload_sz = sig; /* payload size is carried in the frag sig */
    fd_shredcap_slice_header_msg_t header = {
      .magic      = FD_SHREDCAP_SLICE_HEADER_MAGIC,
      .version    = FD_SHREDCAP_SLICE_HEADER_V1,
      .payload_sz = payload_sz,
    };
    int err;
    err = fd_io_buffered_ostream_write( &ctx->slices_ostream, &header, FD_SHREDCAP_SLICE_HEADER_FOOTPRINT );
    if( FD_UNLIKELY( err != 0 ) ) {
      FD_LOG_CRIT(( "failed to write slice header %d", err ));
    }
    err = fd_io_buffered_ostream_write( &ctx->slices_ostream, dcache_entry, payload_sz );
    if( FD_UNLIKELY( err != 0 ) ) {
      FD_LOG_CRIT(( "failed to write slice data %d", err ));
    }
    fd_shredcap_slice_trailer_msg_t trailer = {
      .magic   = FD_SHREDCAP_SLICE_TRAILER_MAGIC,
      .version = FD_SHREDCAP_SLICE_TRAILER_V1,
    };
    err = fd_io_buffered_ostream_write( &ctx->slices_ostream, &trailer, FD_SHREDCAP_SLICE_TRAILER_FOOTPRINT );
    if( FD_UNLIKELY( err != 0 ) ) {
      FD_LOG_CRIT(( "failed to write slice trailer %d", err ));
    }

  } else if( ctx->in_kind[ in_idx ] == REPLAY_OUT ) {
    /* Only slot-completed messages carry a bank hash.  (after_frag has
       no REPLAY_OUT branch, so not setting skip_frag here is fine.) */
    if( FD_UNLIKELY( sig!=REPLAY_SIG_SLOT_COMPLETED ) ) return;

    /* FIXME this should all be happening in after_frag */

    fd_replay_slot_completed_t const * msg = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
    fd_shredcap_bank_hash_msg_t bank_hash_msg = {
      .magic   = FD_SHREDCAP_BANK_HASH_MAGIC,
      .version = FD_SHREDCAP_BANK_HASH_V1
    };
    fd_memcpy( &bank_hash_msg.bank_hash, msg->bank_hash.uc, sizeof(fd_hash_t) );
    bank_hash_msg.slot = msg->slot;

    fd_io_buffered_ostream_write( &ctx->bank_hashes_ostream, &bank_hash_msg, FD_SHREDCAP_BANK_HASH_FOOTPRINT );

  } else {
    // contact infos can be copied into a buffer
    if( FD_UNLIKELY( chunk<ctx->in_links[ in_idx ].chunk0 || chunk>ctx->in_links[ in_idx ].wmark ) ) {
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                   ctx->in_links[ in_idx ].chunk0, ctx->in_links[ in_idx ].wmark ));
    }
    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in_links[ in_idx ].mem, chunk );
    /* NOTE(review): sz is not checked against MAX_BUFFER_SIZE before
       this copy -- presumably bounded by the link MTU; confirm. */
    fd_memcpy( ctx->contact_info_buffer, dcache_entry, sz );
  }
}
442 :
443 : static void
444 : after_credit( fd_capture_tile_ctx_t * ctx,
445 : fd_stem_context_t * stem,
446 : int * opt_poll_in FD_PARAM_UNUSED,
447 0 : int * charge_busy FD_PARAM_UNUSED ) {
448 :
449 0 : if( FD_UNLIKELY( !ctx->manifest_load_done ) ) {
450 0 : if( FD_LIKELY( !!strcmp( ctx->manifest_path, "") ) ) {
451 : /* ctx->manifest_spad will hold the processed manifest. */
452 0 : fd_spad_reset( ctx->manifest_spad );
453 : /* do not pop from ctx->manifest_spad, the manifest needs
454 : to remain available until a new manifest is processed. */
455 :
456 0 : int fd = open( ctx->manifest_path, O_RDONLY );
457 0 : if( FD_UNLIKELY( fd < 0 ) ) {
458 0 : FD_LOG_WARNING(( "open(%s) failed (%d-%s)", ctx->manifest_path, errno, fd_io_strerror( errno ) ));
459 0 : return;
460 0 : }
461 0 : FD_LOG_NOTICE(( "manifest %s.", ctx->manifest_path ));
462 :
463 0 : fd_snapshot_manifest_t * manifest = NULL;
464 0 : FD_SPAD_FRAME_BEGIN( ctx->manifest_spad ) {
465 0 : manifest = fd_spad_alloc( ctx->manifest_spad, alignof(fd_snapshot_manifest_t), sizeof(fd_snapshot_manifest_t) );
466 0 : } FD_SPAD_FRAME_END;
467 0 : FD_TEST( manifest );
468 :
469 0 : FD_SPAD_FRAME_BEGIN( ctx->shared_spad ) {
470 0 : uchar * buf = fd_spad_alloc( ctx->shared_spad, manifest_load_align(), manifest_load_footprint() );
471 0 : ulong buf_sz = 0;
472 0 : FD_TEST( !fd_io_read( fd, buf/*dst*/, 0/*dst_min*/, manifest_load_footprint()-1UL /*dst_max*/, &buf_sz ) );
473 :
474 0 : fd_ssmanifest_parser_t * parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( aligned_alloc(
475 0 : fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() ) ) );
476 0 : FD_TEST( parser );
477 0 : fd_ssmanifest_parser_init( parser, manifest );
478 0 : int parser_err = fd_ssmanifest_parser_consume( parser, buf, buf_sz, NULL, NULL );
479 0 : FD_TEST( parser_err==1 );
480 : // if( FD_UNLIKELY( parser_err ) ) FD_LOG_ERR(( "fd_ssmanifest_parser_consume failed (%d)", parser_err ));
481 0 : } FD_SPAD_FRAME_END;
482 0 : FD_LOG_NOTICE(( "manifest bank slot %lu", manifest->slot ));
483 :
484 0 : fd_fseq_update( ctx->manifest_wmark, manifest->slot );
485 :
486 0 : uchar * chunk = fd_chunk_to_laddr( ctx->snap_out->mem, ctx->snap_out->chunk );
487 0 : ulong sz = sizeof(fd_snapshot_manifest_t);
488 0 : ulong sig = fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
489 0 : memcpy( chunk, manifest, sz );
490 0 : fd_stem_publish( stem, ctx->snap_out->idx, sig, ctx->snap_out->chunk, sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
491 0 : ctx->snap_out->chunk = fd_dcache_compact_next( ctx->snap_out->chunk, sz, ctx->snap_out->chunk0, ctx->snap_out->wmark );
492 :
493 0 : fd_stem_publish( stem, ctx->snap_out->idx, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
494 :
495 0 : publish_stake_weights_manifest( ctx, stem, manifest );
496 : //*charge_busy = 0;
497 0 : }
498 : /* No need to strcmp every time after_credit is called. */
499 0 : ctx->manifest_load_done = 1;
500 0 : }
501 0 : }
502 :
/* handle_repair_request decodes a snooped outgoing repair request from
   ctx->repair_buffer and appends one csv row to the requests file:
   peer_addr,peer_port,wallclock_ns,nonce,slot,shred_index.  The peer is
   the packet destination (these are our outgoing requests).  Unknown or
   truncated requests fall through and are logged with the defaults
   slot=0, shred_index=UINT_MAX, nonce=0.
   NOTE(review): peer addr/port are logged in raw network byte order --
   confirm downstream tooling expects that. */
static void
handle_repair_request( fd_capture_tile_ctx_t * ctx ) {
  /* We have a valid repair request that we can finally decode.
     Unfortunately we actually have to decode because we cant cast
     directly to the protocol */
  fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->repair_buffer;

  uint   peer_ip4_addr = hdr->ip4->daddr;
  ushort peer_port     = hdr->udp->net_dport;
  ulong  slot          = 0UL;
  ulong  shred_index   = UINT_MAX;
  uint   nonce         = 0U;

  /* FIXME this assumes IPv4 without options, which is not always true */
  uchar const * const buf0 = ctx->repair_buffer + sizeof(fd_ip4_udp_hdrs_t);
  uchar const * const buf1 = ctx->repair_buffer + ctx->repair_buffer_sz;
  if( FD_UNLIKELY( buf0>=buf1 ) ) return; /* no payload after headers */

  uchar const * cur = buf0;
  ulong         rem = (ulong)( buf1-buf0 );

  if( FD_UNLIKELY( rem<sizeof(uint) ) ) return; /* too short for a discriminant */
  uint discriminant = FD_LOAD( uint, cur );
  cur += sizeof(uint);
  rem -= sizeof(uint);

  /* Each request kind carries (slot, shred_idx?, nonce); bounds-check
     before loading the fixed-size request struct. */
  switch( discriminant ) {
    case FD_REPAIR_KIND_SHRED: {
      if( FD_UNLIKELY( rem<sizeof(fd_repair_shred_req_t) ) ) return;
      fd_repair_shred_req_t req = FD_LOAD( fd_repair_shred_req_t, cur );
      cur += sizeof(fd_repair_shred_req_t);
      rem -= sizeof(fd_repair_shred_req_t);

      slot        = req.slot;
      shred_index = req.shred_idx;
      nonce       = req.nonce;
      break;
    }
    case FD_REPAIR_KIND_HIGHEST_SHRED: {
      if( FD_UNLIKELY( rem<sizeof(fd_repair_highest_shred_req_t) ) ) return;
      fd_repair_highest_shred_req_t req = FD_LOAD( fd_repair_highest_shred_req_t, cur );
      cur += sizeof(fd_repair_highest_shred_req_t);
      rem -= sizeof(fd_repair_highest_shred_req_t);

      slot        = req.slot;
      shred_index = req.shred_idx;
      nonce       = req.nonce;
      break;
    }
    case FD_REPAIR_KIND_ORPHAN: {
      if( FD_UNLIKELY( rem<sizeof(fd_repair_orphan_req_t) ) ) return;
      fd_repair_orphan_req_t req = FD_LOAD( fd_repair_orphan_req_t, cur );
      cur += sizeof(fd_repair_orphan_req_t);
      rem -= sizeof(fd_repair_orphan_req_t);

      slot  = req.slot;
      nonce = req.nonce;
      break;
    }
    default:
      break;
  }

  char repair_data_buf[1024];
  snprintf( repair_data_buf, sizeof(repair_data_buf),
            "%u,%u,%ld,%u,%lu,%lu\n",
            peer_ip4_addr, peer_port, fd_log_wallclock(), nonce, slot, shred_index );
  int err = fd_io_buffered_ostream_write( &ctx->repair_ostream, repair_data_buf, strlen(repair_data_buf) );
  FD_TEST( err==0 );
}
573 :
/* after_frag processes the frag staged by during_frag: FEC-complete
   rows (SHRED_OUT), snooped shred rows (NET_SHRED), repair request rows
   (REPAIR_NET), and peer contact-info rows (GOSSIP_OUT).  Frags flagged
   skip_frag by during_frag are dropped. */
static inline void
after_frag( fd_capture_tile_ctx_t * ctx,
            ulong                   in_idx,
            ulong                   seq FD_PARAM_UNUSED,
            ulong                   sig,
            ulong                   sz,
            ulong                   tsorig FD_PARAM_UNUSED,
            ulong                   tspub FD_PARAM_UNUSED,
            fd_stem_context_t *     stem FD_PARAM_UNUSED ) {
  if( FD_UNLIKELY( ctx->skip_frag ) ) return;

  if( ctx->in_kind[ in_idx ] == SHRED_OUT ) {
    /* This is a fec completes message! we can use it to check how long
       it takes to complete a fec */

    fd_shred_t const * shred = (fd_shred_t *)fd_type_pun( ctx->shred_buffer );
    uint data_cnt = fd_disco_shred_out_fec_sig_data_cnt( sig );
    uint ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
    char fec_complete[1024];
    snprintf( fec_complete, sizeof(fec_complete),
              "%ld,%lu,%u,%u,%u\n",
              fd_log_wallclock(), shred->slot, ref_tick, shred->fec_set_idx, data_cnt );

    // Last shred is guaranteed to be a data shred


    int err = fd_io_buffered_ostream_write( &ctx->fecs_ostream, fec_complete, strlen(fec_complete) );
    FD_TEST( err==0 );
  } else if( ctx->in_kind[ in_idx ] == NET_SHRED ) {
    /* TODO: leader schedule early exits in shred tile right around
       startup, which discards some turbine shreds, but there is a
       chance we capture this shred here. Currently handled in post, but
       in the future will want to get the leader schedule here so we can
       also benchmark whether the expected sender in the turbine tree
       matches the actual sender. */

    ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
    fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)ctx->shred_buffer;
    uint   src_ip4_addr = hdr->ip4->saddr;
    ushort src_port     = hdr->udp->net_sport;

    /* shred is non-NULL here: during_frag already dropped frags that
       failed fd_shred_parse. */
    fd_shred_t const * shred = fd_shred_parse( ctx->shred_buffer + hdr_sz, sz - hdr_sz );
    int  is_turbine = fd_disco_netmux_sig_proto( sig ) == DST_PROTO_SHRED;
    /* Repair responses carry a trailing nonce after the shred payload. */
    uint nonce      = is_turbine ? 0 : FD_LOAD(uint, ctx->shred_buffer + hdr_sz + fd_shred_sz( shred ) );
    int  is_data    = fd_shred_is_data( fd_shred_type( shred->variant ) );
    ulong slot      = shred->slot;
    uint  idx       = shred->idx;
    uint  fec_idx   = shred->fec_set_idx;
    uint  ref_tick  = 65; /* sentinel: ref tick only meaningful for turbine data shreds */
    if( FD_UNLIKELY( is_turbine && is_data ) ) {
      /* We can then index into the flag and get a REFTICK */
      ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
    }

    char repair_data_buf[1024];
    snprintf( repair_data_buf, sizeof(repair_data_buf),
              "%u,%u,%ld,%lu,%u,%u,%u,%d,%d,%u\n",
              src_ip4_addr, src_port, fd_log_wallclock(), slot, ref_tick, fec_idx, idx, is_turbine, is_data, nonce );

    int err = fd_io_buffered_ostream_write( &ctx->shred_ostream, repair_data_buf, strlen(repair_data_buf) );
    FD_TEST( err==0 );
  } else if( ctx->in_kind[ in_idx ] == REPAIR_NET ) {
    handle_repair_request( ctx );
  } else if( ctx->in_kind[ in_idx ] == GOSSIP_OUT ) {
    handle_new_contact_info( ctx, ctx->contact_info_buffer );
  }
}
641 :
/* populate_allowed_fds lists the fds the sandboxed tile is allowed to
   keep open: stderr, the log file, and every capture file opened in
   privileged_init.  Returns the number of entries written to out_fds. */
static ulong
populate_allowed_fds( fd_topo_t const *      topo FD_PARAM_UNUSED,
                      fd_topo_tile_t const * tile,
                      ulong                  out_fds_cnt FD_PARAM_UNUSED,
                      int *                  out_fds ) {
  ulong out_cnt = 0UL;

  out_fds[ out_cnt++ ] = 2; /* stderr */
  if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
    out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
  if( FD_LIKELY( -1!=tile->shredcap.shreds_fd ) )
    out_fds[ out_cnt++ ] = tile->shredcap.shreds_fd; /* shred file */
  if( FD_LIKELY( -1!=tile->shredcap.requests_fd ) )
    out_fds[ out_cnt++ ] = tile->shredcap.requests_fd; /* request file */
  if( FD_LIKELY( -1!=tile->shredcap.fecs_fd ) )
    out_fds[ out_cnt++ ] = tile->shredcap.fecs_fd; /* fec complete file */
  if( FD_LIKELY( -1!=tile->shredcap.peers_fd ) )
    out_fds[ out_cnt++ ] = tile->shredcap.peers_fd; /* peers file */
  if( FD_LIKELY( -1!=tile->shredcap.slices_fd ) )
    out_fds[ out_cnt++ ] = tile->shredcap.slices_fd; /* slices file */
  if( FD_LIKELY( -1!=tile->shredcap.bank_hashes_fd ) )
    out_fds[ out_cnt++ ] = tile->shredcap.bank_hashes_fd; /* bank hashes file */

  return out_cnt;
}
667 :
668 : static void
669 : privileged_init( fd_topo_t * topo FD_PARAM_UNUSED,
670 0 : fd_topo_tile_t * tile ) {
671 0 : char file_path[PATH_MAX];
672 0 : strcpy( file_path, tile->shredcap.folder_path );
673 0 : strcat( file_path, "/shred_data.csv" );
674 0 : tile->shredcap.shreds_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
675 0 : if ( FD_UNLIKELY( tile->shredcap.shreds_fd == -1 ) ) {
676 0 : FD_LOG_ERR(( "failed to open or create shred csv dump file %s %d %s", file_path, errno, strerror(errno) ));
677 0 : }
678 :
679 0 : strcpy( file_path, tile->shredcap.folder_path );
680 0 : strcat( file_path, "/request_data.csv" );
681 0 : tile->shredcap.requests_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
682 0 : if ( FD_UNLIKELY( tile->shredcap.requests_fd == -1 ) ) {
683 0 : FD_LOG_ERR(( "failed to open or create request csv dump file %s %d %s", file_path, errno, strerror(errno) ));
684 0 : }
685 :
686 0 : strcpy( file_path, tile->shredcap.folder_path );
687 0 : strcat( file_path, "/fec_complete.csv" );
688 0 : tile->shredcap.fecs_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
689 0 : if ( FD_UNLIKELY( tile->shredcap.fecs_fd == -1 ) ) {
690 0 : FD_LOG_ERR(( "failed to open or create fec complete csv dump file %s %d %s", file_path, errno, strerror(errno) ));
691 0 : }
692 0 : FD_LOG_NOTICE(( "Opening shred csv dump file at %s", file_path ));
693 :
694 0 : strcpy( file_path, tile->shredcap.folder_path );
695 0 : strcat( file_path, "/peers.csv" );
696 0 : tile->shredcap.peers_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
697 0 : if ( FD_UNLIKELY( tile->shredcap.peers_fd == -1 ) ) {
698 0 : FD_LOG_ERR(( "failed to open or create peers csv dump file %s %d %s", file_path, errno, strerror(errno) ));
699 0 : }
700 :
701 0 : strcpy( file_path, tile->shredcap.folder_path );
702 0 : strcat( file_path, "/slices.bin" );
703 0 : tile->shredcap.slices_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
704 0 : if ( FD_UNLIKELY( tile->shredcap.slices_fd == -1 ) ) {
705 0 : FD_LOG_ERR(( "failed to open or create slices csv dump file %s %d %s", file_path, errno, strerror(errno) ));
706 0 : }
707 0 : FD_LOG_NOTICE(( "Opening val_shreds binary dump file at %s", file_path ));
708 :
709 0 : strcpy( file_path, tile->shredcap.folder_path );
710 0 : strcat( file_path, "/bank_hashes.bin" );
711 0 : tile->shredcap.bank_hashes_fd = open( file_path, O_WRONLY|O_CREAT|O_APPEND /*| O_DIRECT*/, 0644 );
712 0 : if ( FD_UNLIKELY( tile->shredcap.bank_hashes_fd == -1 ) ) {
713 0 : FD_LOG_ERR(( "failed to open or create bank_hashes csv dump file %s %d %s", file_path, errno, strerror(errno) ));
714 0 : }
715 0 : FD_LOG_NOTICE(( "Opening bank_hashes binary dump file at %s", file_path ));
716 0 : }
717 :
/* init_file_handlers resets one capture file (truncate + rewind),
   allocates its write buffer from the tile allocator, and wraps both in
   a buffered ostream.  Aborts the tile on any failure.
   ctx_file/ctx_buf/ctx_ostream are out parameters stored in ctx. */
static void
init_file_handlers( fd_capture_tile_ctx_t *    ctx,
                    int *                      ctx_file,
                    int                        tile_file,
                    uchar **                   ctx_buf,
                    fd_io_buffered_ostream_t * ctx_ostream ) {
  *ctx_file = tile_file;

  /* Files are opened O_APPEND in privileged_init; truncate and rewind
     so each run starts with a fresh capture. */
  int err = ftruncate( *ctx_file, 0UL );
  if( FD_UNLIKELY( err ) ) {
    FD_LOG_ERR(( "failed to truncate file (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  long seek = lseek( *ctx_file, 0UL, SEEK_SET );
  if( FD_UNLIKELY( seek!=0L ) ) {
    FD_LOG_ERR(( "failed to seek to the beginning of file" ));
  }

  /* 4096-byte alignment matches the filesystem block size (see
     FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ). */
  *ctx_buf = fd_alloc_malloc( ctx->alloc, 4096, ctx->write_buf_sz );
  if( FD_UNLIKELY( *ctx_buf == NULL ) ) {
    FD_LOG_ERR(( "failed to allocate ostream buffer" ));
  }

  if( FD_UNLIKELY( !fd_io_buffered_ostream_init(
      ctx_ostream,
      *ctx_file,
      *ctx_buf,
      ctx->write_buf_sz ) ) ) {
    FD_LOG_ERR(( "failed to initialize ostream" ));
  }
}
748 :
749 :
750 : static void
751 : unprivileged_init( fd_topo_t * topo,
752 0 : fd_topo_tile_t * tile ) {
753 :
754 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
755 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
756 0 : fd_capture_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_capture_tile_ctx_t), sizeof(fd_capture_tile_ctx_t) );
757 0 : void * manifest_bank_mem = FD_SCRATCH_ALLOC_APPEND( l, manifest_bank_align(), manifest_bank_footprint() );
758 0 : void * manifest_spad_mem = FD_SCRATCH_ALLOC_APPEND( l, manifest_spad_max_alloc_align(), fd_spad_footprint( manifest_spad_max_alloc_footprint() ) );
759 0 : void * shared_spad_mem = FD_SCRATCH_ALLOC_APPEND( l, shared_spad_max_alloc_align(), fd_spad_footprint( shared_spad_max_alloc_footprint() ) );
760 0 : void * alloc_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
761 0 : FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
762 :
763 : /* Input links */
764 0 : for( ulong i=0; i<tile->in_cnt; i++ ) {
765 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
766 0 : fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
767 0 : if( 0==strcmp( link->name, "net_shred" ) ) {
768 0 : ctx->in_kind[ i ] = NET_SHRED;
769 0 : fd_net_rx_bounds_init( &ctx->in_links[ i ].net_rx, link->dcache );
770 0 : continue;
771 0 : } else if( 0==strcmp( link->name, "repair_net" ) ) {
772 0 : ctx->in_kind[ i ] = REPAIR_NET;
773 0 : } else if( 0==strcmp( link->name, "shred_out" ) ) {
774 0 : ctx->in_kind[ i ] = SHRED_OUT;
775 0 : } else if( 0==strcmp( link->name, "gossip_out" ) ) {
776 0 : ctx->in_kind[ i ] = GOSSIP_OUT;
777 0 : } else if( 0==strcmp( link->name, "repair_scap" ) ) {
778 0 : ctx->in_kind[ i ] = REPAIR_SHREDCAP;
779 0 : } else if( 0==strcmp( link->name, "replay_out" ) ) {
780 0 : ctx->in_kind[ i ] = REPLAY_OUT;
781 0 : } else {
782 0 : FD_LOG_ERR(( "scap tile has unexpected input link %s", link->name ));
783 0 : }
784 :
785 0 : ctx->in_links[ i ].mem = link_wksp->wksp;
786 0 : ctx->in_links[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ i ].mem, link->dcache );
787 0 : ctx->in_links[ i ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ i ].mem, link->dcache, link->mtu );
788 0 : }
789 :
790 0 : ctx->repair_intake_listen_port = tile->shredcap.repair_intake_listen_port;
791 0 : ctx->write_buf_sz = tile->shredcap.write_buffer_size ? tile->shredcap.write_buffer_size : FD_SHREDCAP_DEFAULT_WRITER_BUF_SZ;
792 :
793 : /* Set up stake weights tile output */
794 0 : ctx->stake_out->idx = fd_topo_find_tile_out_link( topo, tile, "replay_stake", 0 );
795 0 : if( FD_LIKELY( ctx->stake_out->idx!=ULONG_MAX ) ) {
796 0 : fd_topo_link_t * stake_weights_out = &topo->links[ tile->out_link_id[ ctx->stake_out->idx] ];
797 0 : ctx->stake_out->mcache = stake_weights_out->mcache;
798 0 : ctx->stake_out->mem = topo->workspaces[ topo->objs[ stake_weights_out->dcache_obj_id ].wksp_id ].wksp;
799 0 : ctx->stake_out->sync = fd_mcache_seq_laddr ( ctx->stake_out->mcache );
800 0 : ctx->stake_out->depth = fd_mcache_depth ( ctx->stake_out->mcache );
801 0 : ctx->stake_out->seq = fd_mcache_seq_query ( ctx->stake_out->sync );
802 0 : ctx->stake_out->chunk0 = fd_dcache_compact_chunk0( ctx->stake_out->mem, stake_weights_out->dcache );
803 0 : ctx->stake_out->wmark = fd_dcache_compact_wmark ( ctx->stake_out->mem, stake_weights_out->dcache, stake_weights_out->mtu );
804 0 : ctx->stake_out->chunk = ctx->stake_out->chunk0;
805 0 : } else {
806 0 : FD_LOG_WARNING(( "no connection to stake_out link" ));
807 0 : memset( ctx->stake_out, 0, sizeof(out_link_t) );
808 0 : }
809 :
810 0 : ctx->snap_out->idx = fd_topo_find_tile_out_link( topo, tile, "snapin_manif", 0 );
811 0 : if( FD_LIKELY( ctx->snap_out->idx!=ULONG_MAX ) ) {
812 0 : fd_topo_link_t * snap_out = &topo->links[tile->out_link_id[ctx->snap_out->idx]];
813 0 : ctx->snap_out->mem = topo->workspaces[topo->objs[snap_out->dcache_obj_id].wksp_id].wksp;
814 0 : ctx->snap_out->chunk0 = fd_dcache_compact_chunk0( ctx->snap_out->mem, snap_out->dcache );
815 0 : ctx->snap_out->wmark = fd_dcache_compact_wmark( ctx->snap_out->mem, snap_out->dcache, snap_out->mtu );
816 0 : ctx->snap_out->chunk = ctx->snap_out->chunk0;
817 0 : } else {
818 0 : FD_LOG_WARNING(( "no connection to snap_out link" ));
819 0 : memset( ctx->snap_out, 0, sizeof(out_link_t) );
820 0 : }
821 :
822 : /* If the manifest is enabled (for processing), the stake_out link
823 : must be connected to the tile. TODO in principle, it should be
824 : possible to gate the remaining of the manifest-related config. */
825 0 : ctx->enable_publish_stake_weights = tile->shredcap.enable_publish_stake_weights;
826 0 : FD_LOG_NOTICE(( "enable_publish_stake_weights ? %d", ctx->enable_publish_stake_weights ));
827 :
828 : /* manifest_wmark (root slot) */
829 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
830 0 : if( FD_LIKELY( root_slot_obj_id!=ULONG_MAX ) ) { /* for profiler */
831 0 : ctx->manifest_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
832 0 : if( FD_UNLIKELY( !ctx->manifest_wmark ) ) FD_LOG_ERR(( "no root_slot fseq" ));
833 0 : FD_TEST( ULONG_MAX==fd_fseq_query( ctx->manifest_wmark ) );
834 0 : }
835 :
836 0 : ctx->manifest_bank_mem = manifest_bank_mem;
837 :
838 : // TODO: ???? Why is this calling fd_banks_new ... does not seem right
839 0 : ctx->banks = fd_banks_join( fd_banks_new( ctx->manifest_bank_mem, MANIFEST_MAX_TOTAL_BANKS, MANIFEST_MAX_FORK_WIDTH, 0 /* TODO? */, 8888UL /* TODO? */ ) );
840 0 : FD_TEST( ctx->banks );
841 0 : ctx->bank = fd_banks_init_bank( ctx->banks );
842 0 : fd_bank_slot_set( ctx->bank, 0UL );
843 0 : FD_TEST( ctx->bank );
844 :
845 0 : strncpy( ctx->manifest_path, tile->shredcap.manifest_path, PATH_MAX );
846 0 : ctx->manifest_load_done = 0;
847 0 : ctx->manifest_spad_mem = manifest_spad_mem;
848 0 : ctx->manifest_spad = fd_spad_join( fd_spad_new( ctx->manifest_spad_mem, manifest_spad_max_alloc_footprint() ) );
849 0 : ctx->shared_spad_mem = shared_spad_mem;
850 0 : ctx->shared_spad = fd_spad_join( fd_spad_new( ctx->shared_spad_mem, shared_spad_max_alloc_footprint() ) );
851 :
852 : /* Allocate the write buffers */
853 0 : ctx->alloc = fd_alloc_join( fd_alloc_new( alloc_mem, FD_SHREDCAP_ALLOC_TAG ), fd_tile_idx() );
854 0 : if( FD_UNLIKELY( !ctx->alloc ) ) {
855 0 : FD_LOG_ERR( ( "fd_alloc_join failed" ) );
856 0 : }
857 :
858 : /* Setup the csv files to be in the expected state */
859 :
860 0 : init_file_handlers( ctx, &ctx->shreds_fd, tile->shredcap.shreds_fd, &ctx->shreds_buf, &ctx->shred_ostream );
861 0 : init_file_handlers( ctx, &ctx->requests_fd, tile->shredcap.requests_fd, &ctx->requests_buf, &ctx->repair_ostream );
862 0 : init_file_handlers( ctx, &ctx->fecs_fd, tile->shredcap.fecs_fd, &ctx->fecs_buf, &ctx->fecs_ostream );
863 0 : init_file_handlers( ctx, &ctx->peers_fd, tile->shredcap.peers_fd, &ctx->peers_buf, &ctx->peers_ostream );
864 :
865 0 : int err = fd_io_buffered_ostream_write( &ctx->shred_ostream, "src_ip,src_port,timestamp,slot,ref_tick,fec_set_idx,idx,is_turbine,is_data,nonce\n", 81UL );
866 0 : err |= fd_io_buffered_ostream_write( &ctx->repair_ostream, "dst_ip,dst_port,timestamp,nonce,slot,idx\n", 41UL );
867 0 : err |= fd_io_buffered_ostream_write( &ctx->fecs_ostream, "timestamp,slot,ref_tick,fec_set_idx,data_cnt\n", 45UL );
868 0 : err |= fd_io_buffered_ostream_write( &ctx->peers_ostream, "peer_ip4_addr,peer_port,pubkey,turbine\n", 48UL );
869 :
870 0 : if( FD_UNLIKELY( err ) ) {
871 0 : FD_LOG_ERR(( "failed to write header to any of the 4 csv files (%i-%s)", errno, fd_io_strerror( errno ) ));
872 0 : }
873 :
874 : /* Setup the binary files to be in the expected state. These files are
875 : not csv, so we don't need headers. */
876 0 : init_file_handlers( ctx, &ctx->slices_fd, tile->shredcap.slices_fd, &ctx->slices_buf, &ctx->slices_ostream );
877 0 : init_file_handlers( ctx, &ctx->bank_hashes_fd, tile->shredcap.bank_hashes_fd, &ctx->bank_hashes_buf, &ctx->bank_hashes_ostream );
878 0 : }
879 :
/* Stem configuration: BURST bounds how many frags this tile may
   publish per credit loop; LAZY tunes housekeeping frequency. */
#define STEM_BURST (1UL)
#define STEM_LAZY (50UL)

/* The stem template operates on this tile's context type. */
#define STEM_CALLBACK_CONTEXT_TYPE fd_capture_tile_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_capture_tile_ctx_t)

/* Hook the tile's frag-processing callbacks into the stem run loop. */
#define STEM_CALLBACK_AFTER_CREDIT after_credit
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#define STEM_CALLBACK_BEFORE_FRAG before_frag

/* Instantiates stem_run() with the macros above. */
#include "../../disco/stem/fd_stem.c"
892 :
/* Topology descriptor for the shredcap ("scap") tile: wires the tile's
   lifecycle callbacks (footprint sizing, seccomp/fd sandboxing, init
   phases) into the topo runner, with the stem-generated run loop. */
fd_topo_run_tile_t fd_tile_shredcap = {
  .name                     = "scap",
  .loose_footprint          = loose_footprint,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};
|