Line data Source code
1 : #include "fd_stem.h"
2 :
3 : /* fd_stem provides services to multiplex multiple streams of input
4 : fragments and present them to a mix of reliable and unreliable
5 : consumers as though they were generated by multiple different
6 : multi-stream producers. The code can be included to generate
7 : a definition of stem_run which can be called as a tile main run
8 : loop.
9 :
10 : The template supports various callback functions which can be
11 : defined like #define STEM_CALLBACK_BEFORE_FRAG before_frag to
12 : tune the behavior of the stem_run loop. The callbacks are:
13 :
14 : DURING_HOUSEKEEPING
15 : Is called during the housekeeping routine, which happens infrequently
16 : on a schedule determined by the stem (based on the lazy parameter,
17 : see fd_tempo.h for more information). It is appropriate to do
18 : slightly expensive things here that wouldn't be OK to do in the main
19 : loop, like updating sequence numbers that are shared with other tiles
20 : (e.g. synchronization information), or sending batched information
21 : somewhere. The ctx is a user-provided context object from when the
22 : stem was initialized.
23 :
24 : METRICS_WRITE
25 : By convention, tiles may wish to accumulate high traffic metrics
26 : locally so they don't cause a lot of cache coherency traffic, and
27 : then periodically publish them to external observers. This callback
28 : is here to support that use case. It occurs infrequently during the
29 : housekeeping loop, and is called inside a compiler fence to ensure
30 : the writes do not get reordered, which may be important for observers
31 : or monitoring tools. The ctx is a user-provided context object from
32 : when the stem tile was initialized.
33 :
34 : BEFORE_CREDIT
35 : Is called every iteration of the stem run loop, whether there is a
36 : new frag ready to receive or not. This callback is also still
37 : invoked even if the stem is backpressured and cannot read any new
38 : fragments while waiting for downstream consumers to catch up. This
39 : callback is useful for things that need to occur even if no new frags
40 : are being handled. For example, servicing network connections could
41 : happen here. The ctx is a user-provided context object from when the
42 : stem tile was initialized. The stem is the stem which is invoking
43 : this callback. The stem should only be used for calling
44 : fd_stem_publish to publish a fragment to downstream consumers.
45 :
46 : The charge_busy argument is 0 by default, and should be set to 1 if
47 : the before_credit function is doing work that should be accounted for
48 : as part of the tile's busy indicator.
49 :
50 : AFTER_CREDIT
51 : Is called every iteration of the stem run loop, whether there is a
52 : new frag ready to receive or not, except in cases where the stem is
53 : backpressured by a downstream consumer and would not be able to
54 : publish. The callback might be used for publishing new fragments to
55 : downstream consumers in the main loop which are not in response to an
56 : incoming fragment. For example, code that collects incoming
57 : fragments over a period of 1 second and joins them together before
58 : publishing a large block fragment downstream, would publish the block
59 : here. The ctx is a user-provided context object from when the stem
60 : tile was initialized. The stem is the stem which is invoking this
61 : callback. The stem should only be used for calling fd_stem_publish to
62 : publish a fragment to downstream consumers.
63 :
64 : The opt_poll_in argument determines if the stem should proceed with
65 : checking for new fragments to consume, or should `continue` the main
66 : stem loop to do credit checking again. This could be used if the
67 : after_credit function publishes, and the flow control needs to be
68 : checked again. By default, opt_poll_in is true and the stem will
69 : poll for fragments right away without rerunning the loop or checking
70 : for credits.
71 :
72 : The charge_busy argument is 0 by default, and should be set to 1 if
73 : the after_credit function is doing work that should be accounted for
74 : as part of the tile's busy indicator.
75 :
76 : BEFORE_FRAG
77 : Is called immediately whenever a new fragment has been detected that
78 : was published by an upstream producer. The signature and sequence
79 : number (sig and seq) provided as arguments are read atomically from
80 : shared memory, so must both match each other from the published
81 : fragment (aka. they will not be torn or partially overwritten).
82 : in_idx is an index in [0, num_ins) indicating which producer
83 : published the fragment. No fragment data has been read yet here, nor
84 : has other metadata, for example the size or timestamps of the
85 : fragment. Mainly this callback is useful for deciding whether to
86 : filter the fragment based on its signature. If the return value is
87 : non-zero, the frag will be skipped completely, no fragment data will
88 : be read, and the in will be advanced so that we now wait for the next
89 : fragment. The ctx is a user-provided context object from when the
90 : stem tile was initialized.
91 :
92 : DURING_FRAG
93 : Is called after the stem has received a new frag from an in, but
94 : before the stem has checked that it was overrun. This callback is
95 : not invoked if the stem is backpressured, as it would not try and
96 : read a frag from an in in the first place (instead, leaving it on the
97 : in mcache to backpressure the upstream producer). in_idx will be the
98 : index of the in that the frag was received from. If the producer of
99 : the frags is respecting flow control, it is safe to read frag data in
100 : any of the callbacks, but it is suggested to copy or read frag data
101 : within this callback, as if the producer does not respect flow
102 : control, the frag may be torn or corrupt due to an overrun by the
103 : reader. If the frag being read from has been overwritten while this
104 : callback is running, the frag will be ignored and the stem will not
105 : call the process function. Instead it will recover from the overrun
106 : and continue with new frags. This function cannot fail. The ctx is a
107 : user-provided context object from when the stem tile was initialized.
108 : seq, sig, chunk, and sz are the respective fields from the mcache
109 : fragment that was received. If the producer is not respecting flow
110 : control, these may be corrupt or torn and should not be trusted,
111 : except for seq which is read atomically.
112 :
113 : AFTER_FRAG
114 : Is called immediately after the DURING_FRAG, along with an
115 : additional check that the reader was not overrun while handling the
116 : frag. If the reader was overrun, the frag is abandoned and this
117 : function is not called. This callback is not invoked if the stem is
118 : backpressured, as it would not read a frag in the first place.
119 : in_idx will be the index of the in that the frag was received from.
120 : You should not read the frag data directly here, as it might still
121 : get overrun, instead it should be copied out of the frag during the
122 : read callback if needed later. This function cannot fail. The ctx is
123 : a user-provided context object from when the stem tile was
124 : initialized. stem should only be used for calling fd_stem_publish to
125 : publish a fragment to downstream consumers. seq is the sequence
126 : number of the fragment that was read from the input mcache. sig,
127 : chunk, sz, and tsorig are the respective fields from the mcache
128 : fragment that was received. If the producer is not respecting flow
129 : control, these may be corrupt or torn and should not be trusted. */
130 :
131 : #if FD_HAS_SSE
132 :
133 : #include "../topo/fd_topo.h"
134 : #include "../metrics/fd_metrics.h"
135 : #include "../../tango/fd_tango.h"
136 :
137 : #ifndef STEM_BURST
138 : #error "STEM_BURST must be defined"
139 : #endif
140 :
141 : #ifndef STEM_CALLBACK_CONTEXT_TYPE
142 : #error "STEM_CALLBACK_CONTEXT_TYPE must be defined"
143 : #endif
144 :
145 : static inline void
146 0 : stem_in_update( fd_stem_tile_in_t * in ) {
147 0 : fd_fseq_update( in->fseq, in->seq );
148 :
149 0 : volatile ulong * metrics = fd_metrics_link_in( fd_metrics_base_tl, in->idx );
150 :
151 0 : uint * accum = in->accum;
152 0 : ulong a0 = (ulong)accum[0]; ulong a1 = (ulong)accum[1]; ulong a2 = (ulong)accum[2];
153 0 : ulong a3 = (ulong)accum[3]; ulong a4 = (ulong)accum[4]; ulong a5 = (ulong)accum[5];
154 0 : FD_COMPILER_MFENCE();
155 0 : metrics[0] += a0; metrics[1] += a1; metrics[2] += a2;
156 0 : metrics[3] += a3; metrics[4] += a4; metrics[5] += a5;
157 0 : FD_COMPILER_MFENCE();
158 0 : accum[0] = 0U; accum[1] = 0U; accum[2] = 0U;
159 0 : accum[3] = 0U; accum[4] = 0U; accum[5] = 0U;
160 0 : }
161 :
162 0 : #define STEM_SCRATCH_ALIGN (128UL)
163 :
/* stem_scratch_align returns the required byte alignment of the
   scratch region passed to stem_run1 (128, a pair of 64B cache
   lines). */

