Line data Source code
1 : /* The snapwh tile updates vinyl_bstream_block integrity hashes for
2 : blocks flowing through. Assumes that:
3 : - vinyl records are not fragmented across buffers
4 : - vinyl records have trailing zeros (in particular, the footer's
5 : hash numbers) */
6 :
7 : #include "utils/fd_ssctrl.h"
8 : #include "../../disco/topo/fd_topo.h"
9 : #include "../../disco/metrics/fd_metrics.h"
10 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
11 : #include "generated/fd_snapwh_tile_seccomp.h"
12 :
13 : #define NAME "snapwh"
14 :
15 : #define FD_SNAPWH_WR_FSEQ_CNT_MAX (16UL)
16 :
17 : struct fd_snapwh {
18 : /* Run loop */
19 : uint state;
20 : uint idle_cnt;
21 :
22 : /* Database params */
23 : ulong const * io_seed;
24 :
25 : /* RX link */
26 : void * base;
27 :
28 : /* ACKs / flow control */
29 : ulong * up_fseq;
30 : ulong const * wr_fseq[FD_SNAPWH_WR_FSEQ_CNT_MAX];
31 : ulong wr_fseq_cnt;
32 : ulong last_fseq;
33 : ulong next_seq;
34 :
35 : /* Scratch variables */
36 : ulong meta_chunk;
37 : ulong meta_ctl;
38 : };
39 :
40 : typedef struct fd_snapwh fd_snapwh_t;
41 :
42 : static ulong
43 0 : scratch_align( void ) {
44 0 : return alignof(fd_snapwh_t);
45 0 : }
46 :
47 : static ulong
48 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
49 0 : (void)tile;
50 0 : return sizeof(fd_snapwh_t);
51 0 : }
52 :
53 : static void
54 : unprivileged_init( fd_topo_t * topo,
55 0 : fd_topo_tile_t * tile ) {
56 0 : fd_snapwh_t * snapwh = fd_topo_obj_laddr( topo, tile->tile_obj_id );
57 0 : memset( snapwh, 0, sizeof(fd_snapwh_t) );
58 :
59 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
60 0 : if( FD_UNLIKELY( tile->in_cnt !=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
61 0 : if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
62 :
63 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0 ] ];
64 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ 0 ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link 0 must be reliable" ));
65 0 : ulong * fseq = tile->in_link_fseq[ 0 ];
66 0 : snapwh->base = in_link->dcache;
67 0 : snapwh->up_fseq = &fseq[ 0 ];
68 :
69 0 : FD_CRIT( fd_dcache_app_sz( in_link->dcache )>=sizeof(ulong), "in_link dcache app region too small to hold io_seed" );
70 0 : snapwh->io_seed = (ulong const *)fd_dcache_app_laddr_const( in_link->dcache );
71 :
72 0 : ulong wr_fseq_cnt_exp = fd_topo_tile_name_cnt( topo, "snapwr" ) + fd_topo_tile_name_cnt( topo, "snaplh" );
73 0 : FD_TEST( wr_fseq_cnt_exp<=FD_SNAPWH_WR_FSEQ_CNT_MAX );
74 0 : ulong wr_fseq_cnt = 0UL;
75 0 : fd_topo_link_t const * out_link = &topo->links[ tile->out_link_id[ 0 ] ];
76 0 : FD_TEST( fd_topo_link_reliable_consumer_cnt( topo, out_link )==wr_fseq_cnt_exp );
77 0 : for( ulong tile_idx=0UL; tile_idx<topo->tile_cnt; tile_idx++ ) {
78 0 : fd_topo_tile_t const * consumer_tile = &topo->tiles[ tile_idx ];
79 0 : for( ulong in_idx=0UL; in_idx<consumer_tile->in_cnt; in_idx++ ) {
80 0 : if( consumer_tile->in_link_id[ in_idx ]==out_link->id ) {
81 0 : snapwh->wr_fseq[ wr_fseq_cnt ] = consumer_tile->in_link_fseq[ in_idx ];
82 0 : wr_fseq_cnt++;
83 0 : }
84 0 : }
85 0 : }
86 0 : snapwh->wr_fseq_cnt = wr_fseq_cnt;
87 0 : FD_TEST( snapwh->wr_fseq_cnt==wr_fseq_cnt_exp );
88 :
89 0 : snapwh->state = FD_SNAPSHOT_STATE_IDLE;
90 0 : snapwh->last_fseq = fd_fseq_query( snapwh->up_fseq );
91 0 : }
92 :
93 : static ulong
94 : populate_allowed_fds( fd_topo_t const * topo,
95 : fd_topo_tile_t const * tile,
96 : ulong out_fds_cnt,
97 0 : int * out_fds ) {
98 0 : (void)topo; (void)tile;
99 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
100 :
101 0 : ulong out_cnt = 0;
102 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
103 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
104 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
105 0 : }
106 :
107 0 : return out_cnt;
108 0 : }
109 :
110 : static ulong
111 : populate_allowed_seccomp( fd_topo_t const * topo,
112 : fd_topo_tile_t const * tile,
113 : ulong out_cnt,
114 0 : struct sock_filter * out ) {
115 0 : (void)topo; (void)tile;
116 0 : populate_sock_filter_policy_fd_snapwh_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
117 0 : return sock_filter_policy_fd_snapwh_tile_instr_cnt;
118 0 : }
119 :
120 : static int
121 0 : should_shutdown( fd_snapwh_t const * ctx ) {
122 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
123 0 : }
124 :
125 : static void
126 : before_credit( fd_snapwh_t * ctx,
127 : fd_stem_context_t * stem,
128 0 : int * charge_busy ) {
129 0 : (void)stem;
130 0 : if( ++ctx->idle_cnt >= 1024U ) {
131 0 : fd_log_sleep( (long)1e6 ); /* 1 millisecond */
132 0 : *charge_busy = 0;
133 0 : ctx->idle_cnt = 0U;
134 0 : }
135 0 : *charge_busy = 0;
136 :
137 : /* Reverse path bubble up flow control credits received from downstream tiles */
138 0 : ulong wr_seq_min = ULONG_MAX;
139 0 : for( ulong i=0; i<ctx->wr_fseq_cnt; i++ ){
140 0 : ulong wr_seq = fd_fseq_query( ctx->wr_fseq[ i ] );
141 0 : wr_seq_min = fd_ulong_min( wr_seq_min, wr_seq );
142 0 : }
143 0 : if( FD_UNLIKELY( wr_seq_min!=ctx->last_fseq ) ) {
144 0 : fd_fseq_update( ctx->up_fseq, wr_seq_min );
145 0 : ctx->last_fseq = wr_seq_min;
146 0 : }
147 0 : }
148 :
149 : static void
150 0 : metrics_write( fd_snapwh_t * ctx ) {
151 0 : FD_MGAUGE_SET( SNAPWH, STATE, ctx->state );
152 0 : }
153 :
154 : /* handle_control_frag handles an administrative frag from the snapin
155 : tile. */
156 :
157 : static void
158 : handle_control_frag( fd_snapwh_t * ctx,
159 0 : ulong meta_ctl ) {
160 0 : switch( meta_ctl ) {
161 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
162 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
163 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
164 0 : break;
165 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
166 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
167 0 : break;
168 0 : default:
169 0 : FD_LOG_CRIT(( "received unexpected ssctrl msg type %lu", meta_ctl ));
170 0 : }
171 0 : }
172 :
173 : static void
174 : handle_data_frag( fd_snapwh_t * ctx,
175 : ulong chunk, /* compressed input pointer */
176 0 : ulong sz_comp ) { /* compressed input size */
177 0 : ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
178 :
179 0 : ulong rem_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
180 0 : uchar * rem = fd_chunk_to_laddr( ctx->base, chunk );
181 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)rem, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
182 0 : FD_CRIT( fd_ulong_is_aligned( rem_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
183 :
184 0 : #define PAIR_HASH_N (8)
185 :
186 0 : uchar * pair[PAIR_HASH_N];
187 0 : ulong pair_sz[PAIR_HASH_N];
188 0 : ulong pair_cnt = 0UL;
189 0 : while( rem_sz ) {
190 0 : FD_CRIT( rem_sz>=FD_VINYL_BSTREAM_BLOCK_SZ, "corrupted bstream block" );
191 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)rem;
192 0 : ulong ctl = phdr->ctl;
193 0 : int ctl_type = fd_vinyl_bstream_ctl_type( ctl );
194 0 : switch( ctl_type ) {
195 :
196 0 : case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
197 0 : pair[ pair_cnt ] = rem;
198 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( ctl );
199 0 : ulong block_sz = fd_vinyl_bstream_pair_sz( val_esz );
200 0 : pair_sz[ pair_cnt ] = block_sz;
201 0 : pair_cnt += 1UL;
202 0 : rem += block_sz;
203 0 : rem_sz -= block_sz;
204 0 : break;
205 0 : }
206 :
207 0 : case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
208 0 : rem += FD_VINYL_BSTREAM_BLOCK_SZ;
209 0 : rem_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
210 0 : break;
211 0 : }
212 :
213 0 : default:
214 0 : FD_LOG_CRIT(( "unexpected vinyl bstream block ctl=%016lx", ctl ));
215 0 : }
216 :
217 0 : if( FD_UNLIKELY( ( pair_cnt==PAIR_HASH_N ) || ( !rem_sz ) ) ) {
218 0 : # if FD_HAS_AVX512 && defined(__AVX512DQ__)
219 0 : ulong h_seed[PAIR_HASH_N];
220 0 : ulong h_trail[PAIR_HASH_N];
221 0 : ulong h_block[PAIR_HASH_N];
222 0 : void const * h_tin [PAIR_HASH_N];
223 0 : ulong h_tinsz[PAIR_HASH_N] = {0};
224 0 : void const * h_bin [PAIR_HASH_N];
225 0 : ulong h_binsz[PAIR_HASH_N] = {0};
226 0 : for( ulong i=0UL; i<pair_cnt; i++ ) {
227 0 : h_seed[ i ] = io_seed;
228 0 : fd_vinyl_bstream_pair_zero( (fd_vinyl_bstream_block_t *)pair[ i ] );
229 0 : h_tin [ i ] = pair [ i ] + FD_VINYL_BSTREAM_BLOCK_SZ;
230 0 : h_tinsz[ i ] = pair_sz[ i ] - FD_VINYL_BSTREAM_BLOCK_SZ;
231 0 : h_bin [ i ] = pair [ i ];
232 0 : h_binsz[ i ] = FD_VINYL_BSTREAM_BLOCK_SZ;
233 0 : }
234 0 : fd_vinyl_bstream_hash_batch8( h_seed, h_trail, h_tin, h_tinsz );
235 0 : fd_vinyl_bstream_hash_batch8( h_trail, h_block, h_bin, h_binsz );
236 0 : for( ulong i=0UL; i<pair_cnt; i++ ) {
237 0 : fd_vinyl_bstream_block_t * ftr = (fd_vinyl_bstream_block_t *)( pair[ i ]+pair_sz[ i ]-FD_VINYL_BSTREAM_BLOCK_SZ );
238 0 : ftr->ftr.hash_trail = h_trail[ i ];
239 0 : ftr->ftr.hash_blocks = h_block[ i ];
240 0 : }
241 : # else
242 0 : (void)pair_sz;
243 0 : for( ulong hash_i=0UL; hash_i<pair_cnt; hash_i++ ) {
244 0 : fd_vinyl_bstream_pair_hash( io_seed, (fd_vinyl_bstream_block_t *)pair[ hash_i ] );
245 0 : }
246 0 : # endif
247 0 : pair_cnt = 0UL;
248 0 : }
249 0 : }
250 :
251 0 : #undef PAIR_HASH_N
252 0 : }
253 :
254 : static int
255 : during_frag( fd_snapwh_t * ctx,
256 : ulong in_idx,
257 : ulong meta_seq,
258 : ulong meta_sig,
259 : ulong meta_chunk,
260 : ulong meta_sz,
261 0 : ulong meta_ctl ) {
262 0 : (void)in_idx; (void)meta_seq; (void)meta_sig;
263 0 : ctx->idle_cnt = 0U;
264 :
265 0 : if( FD_UNLIKELY( meta_ctl==FD_SNAPSHOT_MSG_DATA ) ) {
266 0 : handle_data_frag( ctx, meta_chunk, meta_sz );
267 0 : } else {
268 0 : handle_control_frag( ctx, meta_ctl );
269 0 : }
270 :
271 0 : ctx->meta_chunk = meta_chunk;
272 0 : ctx->meta_ctl = meta_ctl;
273 :
274 0 : return 0;
275 0 : }
276 :
277 : static void
278 : after_frag( fd_snapwh_t * ctx,
279 : ulong in_idx,
280 : ulong meta_seq,
281 : ulong meta_sig,
282 : ulong meta_sz,
283 : ulong meta_tsorig,
284 : ulong meta_tspub,
285 0 : fd_stem_context_t * stem ) {
286 0 : (void)in_idx; (void)meta_seq;
287 0 : ulong meta_chunk = ctx->meta_chunk;
288 0 : ulong meta_ctl = ctx->meta_ctl;
289 0 : FD_CRIT( stem->seqs[0]==meta_seq, "seq desync" );
290 0 : fd_stem_publish( stem, 0UL, meta_sig, meta_chunk, meta_sz, meta_ctl, meta_tsorig, meta_tspub );
291 0 : ctx->next_seq = fd_seq_inc( meta_seq, 1UL );
292 0 : }
293 :
/* Configuration for the fd_stem.c template included below.
   after_frag publishes exactly one frag per input frag, matching a
   burst of 1.  NOTE(review): STEM_LAZY is presumably the stem
   housekeeping interval in nanoseconds (2 ms) -- confirm against
   fd_stem.c. */

