Line data Source code
/* The snapwh tile updates the vinyl_bstream_block integrity hashes of
   the blocks flowing through it.  It assumes that:
   - vinyl records are not fragmented across buffers
   - vinyl records have trailing zeros (in particular, the footer's
     hash fields) */
6 :
7 : #include "utils/fd_ssctrl.h"
8 : #include "../../disco/topo/fd_topo.h"
9 : #include "../../disco/metrics/fd_metrics.h"
10 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
11 : #include "generated/fd_snapwh_tile_seccomp.h"
12 :
13 : #define NAME "snapwh"
14 :
15 : struct fd_snapwh {
16 : /* Run loop */
17 : uint state;
18 : uint idle_cnt;
19 :
20 : /* Database params */
21 : ulong const * io_seed;
22 :
23 : /* RX link */
24 : void * base;
25 :
26 : /* ACKs / flow control */
27 : ulong * up_fseq;
28 : ulong const * wr_fseq;
29 : ulong last_fseq;
30 : ulong next_seq;
31 :
32 : /* Scratch variables */
33 : ulong meta_chunk;
34 : ulong meta_ctl;
35 : };
36 :
37 : typedef struct fd_snapwh fd_snapwh_t;
38 :
39 : static ulong
40 0 : scratch_align( void ) {
41 0 : return alignof(fd_snapwh_t);
42 0 : }
43 :
44 : static ulong
45 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
46 0 : (void)tile;
47 0 : return sizeof(fd_snapwh_t);
48 0 : }
49 :
50 : static void
51 : unprivileged_init( fd_topo_t * topo,
52 0 : fd_topo_tile_t * tile ) {
53 0 : fd_snapwh_t * snapwh = fd_topo_obj_laddr( topo, tile->tile_obj_id );
54 0 : memset( snapwh, 0, sizeof(fd_snapwh_t) );
55 :
56 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
57 0 : if( FD_UNLIKELY( tile->in_cnt !=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
58 0 : if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
59 :
60 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0 ] ];
61 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ 0 ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link 0 must be reliable" ));
62 0 : ulong * fseq = tile->in_link_fseq[ 0 ];
63 0 : snapwh->base = in_link->dcache;
64 0 : snapwh->up_fseq = &fseq[ 0 ];
65 :
66 0 : FD_CRIT( fd_dcache_app_sz( in_link->dcache )>=sizeof(ulong), "in_link dcache app region too small to hold io_seed" );
67 0 : snapwh->io_seed = (ulong const *)fd_dcache_app_laddr_const( in_link->dcache );
68 :
69 0 : fd_topo_link_t const * out_link = &topo->links[ tile->out_link_id[ 0 ] ];
70 0 : FD_TEST( fd_topo_link_reliable_consumer_cnt( topo, out_link )==1UL );
71 0 : for( ulong tile_idx=0UL; tile_idx<topo->tile_cnt; tile_idx++ ) {
72 0 : fd_topo_tile_t const * consumer_tile = &topo->tiles[ tile_idx ];
73 0 : for( ulong in_idx=0UL; in_idx<consumer_tile->in_cnt; in_idx++ ) {
74 0 : if( consumer_tile->in_link_id[ in_idx ]==out_link->id ) {
75 0 : snapwh->wr_fseq = consumer_tile->in_link_fseq[ in_idx ];
76 0 : break;
77 0 : }
78 0 : }
79 0 : if( snapwh->wr_fseq ) break;
80 0 : }
81 0 : if( FD_UNLIKELY( !snapwh->wr_fseq ) ) {
82 0 : FD_LOG_ERR(( "unable to find fseq for output link %s:%lu",
83 0 : out_link->name, out_link->kind_id ));
84 0 : }
85 :
86 0 : snapwh->state = FD_SNAPSHOT_STATE_IDLE;
87 0 : snapwh->last_fseq = fd_fseq_query( snapwh->up_fseq );
88 0 : }
89 :
90 : static ulong
91 : populate_allowed_fds( fd_topo_t const * topo,
92 : fd_topo_tile_t const * tile,
93 : ulong out_fds_cnt,
94 0 : int * out_fds ) {
95 0 : (void)topo; (void)tile;
96 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
97 :
98 0 : ulong out_cnt = 0;
99 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
100 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
101 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
102 0 : }
103 :
104 0 : return out_cnt;
105 0 : }
106 :
107 : static ulong
108 : populate_allowed_seccomp( fd_topo_t const * topo,
109 : fd_topo_tile_t const * tile,
110 : ulong out_cnt,
111 0 : struct sock_filter * out ) {
112 0 : (void)topo; (void)tile;
113 0 : populate_sock_filter_policy_fd_snapwh_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
114 0 : return sock_filter_policy_fd_snapwh_tile_instr_cnt;
115 0 : }
116 :
117 : static int
118 0 : should_shutdown( fd_snapwh_t const * ctx ) {
119 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN && ctx->last_fseq==ctx->next_seq;
120 0 : }
121 :
122 : static void
123 : before_credit( fd_snapwh_t * ctx,
124 : fd_stem_context_t * stem,
125 0 : int * charge_busy ) {
126 0 : (void)stem;
127 0 : if( ++ctx->idle_cnt >= 1024U ) {
128 0 : fd_log_sleep( (long)1e6 ); /* 1 millisecond */
129 0 : *charge_busy = 0;
130 0 : ctx->idle_cnt = 0U;
131 0 : }
132 :
133 : /* Reverse path bubble up flow control credits received from snapwr */
134 0 : ulong wr_seq = fd_fseq_query( ctx->wr_fseq );
135 0 : if( FD_UNLIKELY( wr_seq!=ctx->last_fseq ) ) {
136 0 : fd_fseq_update( ctx->up_fseq, wr_seq );
137 0 : ctx->last_fseq = wr_seq;
138 0 : }
139 0 : }
140 :
141 : static void
142 0 : metrics_write( fd_snapwh_t * ctx ) {
143 0 : FD_MGAUGE_SET( SNAPWH, STATE, ctx->state );
144 0 : }
145 :
146 : /* handle_control_frag handles an administrative frag from the snapin
147 : tile. */
148 :
149 : static void
150 : handle_control_frag( fd_snapwh_t * ctx,
151 0 : ulong meta_ctl ) {
152 0 : switch( meta_ctl ) {
153 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
154 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
155 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
156 0 : break;
157 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
158 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
159 0 : break;
160 0 : default:
161 0 : FD_LOG_CRIT(( "received unexpected ssctrl msg type %lu", meta_ctl ));
162 0 : }
163 0 : }
164 :
165 : static void
166 : handle_data_frag( fd_snapwh_t * ctx,
167 : ulong chunk, /* compressed input pointer */
168 0 : ulong sz_comp ) { /* compressed input size */
169 0 : ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
170 :
171 0 : ulong rem_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
172 0 : uchar * rem = fd_chunk_to_laddr( ctx->base, chunk );
173 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)rem, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
174 0 : FD_CRIT( fd_ulong_is_aligned( rem_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
175 :
176 0 : #define PAIR_HASH_N (8)
177 :
178 0 : uchar * pair[PAIR_HASH_N];
179 0 : ulong pair_sz[PAIR_HASH_N];
180 0 : ulong pair_cnt = 0UL;
181 0 : while( rem_sz ) {
182 0 : FD_CRIT( rem_sz>=FD_VINYL_BSTREAM_BLOCK_SZ, "corrupted bstream block" );
183 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)rem;
184 0 : ulong ctl = phdr->ctl;
185 0 : int ctl_type = fd_vinyl_bstream_ctl_type( ctl );
186 0 : switch( ctl_type ) {
187 :
188 0 : case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
189 0 : pair[ pair_cnt ] = rem;
190 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( ctl );
191 0 : ulong block_sz = fd_vinyl_bstream_pair_sz( val_esz );
192 0 : pair_sz[ pair_cnt ] = block_sz;
193 0 : pair_cnt += 1UL;
194 0 : rem += block_sz;
195 0 : rem_sz -= block_sz;
196 0 : break;
197 0 : }
198 :
199 0 : case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
200 0 : rem += FD_VINYL_BSTREAM_BLOCK_SZ;
201 0 : rem_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
202 0 : break;
203 0 : }
204 :
205 0 : default:
206 0 : FD_LOG_CRIT(( "unexpected vinyl bstream block ctl=%016lx", ctl ));
207 0 : }
208 :
209 0 : if( FD_UNLIKELY( ( pair_cnt==PAIR_HASH_N ) || ( !rem_sz ) ) ) {
210 0 : # if FD_HAS_AVX512 && defined(__AVX512DQ__)
211 0 : ulong h_seed[PAIR_HASH_N];
212 0 : ulong h_trail[PAIR_HASH_N];
213 0 : ulong h_block[PAIR_HASH_N];
214 0 : void const * h_tin [PAIR_HASH_N];
215 0 : ulong h_tinsz[PAIR_HASH_N] = {0};
216 0 : void const * h_bin [PAIR_HASH_N];
217 0 : ulong h_binsz[PAIR_HASH_N] = {0};
218 0 : for( ulong i=0UL; i<pair_cnt; i++ ) {
219 0 : h_seed[ i ] = io_seed;
220 0 : fd_vinyl_bstream_pair_zero( (fd_vinyl_bstream_block_t *)pair[ i ] );
221 0 : h_tin [ i ] = pair [ i ] + FD_VINYL_BSTREAM_BLOCK_SZ;
222 0 : h_tinsz[ i ] = pair_sz[ i ] - FD_VINYL_BSTREAM_BLOCK_SZ;
223 0 : h_bin [ i ] = pair [ i ];
224 0 : h_binsz[ i ] = FD_VINYL_BSTREAM_BLOCK_SZ;
225 0 : }
226 0 : fd_vinyl_bstream_hash_batch8( h_seed, h_trail, h_tin, h_tinsz );
227 0 : fd_vinyl_bstream_hash_batch8( h_trail, h_block, h_bin, h_binsz );
228 0 : for( ulong i=0UL; i<pair_cnt; i++ ) {
229 0 : fd_vinyl_bstream_block_t * ftr = (fd_vinyl_bstream_block_t *)( pair[ i ]+pair_sz[ i ]-FD_VINYL_BSTREAM_BLOCK_SZ );
230 0 : ftr->ftr.hash_trail = h_trail[ i ];
231 0 : ftr->ftr.hash_blocks = h_block[ i ];
232 0 : }
233 : # else
234 0 : (void)pair_sz;
235 0 : for( ulong hash_i=0UL; hash_i<pair_cnt; hash_i++ ) {
236 0 : fd_vinyl_bstream_pair_hash( io_seed, (fd_vinyl_bstream_block_t *)pair[ hash_i ] );
237 0 : }
238 0 : # endif
239 0 : pair_cnt = 0UL;
240 0 : }
241 0 : }
242 :
243 0 : #undef PAIR_HASH_N
244 0 : }
245 :
246 : static int
247 : during_frag( fd_snapwh_t * ctx,
248 : ulong in_idx,
249 : ulong meta_seq,
250 : ulong meta_sig,
251 : ulong meta_chunk,
252 : ulong meta_sz,
253 0 : ulong meta_ctl ) {
254 0 : (void)in_idx; (void)meta_seq; (void)meta_sig;
255 0 : ctx->idle_cnt = 0U;
256 :
257 0 : if( FD_UNLIKELY( meta_ctl==FD_SNAPSHOT_MSG_DATA ) ) {
258 0 : handle_data_frag( ctx, meta_chunk, meta_sz );
259 0 : } else {
260 0 : handle_control_frag( ctx, meta_ctl );
261 0 : }
262 :
263 0 : ctx->meta_chunk = meta_chunk;
264 0 : ctx->meta_ctl = meta_ctl;
265 :
266 0 : return 0;
267 0 : }
268 :
269 : static void
270 : after_frag( fd_snapwh_t * ctx,
271 : ulong in_idx,
272 : ulong meta_seq,
273 : ulong meta_sig,
274 : ulong meta_sz,
275 : ulong meta_tsorig,
276 : ulong meta_tspub,
277 0 : fd_stem_context_t * stem ) {
278 0 : (void)in_idx; (void)meta_seq;
279 0 : ulong meta_chunk = ctx->meta_chunk;
280 0 : ulong meta_ctl = ctx->meta_ctl;
281 0 : FD_CRIT( stem->seqs[0]==meta_seq, "seq desync" );
282 0 : fd_stem_publish( stem, 0UL, meta_sig, meta_chunk, meta_sz, meta_ctl, meta_tsorig, meta_tspub );
283 0 : ctx->next_seq = fd_seq_inc( meta_seq, 1UL );
284 0 : }
285 :
286 0 : #define STEM_BURST 1UL
287 0 : #define STEM_LAZY ((long)2e6)
288 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapwh_t
289 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapwh_t)
290 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
291 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
292 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
293 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
294 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
295 :
296 : #include "../../disco/stem/fd_stem.c"
297 :
298 : static void
299 : run1( fd_topo_t * topo,
300 0 : fd_topo_tile_t * tile ) {
301 : /* snapwh is designed to be placed between snapin and snapwr, i.e.
302 : snapin -> snapwh -> snapwr. The in_fseq, however, needs to be
303 : propagated upstream from snapwr back to snapin. As a result,
304 : snapwh needs a dummy in_fseq that its fd_stem can write to in
305 : every iteration, without interfering with the fseq propagation. */
306 0 : static ulong tile2_in_fseq[1];
307 0 : static FD_TL fd_topo_tile_t tile2;
308 0 : tile2 = *tile;
309 0 : tile2.in_link_fseq[ 0 ] = tile2_in_fseq;
310 0 : stem_run( topo, &tile2 );
311 0 : }
312 :
313 : fd_topo_run_tile_t fd_tile_snapwh = {
314 : .name = NAME,
315 : .populate_allowed_fds = populate_allowed_fds,
316 : .populate_allowed_seccomp = populate_allowed_seccomp,
317 : .scratch_align = scratch_align,
318 : .scratch_footprint = scratch_footprint,
319 : .unprivileged_init = unprivileged_init,
320 : .run = run1
321 : };
322 :
323 : #undef NAME
|