FD_FN_PURE static inline ulong
stem_scratch_align( void ) {
  return STEM_SCRATCH_ALIGN;
}
168 :
/* stem_scratch_footprint returns the size in bytes of the scratch
   region stem_run1 needs for in_cnt polled input links, out_cnt output
   links and cons_cnt reliable consumers.  The append sequence below
   must be kept in exact one-to-one correspondence with the
   FD_SCRATCH_ALLOC_APPEND sequence in stem_run1.
   NOTE(review): stem_run1 currently requests cons_cnt*sizeof(ulong *)
   for cons_out while this footprint reserves cons_cnt*sizeof(ulong);
   identical on LP64 targets but should be kept in sync — confirm. */

FD_FN_PURE static inline ulong
stem_scratch_footprint( ulong in_cnt,
                        ulong out_cnt,
                        ulong cons_cnt ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_stem_tile_in_t), in_cnt*sizeof(fd_stem_tile_in_t) ); /* in */
  l = FD_LAYOUT_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) ); /* out_depth */
  l = FD_LAYOUT_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) ); /* out_seq */
  l = FD_LAYOUT_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) ); /* cons_fseq */
  l = FD_LAYOUT_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); /* cons_slow */
  l = FD_LAYOUT_APPEND( l, alignof(ulong), cons_cnt*sizeof(ulong) ); /* cons_out */
  l = FD_LAYOUT_APPEND( l, alignof(ulong), cons_cnt*sizeof(ulong) ); /* cons_seq */
  l = FD_LAYOUT_APPEND( l, alignof(ushort), (in_cnt+cons_cnt+1UL)*sizeof(ushort) ); /* event_map */
  return FD_LAYOUT_FINI( l, stem_scratch_align() );
}
184 :
/* stem_run1 is the stem tile main run loop.  It polls in_cnt input
   mcaches in a randomized round robin, invokes the optional
   STEM_CALLBACK_* hooks around each stage, and applies flow control
   against cons_cnt reliable downstream consumers before allowing a
   burst of up to burst publications.  scratch must be aligned to
   stem_scratch_align() and sized per stem_scratch_footprint( in_cnt,
   out_cnt, cons_cnt ).  lazy<=0 requests a default housekeeping
   interval derived from cr_max.  This function never returns. */