#define STEM_BURST                    1UL
#define STEM_LAZY                     ((long)2e6)

#define STEM_CALLBACK_CONTEXT_TYPE    fd_snapwh_t
#define STEM_CALLBACK_CONTEXT_ALIGN   alignof(fd_snapwh_t)

#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE   metrics_write
#define STEM_CALLBACK_BEFORE_CREDIT   before_credit
#define STEM_CALLBACK_DURING_FRAG     during_frag
#define STEM_CALLBACK_AFTER_FRAG      after_frag
303 :
304 : #include "../../disco/stem/fd_stem.c"
305 :
306 : static void
307 : run1( fd_topo_t * topo,
308 0 : fd_topo_tile_t * tile ) {
309 : /* snapwh is designed to be placed between snapin and snapwr, i.e.
310 : snapin -> snapwh -> snapwr. The in_fseq, however, needs to be
311 : propagated upstream from snapwr back to snapin. As a result,
312 : snapwh needs a dummy in_fseq that its fd_stem can write to in
313 : every iteration, without interfering with the fseq propagation. */
314 0 : static ulong tile2_in_fseq[1];
315 0 : static FD_TL fd_topo_tile_t tile2;
316 0 : tile2 = *tile;
317 0 : tile2.in_link_fseq[ 0 ] = tile2_in_fseq;
318 0 : stem_run( topo, &tile2 );
319 0 : }
320 :
321 : fd_topo_run_tile_t fd_tile_snapwh = {
322 : .name = NAME,
323 : .populate_allowed_fds = populate_allowed_fds,
324 : .populate_allowed_seccomp = populate_allowed_seccomp,
325 : .scratch_align = scratch_align,
326 : .scratch_footprint = scratch_footprint,
327 : .unprivileged_init = unprivileged_init,
328 : .run = run1
329 : };
330 :
331 : #undef NAME
|