Line data Source code
1 : #define _GNU_SOURCE /* Enable GNU and POSIX extensions */
2 :
3 : #include "../tiles.h"
4 :
5 : #include "fd_archiver.h"
6 : #include <errno.h>
7 : #include <fcntl.h>
8 : #include <string.h>
9 : #include <sys/mman.h>
10 : #include <sys/stat.h>
11 : #include <unistd.h>
12 : #include <linux/unistd.h>
13 : #include <sys/socket.h>
14 : #include <linux/if_xdp.h>
15 : #include "generated/archiver_playback_seccomp.h"
16 : #include "../../util/pod/fd_pod_format.h"
/* The archiver playback tile reads fragments from the archive file, delays each one
   to approximate the inter-fragment timing recorded in the capture, and forwards the
   fragments to the receiver tiles (shred/quic/gossip/repair).  Currently only shred
   and repair fragments are supported; any other tile id in the archive aborts playback.

   There should be a single archiver playback tile, and it should replace the input
   links of the receiver tiles.
*/
24 :
/* Indices into the tile's output links (ctx->out[] and the stem
   publish index) used for each supported source tile. */
#define NET_SHRED_OUT_IDX  (0UL)
#define NET_REPAIR_OUT_IDX (1UL)

/* Tag handed to fd_alloc_new for the tile-private allocator. */
#define FD_ARCHIVER_PLAYBACK_ALLOC_TAG (3UL)

/* Size in bytes of the buffered read window over the archive file. */
#define FD_ARCHIVE_PLAYBACK_BUFFER_SZ (FD_SHMEM_GIGANTIC_PAGE_SZ)

/* NOTE(review): not referenced by the code in this file. */
#define FD_ARCHIVER_STARTUP_DELAY_SECONDS (1)
32 :
/* Per-output-link publish counters for the playback tile.
   NOTE(review): none of these fields is written by the code in this
   file; the per-source frag totals that are actually reported live in
   fd_archiver_playback_tile_ctx_t.playback_cnt. */
struct fd_archiver_playback_stats {
  ulong net_shred_out_cnt;
  ulong net_quic_out_cnt;
  ulong net_gossip_out_cnt;
  ulong net_repair_out_cnt;

};
typedef struct fd_archiver_playback_stats fd_archiver_playback_stats_t;
41 :
/* Producer-side state for one output link: where the link's dcache
   lives and the chunk cursor advanced with fd_dcache_compact_next. */
typedef struct {
  fd_wksp_t * mem;  /* workspace containing the link's dcache (used by fd_chunk_to_laddr) */
  ulong mtu;        /* max payload size accepted by the link */
  ulong chunk0;     /* first chunk of the dcache (fd_dcache_compact_chunk0) */
  ulong wmark;      /* wraparound watermark chunk (fd_dcache_compact_wmark) */
  ulong chunk;      /* chunk the next published payload is written to */
} fd_archiver_playback_out_ctx_t;
49 :
/* Tile-private state.  It sits at the start of the tile's scratch
   region and is zeroed by privileged_init before unprivileged_init
   fills it in. */
struct fd_archiver_playback_tile_ctx {
  fd_io_buffered_istream_t istream;  /* buffered reader over the archive fd */
  uchar * istream_buf;               /* backing buffer, FD_ARCHIVE_PLAYBACK_BUFFER_SZ bytes from valloc */

  fd_archiver_playback_stats_t stats; /* NOTE(review): not updated anywhere in this file */

  double tick_per_ns;                /* from fd_tempo_tick_per_ns; converts fd_tickcount() to ns */

  ulong prev_publish_time;           /* ctx->now at the last publish; 0 means nothing published yet */
  ulong now;                         /* ns timestamp, refreshed only in during_housekeeping */
  ulong need_notify;                 /* if set, wait for an input frag after each publish */
  ulong notified;                    /* set by after_frag, cleared when a frag is published */

  fd_archiver_playback_out_ctx_t out[ 32 ]; /* one entry per tile output link, indexed by NET_*_OUT_IDX */

  fd_alloc_t * alloc;                /* fd_alloc heap whose header lives in the scratch region */
  fd_valloc_t valloc;                /* virtual-allocator wrapper over alloc */

  ulong playback_done;               /* set once the archive is exhausted or malformed */
  ulong done_time;                   /* ctx->now when playback_done was set */
  ulong playback_started;            /* set once published_wmark leaves ULONG_MAX */
  ulong playback_cnt[FD_ARCHIVER_TILE_CNT]; /* frags played back, indexed by FD_ARCHIVER_TILE_ID_* */