static inline void
stem_run1( ulong in_cnt,
           fd_frag_meta_t const ** in_mcache,
           ulong ** in_fseq,
           ulong out_cnt,
           fd_frag_meta_t ** out_mcache,
           ulong cons_cnt,
           ulong * _cons_out,
           ulong ** _cons_fseq,
           ulong burst,
           long lazy,
           fd_rng_t * rng,
           void * scratch,
           STEM_CALLBACK_CONTEXT_TYPE * ctx ) {
  /* in frag stream state */
  ulong in_seq; /* current position in input poll sequence, in [0,in_cnt) */
  fd_stem_tile_in_t * in; /* in[in_seq] for in_seq in [0,in_cnt) has information about input fragment stream currently at
                             position in_seq in the in_idx polling sequence.  The ordering of this array is continuously
                             shuffled to avoid lighthousing effects in the output fragment stream at extreme fan-in and load */

  /* out frag stream state */
  ulong * out_depth; /* ==fd_mcache_depth( out_mcache[out_idx] ) for out_idx in [0, out_cnt) */
  ulong * out_seq; /* next mux frag sequence number to publish for out_idx in [0, out_cnt) */

  /* out flow control state */
  ulong cr_avail; /* number of flow control credits available to publish downstream, in [0,cr_max] */
  ulong const ** cons_fseq; /* cons_fseq[cons_idx] for cons_idx in [0,cons_cnt) is where to receive fctl credits from consumers */
  ulong ** cons_slow; /* cons_slow[cons_idx] for cons_idx in [0,cons_cnt) is where to accumulate slow events */
  ulong * cons_out; /* cons_out[cons_idx] for cons_idx in [0,cons_cnt) is which out the consumer consumes from */
  ulong * cons_seq; /* cons_seq [cons_idx] is the most recent observation of cons_fseq[cons_idx] */

  /* housekeeping state */
  ulong event_cnt; /* ==in_cnt+cons_cnt+1, total number of housekeeping events */
  ulong event_seq; /* current position in housekeeping event sequence, in [0,event_cnt) */
  ushort * event_map; /* current mapping of event_seq to event idx, event_map[ event_seq ] is next event to process */
  ulong async_min; /* minimum number of ticks between processing a housekeeping event, positive integer power of 2 */

  /* performance metrics */
  ulong metric_in_backp; /* is the run loop currently backpressured by one or more of the outs, in [0,1] */
  ulong metric_backp_cnt; /* Accumulates number of transitions of tile to backpressured between housekeeping events */

  ulong metric_regime_ticks[9]; /* How many ticks the tile has spent in each regime */

  if( FD_UNLIKELY( !scratch ) ) FD_LOG_ERR(( "NULL scratch" ));
  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)scratch, stem_scratch_align() ) ) ) FD_LOG_ERR(( "misaligned scratch" ));

  /* in_backp==1, backp_cnt==0 indicates waiting for initial credits,
     cleared during first housekeeping if credits available */
  metric_in_backp = 1UL;
  metric_backp_cnt = 0UL;
  memset( metric_regime_ticks, 0, sizeof( metric_regime_ticks ) );

  /* in frag stream init */

  in_seq = 0UL; /* First in to poll */

  FD_SCRATCH_ALLOC_INIT( l, scratch );
  in = (fd_stem_tile_in_t *)FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stem_tile_in_t), in_cnt*sizeof(fd_stem_tile_in_t) );

  /* NOTE(review): min_in_depth is computed below but not otherwise used
     in this function — possibly a leftover; confirm before removing. */
  ulong min_in_depth = (ulong)LONG_MAX;

  if( FD_UNLIKELY( !!in_cnt && !in_mcache ) ) FD_LOG_ERR(( "NULL in_mcache" ));
  if( FD_UNLIKELY( !!in_cnt && !in_fseq ) ) FD_LOG_ERR(( "NULL in_fseq" ));
  if( FD_UNLIKELY( in_cnt > UINT_MAX ) ) FD_LOG_ERR(( "in_cnt too large" ));
  for( ulong in_idx=0UL; in_idx<in_cnt; in_idx++ ) {

    if( FD_UNLIKELY( !in_mcache[ in_idx ] ) ) FD_LOG_ERR(( "NULL in_mcache[%lu]", in_idx ));
    if( FD_UNLIKELY( !in_fseq [ in_idx ] ) ) FD_LOG_ERR(( "NULL in_fseq[%lu]", in_idx ));

    fd_stem_tile_in_t * this_in = &in[ in_idx ];

    this_in->mcache = in_mcache[ in_idx ];
    this_in->fseq = in_fseq [ in_idx ];

    ulong depth = fd_mcache_depth( this_in->mcache ); min_in_depth = fd_ulong_min( min_in_depth, depth );
    if( FD_UNLIKELY( depth > UINT_MAX ) ) FD_LOG_ERR(( "in_mcache[%lu] too deep", in_idx ));
    this_in->depth = (uint)depth;
    this_in->idx = (uint)in_idx;
    this_in->seq = 0UL;
    this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in->seq, this_in->depth );

    /* Zero the per-link diagnostic accumulators (drained by
       stem_in_update during housekeeping). */
    this_in->accum[0] = 0U; this_in->accum[1] = 0U; this_in->accum[2] = 0U;
    this_in->accum[3] = 0U; this_in->accum[4] = 0U; this_in->accum[5] = 0U;
  }

  /* out frag stream init */

  cr_avail = 0UL;

  out_depth = (ulong *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) );
  out_seq = (ulong *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) );

  /* cr_max is the shallowest out depth (128 if there are no outs, so
     the housekeeping credit refresh still has a sane target). */
  ulong cr_max = fd_ulong_if( !out_cnt, 128UL, ULONG_MAX );

  for( ulong out_idx=0UL; out_idx<out_cnt; out_idx++ ) {

    if( FD_UNLIKELY( !out_mcache[ out_idx ] ) ) FD_LOG_ERR(( "NULL out_mcache[%lu]", out_idx ));

    out_depth[ out_idx ] = fd_mcache_depth( out_mcache[ out_idx ] );
    out_seq[ out_idx ] = 0UL;

    cr_max = fd_ulong_min( cr_max, out_depth[ out_idx ] );
  }

  cons_fseq = (ulong const **)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) );
  cons_slow = (ulong **) FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) );
  /* NOTE(review): sizeof(ulong *) here vs sizeof(ulong) reserved in
     stem_scratch_footprint for cons_out — same on LP64, but the two
     should agree; confirm and fix one side. */
  cons_out = (ulong *) FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), cons_cnt*sizeof(ulong *) );
  cons_seq = (ulong *) FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), cons_cnt*sizeof(ulong) );

  /* NOTE(review): _cons_out is not NULL checked (unlike _cons_fseq);
     callers must pass a valid array whenever cons_cnt>0. */
  if( FD_UNLIKELY( !!cons_cnt && !_cons_fseq ) ) FD_LOG_ERR(( "NULL cons_fseq" ));
  for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
    if( FD_UNLIKELY( !_cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx ));
    cons_fseq[ cons_idx ] = _cons_fseq[ cons_idx ];
    cons_out [ cons_idx ] = _cons_out [ cons_idx ];
    cons_slow[ cons_idx ] = (ulong*)(fd_metrics_link_out( fd_metrics_base_tl, cons_idx ) + FD_METRICS_COUNTER_LINK_SLOW_COUNT_OFF);
    cons_seq [ cons_idx ] = fd_fseq_query( _cons_fseq[ cons_idx ] );
  }

  /* housekeeping init */

  if( lazy<=0L ) lazy = fd_tempo_lazy_default( cr_max );
  FD_LOG_INFO(( "Configuring housekeeping (lazy %li ns)", lazy ));

  /* Initialize the initial event sequence to immediately update
     cr_avail on the first run loop iteration and then update all the
     ins accordingly. */

  event_cnt = in_cnt + 1UL + cons_cnt;
  event_map = (ushort *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ushort), event_cnt*sizeof(ushort) );
  event_seq = 0UL; event_map[ event_seq++ ] = (ushort)cons_cnt;
  for( ulong in_idx=0UL; in_idx< in_cnt; in_idx++ ) event_map[ event_seq++ ] = (ushort)(in_idx+cons_cnt+1UL);
  for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) event_map[ event_seq++ ] = (ushort)cons_idx;
  event_seq = 0UL;

  async_min = fd_tempo_async_min( lazy, event_cnt, (float)fd_tempo_tick_per_ns( NULL ) );
  if( FD_UNLIKELY( !async_min ) ) FD_LOG_ERR(( "bad lazy %lu %lu", (ulong)lazy, event_cnt ));

  FD_LOG_INFO(( "Running stem" ));
  FD_MGAUGE_SET( STEM, STATUS, 1UL );
  long then = fd_tickcount();
  long now = then;
  for(;;) {

    /* Do housekeeping at a low rate in the background */

    ulong housekeeping_ticks = 0UL;
    if( FD_UNLIKELY( (now-then)>=0L ) ) {
      ulong event_idx = (ulong)event_map[ event_seq ];

      /* Do the next async event.  event_idx:
            <cons_cnt - receive credits from consumer event_idx
           ==cons_cnt - housekeeping
            >cons_cnt - send credits to in event_idx - cons_cnt - 1.
         Branch hints and order are optimized for the case:
           cons_cnt >~ in_cnt >~ 1. */

      if( FD_LIKELY( event_idx<cons_cnt ) ) { /* cons fctl for cons cons_idx */
        ulong cons_idx = event_idx;

        /* Receive flow control credits from this consumer. */
        cons_seq[ cons_idx ] = fd_fseq_query( cons_fseq[ cons_idx ] );

      } else if( FD_LIKELY( event_idx>cons_cnt ) ) { /* in fctl for in in_idx */
        ulong in_idx = event_idx - cons_cnt - 1UL;

        /* Send flow control credits and drain flow control diagnostics
           for in_idx. */

        stem_in_update( &in[ in_idx ] );

      } else { /* event_idx==cons_cnt, housekeeping event */

        /* Update metrics counters to external viewers */
        FD_COMPILER_MFENCE();
        FD_MGAUGE_SET( STEM, HEARTBEAT, (ulong)now );
        FD_MGAUGE_SET( STEM, IN_BACKPRESSURE, metric_in_backp );
        FD_MCNT_INC ( STEM, BACKPRESSURE_COUNT, metric_backp_cnt );
        FD_MCNT_ENUM_COPY( STEM, REGIME_DURATION_NANOS, metric_regime_ticks );
#ifdef STEM_CALLBACK_METRICS_WRITE
        STEM_CALLBACK_METRICS_WRITE( ctx );
#endif
        FD_COMPILER_MFENCE();
        metric_backp_cnt = 0UL;

        /* Receive flow control credits: recompute cr_avail as the min
           over all reliable consumers of the credits each would allow,
           and remember which consumer is the bottleneck. */
        if( FD_LIKELY( cr_avail<cr_max ) ) {
          ulong slowest_cons = ULONG_MAX;
          cr_avail = cr_max;
          for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
            ulong cons_cr_avail = (ulong)fd_long_max( (long)cr_max-fd_long_max( fd_seq_diff( out_seq[ cons_out[ cons_idx ] ], cons_seq[ cons_idx ] ), 0L ), 0L );
            slowest_cons = fd_ulong_if( cons_cr_avail<cr_avail, cons_idx, slowest_cons );
            cr_avail = fd_ulong_min( cons_cr_avail, cr_avail );
          }

          /* See notes above about use of quasi-atomic diagnostic accum */
          if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) {
            FD_COMPILER_MFENCE();
            (*cons_slow[ slowest_cons ]) += metric_in_backp;
            FD_COMPILER_MFENCE();
          }
        }

#ifdef STEM_CALLBACK_DURING_HOUSEKEEPING
        STEM_CALLBACK_DURING_HOUSEKEEPING( ctx );
#endif
      }

      /* Select which event to do next (randomized round robin) and
         reload the housekeeping timer. */

      event_seq++;
      if( FD_UNLIKELY( event_seq>=event_cnt ) ) {
        event_seq = 0UL;

        /* Randomize the order of event processing for the next event
           event_cnt events to avoid lighthousing effects causing input
           credit starvation at extreme fan in/fan out, extreme in load
           and high credit return laziness. */

        ulong swap_idx = (ulong)fd_rng_uint_roll( rng, (uint)event_cnt );
        ushort map_tmp = event_map[ swap_idx ];
        event_map[ swap_idx ] = event_map[ 0 ];
        event_map[ 0 ] = map_tmp;

        /* We also do the same with the ins to prevent there being a
           correlated order frag origins from different inputs
           downstream at extreme fan in and extreme in load. */

        if( FD_LIKELY( in_cnt>1UL ) ) {
          swap_idx = (ulong)fd_rng_uint_roll( rng, (uint)in_cnt );
          fd_stem_tile_in_t in_tmp;
          in_tmp = in[ swap_idx ];
          in[ swap_idx ] = in[ 0 ];
          in[ 0 ] = in_tmp;
        }
      }

      /* Reload housekeeping timer */
      then = now + (long)fd_tempo_async_reload( rng, async_min );
      long next = fd_tickcount();
      housekeeping_ticks = (ulong)(next - now);
      now = next;
    }

#if defined(STEM_CALLBACK_BEFORE_CREDIT) || defined(STEM_CALLBACK_AFTER_CREDIT) || defined(STEM_CALLBACK_AFTER_FRAG)
    /* Handle passed to callbacks so they can fd_stem_publish; the
       decrement amount is 0 when there are no outs so credit tracking
       degenerates harmlessly. */
    fd_stem_context_t stem = {
      .mcaches = out_mcache,
      .depths = out_depth,
      .seqs = out_seq,

      .cr_avail = &cr_avail,
      .cr_decrement_amount = fd_ulong_if( out_cnt>0UL, 1UL, 0UL ),
    };
#endif

#ifdef STEM_CALLBACK_BEFORE_CREDIT
    int charge_busy_before = 0;
    STEM_CALLBACK_BEFORE_CREDIT( ctx, &stem, &charge_busy_before );
#endif

    /* Check if we are backpressured.  If so, count any transition into
       a backpressured regime and spin to wait for flow control credits
       to return.  We don't do a fully atomic update here as it is only
       diagnostic and it will still be correct in the usual case where
       individual diagnostic counters aren't used by writers in
       different threads of execution.  We only count the transition
       from not backpressured to backpressured. */

    if( FD_UNLIKELY( cr_avail<burst ) ) {
      metric_backp_cnt += (ulong)!metric_in_backp;
      metric_in_backp = 1UL;
      FD_SPIN_PAUSE();
      metric_regime_ticks[2] += housekeeping_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[5] += (ulong)(next - now);
      now = next;
      continue;
    }
    metric_in_backp = 0UL;