  ulong * published_wmark; /* same as the one in replay tile: the "root_slot" fseq, ULONG_MAX until replay publishes */
};
typedef struct fd_archiver_playback_tile_ctx fd_archiver_playback_tile_ctx_t;
76 :
77 : FD_FN_CONST static inline ulong
78 0 : scratch_align( void ) {
79 0 : return 4096UL;
80 0 : }
81 :
82 : FD_FN_PURE static inline ulong
83 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
84 0 : return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
85 0 : }
86 :
87 : static ulong
88 : populate_allowed_seccomp( fd_topo_t const * topo,
89 : fd_topo_tile_t const * tile,
90 : ulong out_cnt,
91 0 : struct sock_filter * out ) {
92 0 : (void)topo;
93 :
94 0 : populate_sock_filter_policy_archiver_playback( out_cnt,
95 0 : out,
96 0 : (uint)fd_log_private_logfile_fd(),
97 0 : (uint)tile->archiver.archive_fd );
98 0 : return sock_filter_policy_archiver_playback_instr_cnt;
99 0 : }
100 :
101 : static ulong
102 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
103 : fd_topo_tile_t const * tile,
104 : ulong out_fds_cnt FD_PARAM_UNUSED,
105 0 : int * out_fds ) {
106 0 : ulong out_cnt = 0UL;
107 :
108 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
109 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
110 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
111 0 : if( FD_LIKELY( -1!=tile->archiver.archive_fd ) )
112 0 : out_fds[ out_cnt++ ] = tile->archiver.archive_fd; /* archive file */
113 :
114 0 : return out_cnt;
115 0 : }
116 :
117 : FD_FN_PURE static inline ulong
118 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
119 0 : (void)tile;
120 0 : ulong l = FD_LAYOUT_INIT;
121 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
122 0 : return FD_LAYOUT_FINI( l, scratch_align() );
123 0 : }
124 :
125 : static void
126 : privileged_init( fd_topo_t * topo,
127 0 : fd_topo_tile_t * tile ) {
128 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
129 :
130 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
131 0 : fd_archiver_playback_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
132 0 : memset( ctx, 0, sizeof(fd_archiver_playback_tile_ctx_t) );
133 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
134 0 : FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
135 :
136 0 : tile->archiver.archive_fd = open( tile->archiver.archiver_path, O_RDONLY | O_DIRECT, 0666 );
137 0 : if ( FD_UNLIKELY( tile->archiver.archive_fd == -1 ) ) {
138 0 : FD_LOG_ERR(( "failed to open archive file %s %d %d %s", tile->archiver.archiver_path, tile->archiver.archive_fd, errno, strerror(errno) ));
139 0 : }
140 0 : }
141 :
142 : static void
143 : unprivileged_init( fd_topo_t * topo,
144 0 : fd_topo_tile_t * tile ) {
145 :
146 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
147 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
148 0 : fd_archiver_playback_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_archiver_playback_tile_ctx_t), sizeof(fd_archiver_playback_tile_ctx_t) );
149 0 : void * alloc_shmem = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
150 0 : FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
151 :
152 0 : ctx->tick_per_ns = fd_tempo_tick_per_ns( NULL );
153 :
154 : /* Allocator */
155 0 : ctx->alloc = fd_alloc_join( fd_alloc_new( alloc_shmem, FD_ARCHIVER_PLAYBACK_ALLOC_TAG ), fd_tile_idx() );
156 0 : if( FD_UNLIKELY( !ctx->alloc ) ) {
157 0 : FD_LOG_ERR( ( "fd_alloc_join failed" ) );
158 0 : }
159 0 : ctx->valloc = fd_alloc_virtual( ctx->alloc );
160 :
161 : /* Allocate output buffer */
162 0 : ctx->istream_buf = fd_valloc_malloc( ctx->valloc, 4096, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
163 0 : if( FD_UNLIKELY( !ctx->istream_buf ) ) {
164 0 : FD_LOG_ERR(( "failed to allocate input buffer" ));
165 0 : }
166 :
167 : /* initialize the file reader */
168 0 : fd_io_buffered_istream_init( &ctx->istream, tile->archiver.archive_fd, ctx->istream_buf, FD_ARCHIVE_PLAYBACK_BUFFER_SZ );
169 :
170 : /* perform the initial read */
171 0 : if( FD_UNLIKELY(( !fd_io_buffered_istream_fetch( &ctx->istream ) )) ) {
172 0 : FD_LOG_WARNING(( "failed initial read" ));
173 0 : }
174 :
175 : /* Setup output links */
176 0 : for( ulong i=0; i<tile->out_cnt; i++ ) {
177 0 : fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
178 0 : fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
179 :
180 0 : ctx->out[ i ].mtu = link->mtu;
181 0 : ctx->out[ i ].mem = link_wksp->wksp;
182 0 : ctx->out[ i ].chunk0 = fd_dcache_compact_chunk0( link_wksp->wksp, link->dcache );
183 0 : ctx->out[ i ].wmark = fd_dcache_compact_wmark( link_wksp->wksp, link->dcache, link->mtu );
184 0 : ctx->out[ i ].chunk = ctx->out[ i ].chunk0;
185 0 : }
186 :
187 0 : ctx->playback_done = 0;
188 0 : ctx->playback_started = 0;
189 0 : ctx->now = 0;
190 0 : ctx->prev_publish_time = 0;
191 : /* for now, we require a notification before playback another frag */
192 0 : FD_TEST( tile->in_cnt==1 );
193 0 : ctx->need_notify = 1;
194 0 : ctx->notified = 1;
195 0 : ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED] = 0;
196 0 : ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR] = 0;
197 :
198 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
199 0 : FD_TEST( root_slot_obj_id!=ULONG_MAX );
200 0 : ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
201 0 : if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
202 0 : FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) );
203 :
204 0 : FD_LOG_WARNING(( "Playback tile finishes initialization" ));
205 0 : }
206 :
207 : static void
208 0 : during_housekeeping( fd_archiver_playback_tile_ctx_t * ctx ) {
209 0 : ctx->now =(ulong)((double)(fd_tickcount()) / ctx->tick_per_ns);
210 0 : }
211 :
212 : static void
213 : after_frag( fd_archiver_playback_tile_ctx_t * ctx,
214 : ulong in_idx,
215 : ulong seq FD_PARAM_UNUSED,
216 : ulong sig FD_PARAM_UNUSED,
217 : ulong sz FD_PARAM_UNUSED,
218 : ulong tsorig FD_PARAM_UNUSED,
219 : ulong tspub FD_PARAM_UNUSED,
220 0 : fd_stem_context_t * stem FD_PARAM_UNUSED ) {
221 0 : if( FD_UNLIKELY( in_idx!=0 ) ) FD_LOG_ERR(( "Playback seems corrupted." ));
222 0 : ctx->notified = 1;
223 0 : }
224 :
225 : static inline void
226 : after_credit( fd_archiver_playback_tile_ctx_t * ctx,
227 : fd_stem_context_t * stem,
228 : int * opt_poll_in FD_PARAM_UNUSED,
229 0 : int * charge_busy FD_PARAM_UNUSED ) {
230 0 : if( FD_UNLIKELY( ctx->playback_done ) ) {
231 0 : if( ctx->now>ctx->done_time+1000000000UL*5UL ) {
232 0 : FD_LOG_ERR(( "Playback is done with %lu shred frags and %lu repair frags.",
233 0 : ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED],
234 0 : ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR] ));
235 0 : }
236 0 : return;
237 0 : }
238 :
239 0 : if( FD_UNLIKELY( !ctx->playback_started ) ) {
240 0 : ulong wmark = fd_fseq_query( ctx->published_wmark );
241 0 : if( wmark!=ULONG_MAX ) {
242 : /* Replay tile has updated root_slot (aka. published_wmark), meaning
243 : * (1) snapshot has been loaded; (2) blockstore has been initialized */
244 0 : ctx->playback_started = 1;
245 0 : FD_LOG_WARNING(( "playback starts with wmark=%lu", wmark ));
246 0 : } else {
247 0 : return;
248 0 : }
249 0 : }
250 :
251 : /* Peek the header without consuming anything, to see if we need to wait */
252 0 : char const * peek = fd_io_buffered_istream_peek( &ctx->istream );
253 0 : if( FD_UNLIKELY(( !peek )) ) {
254 0 : FD_LOG_ERR(( "failed to peek" ));
255 0 : }
256 :
257 : /* Consume the header */
258 0 : fd_archiver_frag_header_t * header = fd_type_pun( (char *)peek );
259 0 : if( FD_UNLIKELY( header->magic != FD_ARCHIVER_HEADER_MAGIC ) ) {
260 0 : FD_LOG_WARNING(( "bad magic in archive header: %lu", header->magic ));
261 0 : ctx->playback_done = 1;
262 0 : ctx->done_time = ctx->now;
263 0 : return;
264 0 : }
265 :
266 : /* Determine if we should wait before publishing this
267 : need to delay if now > (when we should publish it) */
268 0 : if( ctx->prev_publish_time != 0UL &&
269 0 : ( ctx->now < ( ctx->prev_publish_time + header->ns_since_prev_fragment ) )) {
270 0 : return;
271 0 : }
272 :
273 : /* Determine if playback receives the notification for
274 : the previous frag from storei tile. */
275 0 : if( FD_LIKELY( ctx->need_notify && !ctx->notified ) ) return;
276 :
277 : /* Consume the header from the stream */
278 0 : fd_archiver_frag_header_t header_tmp;
279 0 : if( FD_UNLIKELY( fd_io_buffered_istream_read( &ctx->istream, &header_tmp, FD_ARCHIVER_FRAG_HEADER_FOOTPRINT ) )) {
280 0 : FD_LOG_WARNING(( "failed to consume header" ));
281 0 : ctx->playback_done = 1;
282 0 : ctx->done_time = ctx->now;
283 0 : return;
284 0 : }
285 :
286 : /* Determine the output link on which to send the frag */
287 0 : ulong out_link_idx = 0UL;
288 0 : switch ( header_tmp.tile_id ) {
289 0 : case FD_ARCHIVER_TILE_ID_SHRED:
290 0 : out_link_idx = NET_SHRED_OUT_IDX;
291 0 : ctx->playback_cnt[FD_ARCHIVER_TILE_ID_SHRED]++;
292 0 : break;
293 0 : case FD_ARCHIVER_TILE_ID_REPAIR:
294 0 : out_link_idx = NET_REPAIR_OUT_IDX;
295 0 : ctx->playback_cnt[FD_ARCHIVER_TILE_ID_REPAIR]++;
296 0 : break;
297 0 : default:
298 0 : FD_LOG_ERR(( "unsupported tile id" ));
299 0 : }
300 :
301 : /* Consume the fragment from the stream */
302 0 : uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->out[ out_link_idx ].mem, ctx->out[ out_link_idx ].chunk );
303 0 : if( FD_UNLIKELY( fd_io_buffered_istream_read( &ctx->istream, dst, header_tmp.sz ) ) ) {
304 0 : FD_LOG_WARNING(( "failed to consume frag" ));
305 0 : ctx->playback_done = 1;
306 0 : ctx->done_time = ctx->now;
307 0 : return;
308 0 : }
309 :
310 0 : if( FD_LIKELY( ctx->need_notify ) ) ctx->notified=0;
311 0 : if( FD_UNLIKELY(( ctx->out[ out_link_idx ].mtu<header_tmp.sz )) ) {
312 0 : FD_LOG_ERR(( "Try to playback frag with sz=%lu, exceeding mtu=%lu for link%lu",
313 0 : header_tmp.sz, ctx->out[ out_link_idx ].mtu, out_link_idx ));
314 0 : }
315 0 : fd_stem_publish( stem, out_link_idx, header_tmp.sig, ctx->out[ out_link_idx ].chunk, header_tmp.sz, 0UL, 0UL, 0UL);
316 0 : ctx->out[ out_link_idx ].chunk = fd_dcache_compact_next( ctx->out[ out_link_idx ].chunk,
317 0 : header_tmp.sz,
318 0 : ctx->out[ out_link_idx ].chunk0,
319 0 : ctx->out[ out_link_idx ].wmark );
320 0 : ctx->prev_publish_time = ctx->now;
321 0 : }
322 :
/* Stem template configuration (consumed by the fd_stem.c include that
   follows): the callback context type and its alignment ... */
#define STEM_CALLBACK_CONTEXT_TYPE  fd_archiver_playback_tile_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_archiver_playback_tile_ctx_t)

/* ... the callbacks this tile implements ... */
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_AFTER_CREDIT        after_credit
#define STEM_CALLBACK_AFTER_FRAG          after_frag

/* ... and the burst / lazy parameters.  after_credit publishes at most
   one frag per call. */
#define STEM_BURST (1UL)
#define STEM_LAZY  (50UL)
332 :
333 : #include "../stem/fd_stem.c"
334 :
335 : fd_topo_run_tile_t fd_tile_archiver_playback = {
336 : .name = "arch_p",
337 : .loose_footprint = loose_footprint,
338 : .populate_allowed_seccomp = populate_allowed_seccomp,
339 : .populate_allowed_fds = populate_allowed_fds,
340 : .scratch_align = scratch_align,
341 : .scratch_footprint = scratch_footprint,
342 : .privileged_init = privileged_init,
343 : .unprivileged_init = unprivileged_init,
344 : .run = stem_run,
345 : };
|