#ifdef STEM_CALLBACK_AFTER_CREDIT
    int poll_in = 1;
    int charge_busy_after = 0;
    STEM_CALLBACK_AFTER_CREDIT( ctx, &stem, &poll_in, &charge_busy_after );
    if( FD_UNLIKELY( !poll_in ) ) {
      metric_regime_ticks[1] += housekeeping_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[4] += (ulong)(next - now);
      now = next;
      continue;
    }
#endif

    /* Select which in to poll next (randomized round robin) */

    if( FD_UNLIKELY( !in_cnt ) ) {
      metric_regime_ticks[0] += housekeeping_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[3] += (ulong)(next - now);
      now = next;
      continue;
    }

    /* If a credit callback reported busy work, charge the time spent in
       it to the busy (prefrag) bucket rather than idle time. */
    ulong prefrag_ticks = 0UL;
#if defined(STEM_CALLBACK_BEFORE_CREDIT) && defined(STEM_CALLBACK_AFTER_CREDIT)
    if( FD_LIKELY( charge_busy_before || charge_busy_after ) ) {
#elif defined(STEM_CALLBACK_BEFORE_CREDIT)
    if( FD_LIKELY( charge_busy_before ) ) {
#elif defined(STEM_CALLBACK_AFTER_CREDIT)
    if( FD_LIKELY( charge_busy_after ) ) {
#endif

#if defined(STEM_CALLBACK_BEFORE_CREDIT) || defined(STEM_CALLBACK_AFTER_CREDIT)
      long prefrag_next = fd_tickcount();
      prefrag_ticks = (ulong)(prefrag_next - now);
      now = prefrag_next;
    }
#endif

    fd_stem_tile_in_t * this_in = &in[ in_seq ];
    in_seq++;
    if( in_seq>=in_cnt ) in_seq = 0UL; /* cmov */

    /* Check if this in has any new fragments to mux */

    ulong this_in_seq = this_in->seq;
    fd_frag_meta_t const * this_in_mline = this_in->mline; /* Already at appropriate line for this_in_seq */

    __m128i seq_sig = fd_frag_meta_seq_sig_query( this_in_mline );
#if FD_USING_CLANG
    /* TODO: Clang optimizes extremely aggressively which breaks the
       atomicity expected by seq_sig_query.  In particular, it replaces
       the sequence query with a second load (immediately following
       vector load).  The signature query a few lines down is still an
       extract from the vector which then means that effectively the
       signature is loaded before the sequence number.
       Adding this clobbers of the vector prevents this optimization by
       forcing the seq query to be an extract, but we probably want a
       better long term solution. */
    __asm__( "" : "+x"(seq_sig) );
#endif
    ulong seq_found = fd_frag_meta_sse0_seq( seq_sig );

    long diff = fd_seq_diff( this_in_seq, seq_found );
    if( FD_UNLIKELY( diff ) ) { /* Caught up or overrun, optimize for new frag case */
      ulong * housekeeping_regime = &metric_regime_ticks[0];
      ulong * prefrag_regime = &metric_regime_ticks[3];
      ulong * finish_regime = &metric_regime_ticks[6];
      if( FD_UNLIKELY( diff<0L ) ) { /* Overrun (impossible if in is honoring our flow control) */
        this_in->seq = seq_found; /* Resume from here (probably reasonably current, could query in mcache sync directly instead) */
        housekeeping_regime = &metric_regime_ticks[1];
        prefrag_regime = &metric_regime_ticks[4];
        finish_regime = &metric_regime_ticks[7];
        this_in->accum[ FD_METRICS_COUNTER_LINK_OVERRUN_POLLING_COUNT_OFF ]++;
        this_in->accum[ FD_METRICS_COUNTER_LINK_OVERRUN_POLLING_FRAG_COUNT_OFF ] += (uint)(-diff);
      }
      /* Don't bother with spin as polling multiple locations */
      *housekeeping_regime += housekeeping_ticks;
      *prefrag_regime += prefrag_ticks;
      long next = fd_tickcount();
      *finish_regime += (ulong)(next - now);
      now = next;
      continue;
    }

    ulong sig = fd_frag_meta_sse0_sig( seq_sig ); (void)sig;
#ifdef STEM_CALLBACK_BEFORE_FRAG
    /* filter<0: silently skip (not even counted as filtered);
       filter>0: skip and account as filtered; filter==0: process. */
    int filter = STEM_CALLBACK_BEFORE_FRAG( ctx, (ulong)this_in->idx, seq_found, sig );
    if( FD_UNLIKELY( filter<0 ) ) {
      metric_regime_ticks[1] += housekeeping_ticks;
      metric_regime_ticks[4] += prefrag_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[7] += (ulong)(next - now);
      now = next;
      continue;
    } else if( FD_UNLIKELY( filter>0 ) ) {
      this_in->accum[ FD_METRICS_COUNTER_LINK_FILTERED_COUNT_OFF ]++;
      this_in->accum[ FD_METRICS_COUNTER_LINK_FILTERED_SIZE_BYTES_OFF ] += (uint)this_in_mline->sz; /* TODO: This might be overrun ... ?  Not loaded atomically */

      this_in_seq = fd_seq_inc( this_in_seq, 1UL );
      this_in->seq = this_in_seq;
      this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in_seq, this_in->depth );

      metric_regime_ticks[1] += housekeeping_ticks;
      metric_regime_ticks[4] += prefrag_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[7] += (ulong)(next - now);
      now = next;
      continue;
    }
#endif

    /* We have a new fragment to mux.  Try to load it.  This attempt
       should always be successful if in producers are honoring our flow
       control.  Since we can cheaply detect if there are
       misconfigurations (should be an L1 cache hit / predictable branch
       in the properly configured case), we do so anyway.  Note that if
       we are on a platform where AVX is atomic, this could be replaced
       by a flat AVX load of the metadata and an extraction of the found
       sequence number for higher performance. */
    FD_COMPILER_MFENCE();
    ulong chunk = (ulong)this_in_mline->chunk; (void)chunk;
    ulong sz = (ulong)this_in_mline->sz; (void)sz;
    ulong ctl = (ulong)this_in_mline->ctl; (void)ctl;
    ulong tsorig = (ulong)this_in_mline->tsorig; (void)tsorig;

#ifdef STEM_CALLBACK_DURING_FRAG
    STEM_CALLBACK_DURING_FRAG( ctx, (ulong)this_in->idx, seq_found, sig, chunk, sz );
#endif

    /* Re-read the sequence number after the callback; a change means
       the producer overwrote the line while we were reading it. */
    FD_COMPILER_MFENCE();
    ulong seq_test = this_in_mline->seq;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( fd_seq_ne( seq_test, seq_found ) ) ) { /* Overrun while reading (impossible if this_in honoring our fctl) */
      this_in->seq = seq_test; /* Resume from here (probably reasonably current, could query in mcache sync instead) */
      fd_metrics_link_in( fd_metrics_base_tl, this_in->idx )[ FD_METRICS_COUNTER_LINK_OVERRUN_READING_COUNT_OFF ]++; /* No local accum since extremely rare, faster to use smaller cache line */
      fd_metrics_link_in( fd_metrics_base_tl, this_in->idx )[ FD_METRICS_COUNTER_LINK_OVERRUN_READING_FRAG_COUNT_OFF ] += (uint)fd_seq_diff( seq_test, seq_found ); /* No local accum since extremely rare, faster to use smaller cache line */
      /* Don't bother with spin as polling multiple locations */
      metric_regime_ticks[1] += housekeeping_ticks;
      metric_regime_ticks[4] += prefrag_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[7] += (ulong)(next - now);
      now = next;
      continue;
    }

#ifdef STEM_CALLBACK_AFTER_FRAG
    STEM_CALLBACK_AFTER_FRAG( ctx, (ulong)this_in->idx, seq_found, sig, chunk, sz, tsorig, &stem );
#endif

    /* Windup for the next in poll and accumulate diagnostics */

    this_in_seq = fd_seq_inc( this_in_seq, 1UL );
    this_in->seq = this_in_seq;
    this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in_seq, this_in->depth );

    this_in->accum[ FD_METRICS_COUNTER_LINK_CONSUMED_COUNT_OFF ]++;
    this_in->accum[ FD_METRICS_COUNTER_LINK_CONSUMED_SIZE_BYTES_OFF ] += (uint)sz;

    metric_regime_ticks[1] += housekeeping_ticks;
    metric_regime_ticks[4] += prefrag_ticks;
    long next = fd_tickcount();
    metric_regime_ticks[7] += (ulong)(next - now);
    now = next;
  }
}
632 :
/* stem_run is the tile entry point.  It derives the polled input
   links, output links, and reliable downstream consumers for this tile
   from the topology, allocates the stem scratch region on the stack,
   and enters stem_run1 (which does not return). */

static void
stem_run( fd_topo_t * topo,
          fd_topo_tile_t * tile ) {
  const fd_frag_meta_t * in_mcache[ FD_TOPO_MAX_LINKS ];
  /* NOTE(review): in_fseq is sized FD_TOPO_MAX_TILE_IN_LINKS while
     in_mcache is sized FD_TOPO_MAX_LINKS; both are indexed by the same
     polled_in_cnt<=tile->in_cnt — confirm FD_TOPO_MAX_TILE_IN_LINKS
     bounds tile->in_cnt. */
  ulong * in_fseq[ FD_TOPO_MAX_TILE_IN_LINKS ];

  /* Gather mcache/fseq for each polled input; non-polled links are
     skipped and do not consume an in index. */
  ulong polled_in_cnt = 0UL;
  for( ulong i=0UL; i<tile->in_cnt; i++ ) {
    if( FD_UNLIKELY( !tile->in_link_poll[ i ] ) ) continue;

    in_mcache[ polled_in_cnt ] = topo->links[ tile->in_link_id[ i ] ].mcache;
    FD_TEST( in_mcache[ polled_in_cnt ] );
    in_fseq[ polled_in_cnt ] = tile->in_link_fseq[ i ];
    FD_TEST( in_fseq[ polled_in_cnt ] );
    polled_in_cnt += 1;
  }

  fd_frag_meta_t * out_mcache[ FD_TOPO_MAX_LINKS ];
  for( ulong i=0UL; i<tile->out_cnt; i++ ) {
    out_mcache[ i ] = topo->links[ tile->out_link_id[ i ] ].mcache;
    FD_TEST( out_mcache[ i ] );
  }

  /* Scan every tile in the topology for reliable consumers of this
     tile's output links; these are the consumers the stem applies
     flow control against. */
  ulong reliable_cons_cnt = 0UL;
  ulong cons_out[ FD_TOPO_MAX_LINKS ];
  ulong * cons_fseq[ FD_TOPO_MAX_LINKS ];
  for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
    fd_topo_tile_t * consumer_tile = &topo->tiles[ i ];
    for( ulong j=0UL; j<consumer_tile->in_cnt; j++ ) {
      for( ulong k=0UL; k<tile->out_cnt; k++ ) {
        if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) {
          cons_out[ reliable_cons_cnt ] = k;
          cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ];
          FD_TEST( cons_fseq[ reliable_cons_cnt ] );
          reliable_cons_cnt++;
          /* Need to test this, since each link may connect to many outs,
             you could construct a topology which has more than this
             consumers of links. */
          FD_TEST( reliable_cons_cnt<FD_TOPO_MAX_LINKS );
        }
      }
    }
  }

  fd_rng_t rng[1];
  FD_TEST( fd_rng_join( fd_rng_new( rng, 0, 0UL ) ) );

  /* The callback context lives in the tile's shared memory object,
     aligned up to the callback-declared alignment. */
  STEM_CALLBACK_CONTEXT_TYPE * ctx = (STEM_CALLBACK_CONTEXT_TYPE*)fd_ulong_align_up( (ulong)fd_topo_obj_laddr( topo, tile->tile_obj_id ), STEM_CALLBACK_CONTEXT_ALIGN );

  stem_run1( polled_in_cnt,
             in_mcache,
             in_fseq,
             tile->out_cnt,
             out_mcache,
             reliable_cons_cnt,
             cons_out,
             cons_fseq,
             STEM_BURST,
#ifdef STEM_LAZY
             STEM_LAZY,
#else
             0L,
#endif
             rng,
             /* scratch is stack allocated; sized/aligned per the
                footprint/align helpers above */
             fd_alloca( STEM_SCRATCH_ALIGN, stem_scratch_footprint( polled_in_cnt, tile->out_cnt, reliable_cons_cnt ) ),
             ctx );
}
700 :
701 : #endif /* FD_HAS_SSE */
|