Line data Source code
1 : #include "fd_stem.h"
2 :
3 : /* fd_stem provides services to multiplex multiple streams of input
4 : fragments and present them to a mix of reliable and unreliable
5 : consumers as though they were generated by multiple different
6 : multi-stream producers. The code can be included to generate
7 : a definition of stem_run which can be called as a tile main run
8 : loop.
9 :
10 : The template supports various callback functions which can be
11 : defined like #define STEM_CALLBACK_BEFORE_FRAG before_frag to
12 : tune the behavior of the stem_run loop. The callbacks are:
13 :
14 : SHOULD_SHUTDOWN
15 : It is called at the beginning of each iteration of the stem run loop,
16 : and if it returns non-zero, the stem will exit the run loop and
17 : return from the stem_run function. This is useful for shutting down
18 : the tile.
19 :
20 : DURING_HOUSEKEEPING
21 : Is called during the housekeeping routine, which happens infrequently
22 : on a schedule determined by the stem (based on the lazy parameter,
23 : see fd_tempo.h for more information). It is appropriate to do
24 : slightly expensive things here that wouldn't be OK to do in the main
25 : loop, like updating sequence numbers that are shared with other tiles
26 : (e.g. synchronization information), or sending batched information
27 : somewhere. The ctx is a user-provided context object from when the
28 : stem was initialized.
29 :
30 : METRICS_WRITE
31 : By convention, tiles may wish to accumulate high traffic metrics
32 : locally so they don't cause a lot of cache coherency traffic, and
33 : then periodically publish them to external observers. This callback
34 : is here to support that use case. It occurs infrequently during the
35 : housekeeping loop, and is called inside a compiler fence to ensure
36 : the writes do not get reordered, which may be important for observers
37 : or monitoring tools. The ctx is a user-provided context object from
38 : when the stem tile was initialized.
39 :
40 : BEFORE_CREDIT
41 : Is called every iteration of the stem run loop, whether there is a
42 : new frag ready to receive or not. This callback is also still
43 : invoked even if the stem is backpressured and cannot read any new
44 : fragments while waiting for downstream consumers to catch up. This
45 : callback is useful for things that need to occur even if no new frags
46 : are being handled. For example, servicing network connections could
47 : happen here. The ctx is a user-provided context object from when the
48 : stem tile was initialized. The stem is the stem which is invoking
49 : this callback. The stem should only be used for calling
50 : fd_stem_publish to publish a fragment to downstream consumers.
51 :
52 : The charge_busy argument is 0 by default, and should be set to 1 if
53 : the before_credit function is doing work that should be accounted for
54 : as part of the tile's busy indicator.
55 :
56 : AFTER_CREDIT
57 : Is called every iteration of the stem run loop, whether there is a
58 : new frag ready to receive or not, except in cases where the stem is
59 : backpressured by a downstream consumer and would not be able to
60 : publish. The callback might be used for publishing new fragments to
61 : downstream consumers in the main loop which are not in response to an
62 : incoming fragment. For example, code that collects incoming
63 : fragments over a period of 1 second and joins them together before
64 : publishing a large block fragment downstream, would publish the block
65 : here. The ctx is a user-provided context object from when the stem
66 : tile was initialized. The stem is the stem which is invoking this
67 : callback. The stem should only be used for calling fd_stem_publish to
68 : publish a fragment to downstream consumers.
69 :
70 : The opt_poll_in argument determines if the stem should proceed with
71 : checking for new fragments to consume, or should `continue` the main
72 : stem loop to do credit checking again. This could be used if the
73 : after_credit function publishes, and the flow control needs to be
74 : checked again. By default, opt_poll_in is true and the stem will
75 : poll for fragments right away without rerunning the loop or checking
76 : for credits.
77 :
78 : The charge_busy argument is 0 by default, and should be set to 1 if
79 : the after_credit function is doing work that should be accounted for
80 : as part of the tile's busy indicator.
81 :
82 : BEFORE_FRAG
83 : Is called immediately whenever a new fragment has been detected that
84 : was published by an upstream producer. The signature and sequence
85 : number (sig and seq) provided as arguments are read atomically from
86 : shared memory, so must both match each other from the published
87 : fragment (aka. they will not be torn or partially overwritten).
88 : in_idx is an index in [0, num_ins) indicating which producer
89 : published the fragment. No fragment data has been read yet here, nor
90 : has other metadata, for example the size or timestamps of the
91 : fragment. Mainly this callback is useful for deciding whether to
92 : filter the fragment based on its signature. If the return value is
93 : positive, the frag will be skipped completely, no fragment data will
94 : be read, and the in will be advanced so that we now wait for the next
95 : fragment. If the return value is negative (e.g. -1), the in is not
96 : advanced, so the same frag will be polled and reprocessed. The ctx is a
97 : user-provided context object from when the stem tile was initialized.
98 :
99 : DURING_FRAG
100 : Is called after the stem has received a new frag from an in, but
101 : before the stem has checked that it was overrun. This callback is
102 : not invoked if the stem is backpressured, as it would not try and
103 : read a frag from an in in the first place (instead, leaving it on the
104 : in mcache to backpressure the upstream producer). in_idx will be the
105 : index of the in that the frag was received from, skipping any unpolled
106 : links. If the producer of the frags is respecting flow control, it is
107 : safe to read frag data in any of the callbacks, but it is suggested to
108 : copy or read frag data within this callback, as if the producer does
109 : not respect flow control, the frag may be torn or corrupt due to an
110 : overrun by the reader. If the frag being read from has been
111 : overwritten while this callback is running, the frag will be ignored
112 : and the stem will not call the after_frag function. Instead it will
113 : recover from the overrun and continue with new frags. This function
114 : cannot fail. The ctx is a user-provided context object from when the
115 : stem tile was initialized. seq, sig, chunk, and sz are the respective
116 : fields from the mcache fragment that was received. If the producer
117 : is not respecting flow control, these may be corrupt or torn and
118 : should not be trusted, except for seq which is read atomically.
119 :
120 : RETURNABLE_FRAG
121 : Is called after the stem has received a new frag from an in, and
122 : assumes that the stem cannot be overrun. This special callback can
123 : instruct the stem not to advance the input sequence number, and
124 : instead return the fragment to the stem to be processed again. This
125 : is useful for processing partial data from fragments without copying
126 : it. This callback is unsafe in general contexts, since it assumes
127 : that the frag will not be overwritten while the callback is running,
128 : and that the frag data is valid throughout the function call. It
129 : should only be used when the stem is guaranteed to not be overrun.
130 : This callback is not invoked if the stem is backpressured, as it
131 : would not try and read a frag from an in in the first place (instead,
132 : leaving it on the in mcache to backpressure the upstream producer).
133 : in_idx will be the index of the in that the frag was received from.
134 : seq, sig, chunk, and sz are the respective fields from the mcache
135 : fragment that was received. tsorig and tspub are the timestamps of
136 : the fragment that was received, and are read atomically from shared
137 : memory, so must both match each other from the published fragment
138 : (aka. they will not be torn or partially overwritten). The ctx is a
139 : user-provided context object from when the stem tile was initialized.
140 : The callback should return 1 if the fragment was not fully processed
141 : and should be returned to the stem for further processing, or 0 if
142 : the fragment was fully processed and the consumer link should be
143 : advanced.
144 :
145 : AFTER_FRAG
146 : Is called immediately after the DURING_FRAG, along with an additional
147 : check that the reader was not overrun while handling the frag. If
148 : the reader was overrun, the frag is abandoned and this function is
149 : not called. This callback is not invoked if the stem is
150 : backpressured, as it would not read a frag in the first place.
151 : in_idx will be the index of the in that the frag was received from,
152 : skipping any unpolled links. You should not read the frag data directly
153 : here, as it might still get overrun, instead it should be copied out of
154 : the frag in the DURING_FRAG callback if needed later. This function cannot
155 : fail. The ctx is a user-provided context object from when the stem tile
156 : was initialized. stem should only be used for calling fd_stem_publish
157 : to publish a fragment to downstream consumers. seq is the sequence
158 : number of the fragment that was read from the input mcache. sig,
159 : chunk, sz, tsorig, and tspub are the respective fields from the
160 : mcache fragment that was received. If the producer is not respecting
161 : flow control, these may be corrupt or torn and should not be trusted.
162 :
163 : AFTER_POLL_OVERRUN
164 : Is called when an overrun is detected while polling for new frags.
165 : This callback is not called when an overrun is detected in
166 : during_frag. */
167 :
168 : #include "../../util/log/fd_log.h"
169 : #include "../topo/fd_topo.h"
170 : #include "../metrics/fd_metrics.h"
171 : #include "../../tango/fd_tango.h"
172 :
173 : #ifndef STEM_NAME
174 : #define STEM_NAME stem
175 : #endif
176 0 : #define STEM_(n) FD_EXPAND_THEN_CONCAT3(STEM_NAME,_,n)
177 :
178 : #ifndef STEM_BURST
179 : #error "STEM_BURST must be defined"
180 : #endif
181 :
182 : #ifndef STEM_CALLBACK_CONTEXT_TYPE
183 : #error "STEM_CALLBACK_CONTEXT_TYPE must be defined"
184 : #endif
185 :
186 : #ifndef STEM_CALLBACK_CONTEXT_ALIGN
187 : #error "STEM_CALLBACK_CONTEXT_ALIGN must be defined"
188 : #endif
189 :
190 : #ifndef STEM_LAZY
191 0 : #define STEM_LAZY (0L)
192 : #endif
193 :
194 0 : #define STEM_SHUTDOWN_SEQ (ULONG_MAX-1UL)
195 :
196 : static inline void
197 0 : STEM_(in_update)( fd_stem_tile_in_t * in ) {
198 0 : __atomic_store_n( in->fseq, in->seq, __ATOMIC_RELEASE );
199 :
200 0 : volatile ulong * metrics = fd_metrics_link_in( fd_metrics_base_tl, in->idx );
201 :
202 0 : uint * accum = in->accum;
203 0 : ulong a0 = (ulong)accum[0]; ulong a1 = (ulong)accum[1]; ulong a2 = (ulong)accum[2];
204 0 : ulong a3 = (ulong)accum[3]; ulong a4 = (ulong)accum[4]; ulong a5 = (ulong)accum[5];
205 0 : FD_COMPILER_MFENCE();
206 0 : metrics[0] += a0; metrics[1] += a1; metrics[2] += a2;
207 0 : metrics[3] += a3; metrics[4] += a4; metrics[5] += a5;
208 0 : FD_COMPILER_MFENCE();
209 0 : accum[0] = 0U; accum[1] = 0U; accum[2] = 0U;
210 0 : accum[3] = 0U; accum[4] = 0U; accum[5] = 0U;
211 0 : }
212 :
213 : FD_FN_PURE static inline ulong
214 0 : STEM_(scratch_align)( void ) {
215 0 : return FD_STEM_SCRATCH_ALIGN;
216 0 : }
217 :
218 : FD_FN_PURE static inline ulong
219 : STEM_(scratch_footprint)( ulong in_cnt,
220 : ulong out_cnt,
221 0 : ulong cons_cnt ) {
222 0 : ulong l = FD_LAYOUT_INIT;
223 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_stem_tile_in_t), in_cnt*sizeof(fd_stem_tile_in_t) ); /* in */
224 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) ); /* cr_avail */
225 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) ); /* out_depth */
226 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) ); /* out_seq */
227 0 : l = FD_LAYOUT_APPEND( l, alignof(int), out_cnt*sizeof(int) ); /* out_reliable */
228 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) ); /* cons_fseq */
229 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); /* cons_slow */
230 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), cons_cnt*sizeof(ulong) ); /* cons_out */
231 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), cons_cnt*sizeof(ulong) ); /* cons_seq */
232 0 : const ulong event_cnt = in_cnt + 1UL + cons_cnt;
233 0 : l = FD_LAYOUT_APPEND( l, alignof(ushort), event_cnt*sizeof(ushort) ); /* event_map */
234 0 : return FD_LAYOUT_FINI( l, STEM_(scratch_align)() );
235 0 : }
236 :
237 : static inline void
238 : STEM_(run1)( ulong in_cnt,
239 : fd_frag_meta_t const ** in_mcache,
240 : ulong ** in_fseq,
241 : ulong out_cnt,
242 : fd_frag_meta_t ** out_mcache,
243 : ulong cons_cnt,
244 : ulong * _cons_out,
245 : ulong ** _cons_fseq,
246 : volatile ulong ** _cons_slow,
247 : ulong burst,
248 : long lazy,
249 : fd_rng_t * rng,
250 : void * scratch,
251 0 : STEM_CALLBACK_CONTEXT_TYPE * ctx ) {
252 : /* in frag stream state */
253 0 : ulong in_seq; /* current position in input poll sequence, in [0,in_cnt) */
254 0 : fd_stem_tile_in_t * in; /* in[in_seq] for in_seq in [0,in_cnt) has information about input fragment stream currently at
255 : position in_seq in the in_idx polling sequence. The ordering of this array is continuously
256 : shuffled to avoid lighthousing effects in the output fragment stream at extreme fan-in and load */
257 :
258 : /* out frag stream state */
259 0 : ulong * out_depth; /* ==fd_mcache_depth( out_mcache[out_idx] ) for out_idx in [0, out_cnt) */
260 0 : ulong * out_seq; /* next mux frag sequence number to publish for out_idx in [0, out_cnt) ]*/
261 0 : int * out_reliable; /* out_reliable[out_idx] is 1 if out_idx has at least one reliable consumer, else 0 */
262 :
263 : /* out flow control state */
264 0 : ulong * cr_avail; /* number of flow control credits available to publish downstream across all outs */
265 0 : ulong min_cr_avail; /* minimum number of flow control credits available to publish downstream */
266 0 : ulong const ** cons_fseq; /* cons_fseq[cons_idx] for cons_idx in [0,cons_cnt) is where to receive fctl credits from consumers */
267 0 : volatile ulong ** cons_slow; /* cons_slow[cons_idx] for cons_idx in [0,cons_cnt) is where to accumulate slow events */
268 0 : ulong * cons_out; /* cons_out[cons_idx] for cons_idx in [0,cons_ct) is which out the consumer consumes from */
269 0 : ulong * cons_seq; /* cons_seq [cons_idx] is the most recent observation of cons_fseq[cons_idx] */
270 :
271 : /* housekeeping state */
272 0 : ulong event_cnt; /* ==in_cnt+cons_cnt+1, total number of housekeeping events */
273 0 : ulong event_seq; /* current position in housekeeping event sequence, in [0,event_cnt) */
274 0 : ushort * event_map; /* current mapping of event_seq to event idx, event_map[ event_seq ] is next event to process */
275 0 : ulong async_min; /* minimum number of ticks between processing a housekeeping event, positive integer power of 2 */
276 :
277 : /* performance metrics */
278 0 : ulong metric_in_backp; /* is the run loop currently backpressured by one or more of the outs, in [0,1] */
279 0 : ulong metric_backp_cnt; /* Accumulates number of transitions of tile to backpressured between housekeeping events */
280 :
281 0 : ulong metric_regime_ticks[ FD_METRICS_ENUM_TILE_REGIME_CNT ]; /* How many ticks the tile has spent in each regime */
282 :
283 0 : if( FD_UNLIKELY( !scratch ) ) FD_LOG_ERR(( "NULL scratch" ));
284 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)scratch, STEM_(scratch_align)() ) ) ) FD_LOG_ERR(( "misaligned scratch" ));
285 :
286 : /* in_backp==1, backp_cnt==0 indicates waiting for initial credits,
287 : cleared during first housekeeping if credits available */
288 0 : metric_in_backp = 1UL;
289 0 : metric_backp_cnt = 0UL;
290 0 : memset( metric_regime_ticks, 0, sizeof( metric_regime_ticks ) );
291 :
292 : /* in frag stream init */
293 :
294 0 : in_seq = 0UL; /* First in to poll */
295 :
296 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
297 0 : in = (fd_stem_tile_in_t *)FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stem_tile_in_t), in_cnt*sizeof(fd_stem_tile_in_t) );
298 :
299 0 : if( FD_UNLIKELY( !!in_cnt && !in_mcache ) ) FD_LOG_ERR(( "NULL in_mcache" ));
300 0 : if( FD_UNLIKELY( !!in_cnt && !in_fseq ) ) FD_LOG_ERR(( "NULL in_fseq" ));
301 0 : if( FD_UNLIKELY( in_cnt > UINT_MAX ) ) FD_LOG_ERR(( "in_cnt too large" ));
302 0 : for( ulong in_idx=0UL; in_idx<in_cnt; in_idx++ ) {
303 :
304 0 : if( FD_UNLIKELY( !in_mcache[ in_idx ] ) ) FD_LOG_ERR(( "NULL in_mcache[%lu]", in_idx ));
305 0 : if( FD_UNLIKELY( !in_fseq [ in_idx ] ) ) FD_LOG_ERR(( "NULL in_fseq[%lu]", in_idx ));
306 :
307 0 : fd_stem_tile_in_t * this_in = &in[ in_idx ];
308 :
309 0 : this_in->mcache = in_mcache[ in_idx ];
310 0 : this_in->fseq = in_fseq [ in_idx ];
311 :
312 0 : ulong depth = fd_mcache_depth( this_in->mcache );
313 0 : if( FD_UNLIKELY( depth > UINT_MAX ) ) FD_LOG_ERR(( "in_mcache[%lu] too deep", in_idx ));
314 0 : this_in->depth = (uint)depth;
315 0 : this_in->idx = (uint)in_idx;
316 0 : this_in->seq = 0UL;
317 0 : this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in->seq, this_in->depth );
318 :
319 0 : this_in->accum[0] = 0U; this_in->accum[1] = 0U; this_in->accum[2] = 0U;
320 0 : this_in->accum[3] = 0U; this_in->accum[4] = 0U; this_in->accum[5] = 0U;
321 0 : }
322 :
323 : /* out frag stream init */
324 :
325 0 : cr_avail = (ulong *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) );
326 0 : min_cr_avail = fd_ulong_if( cons_cnt>0UL, 0UL, ULONG_MAX );
327 :
328 0 : out_depth = (ulong *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) );
329 0 : out_seq = (ulong *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) );
330 0 : out_reliable = (int *)FD_SCRATCH_ALLOC_APPEND( l, alignof(int), out_cnt*sizeof(int) );
331 :
332 0 : ulong cr_max = fd_ulong_if( !out_cnt || !cons_cnt, 128UL, ULONG_MAX );
333 :
334 0 : for( ulong out_idx=0UL; out_idx<out_cnt; out_idx++ ) {
335 :
336 0 : if( FD_UNLIKELY( !out_mcache[ out_idx ] ) ) FD_LOG_ERR(( "NULL out_mcache[%lu]", out_idx ));
337 :
338 0 : out_depth[ out_idx ] = fd_mcache_depth( out_mcache[ out_idx ] );
339 0 : out_seq[ out_idx ] = 0UL;
340 :
341 0 : cr_avail[ out_idx ] = out_depth[ out_idx ];
342 0 : out_reliable[ out_idx ] = 0;
343 0 : }
344 :
345 0 : cons_fseq = (ulong const **)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) );
346 0 : cons_slow = (volatile ulong **)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) );
347 0 : cons_out = (ulong *) FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), cons_cnt*sizeof(ulong) );
348 0 : cons_seq = (ulong *) FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), cons_cnt*sizeof(ulong) );
349 :
350 0 : if( FD_UNLIKELY( !!cons_cnt && !_cons_fseq ) ) FD_LOG_ERR(( "NULL cons_fseq" ));
351 0 : if( FD_UNLIKELY( !!cons_cnt && !_cons_slow ) ) FD_LOG_ERR(( "NULL cons_slow" ));
352 0 : for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
353 0 : if( FD_UNLIKELY( !_cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx ));
354 0 : if( FD_UNLIKELY( !_cons_slow[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_slow[%lu]", cons_idx ));
355 0 : cons_fseq[ cons_idx ] = _cons_fseq[ cons_idx ];
356 0 : cons_out [ cons_idx ] = _cons_out [ cons_idx ];
357 0 : cons_slow[ cons_idx ] = _cons_slow[ cons_idx ];
358 0 : cons_seq [ cons_idx ] = __atomic_load_n( _cons_fseq[ cons_idx ], __ATOMIC_ACQUIRE );
359 :
360 0 : out_reliable[ cons_out[ cons_idx ] ] = 1;
361 0 : cr_max = fd_ulong_min( cr_max, out_depth[ cons_out[ cons_idx ] ] );
362 0 : }
363 :
364 0 : if( FD_UNLIKELY( cons_cnt>0UL && burst>cr_max ) ) FD_LOG_ERR(( "one or more out links have insufficient depth for STEM_BURST %lu. cr_max is %lu", burst, cr_max ));
365 :
366 : /* housekeeping init */
367 :
368 0 : if( lazy<=0L ) lazy = fd_tempo_lazy_default( cr_max );
369 0 : if( FD_UNLIKELY( lazy>(long)1e9 ) ) FD_LOG_ERR(( "excessive stem lazy value: %li", lazy ));
370 0 : FD_LOG_INFO(( "Configuring housekeeping (lazy %li ns)", lazy ));
371 :
372 : /* Initialize the initial event sequence to immediately update
373 : cr_avail on the first run loop iteration and then update all the
374 : ins accordingly. */
375 :
376 0 : event_cnt = in_cnt + 1UL + cons_cnt;
377 0 : event_map = (ushort *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ushort), event_cnt*sizeof(ushort) );
378 0 : event_seq = 0UL; event_map[ event_seq++ ] = (ushort)cons_cnt;
379 0 : for( ulong in_idx=0UL; in_idx< in_cnt; in_idx++ ) event_map[ event_seq++ ] = (ushort)(in_idx+cons_cnt+1UL);
380 0 : for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) event_map[ event_seq++ ] = (ushort)cons_idx;
381 0 : event_seq = 0UL;
382 :
383 0 : async_min = fd_tempo_async_min( lazy, event_cnt, (float)fd_tempo_tick_per_ns( NULL ) );
384 0 : if( FD_UNLIKELY( !async_min ) ) FD_LOG_ERR(( "bad lazy %lu %lu", (ulong)lazy, event_cnt ));
385 :
386 0 : FD_LOG_INFO(( "Running stem, cr_max = %lu", cr_max ));
387 0 : FD_MGAUGE_SET( TILE, STATUS, 1UL );
388 0 : long then = fd_tickcount();
389 0 : long now = then;
390 0 : for(;;) {
391 :
392 : #ifdef STEM_CALLBACK_SHOULD_SHUTDOWN
393 0 : if( FD_UNLIKELY( STEM_CALLBACK_SHOULD_SHUTDOWN( ctx ) ) ) break;
394 0 : #endif
395 :
396 : /* Do housekeeping at a low rate in the background */
397 :
398 0 : ulong housekeeping_ticks = 0UL;
399 0 : if( FD_UNLIKELY( (now-then)>=0L ) ) {
400 0 : ulong event_idx = (ulong)event_map[ event_seq ];
401 :
402 : /* Do the next async event. event_idx:
403 : <out_cnt - receive credits from out event_idx
404 : ==out_cnt - housekeeping
405 : >out_cnt - send credits to in event_idx - out_cnt - 1.
406 : Branch hints and order are optimized for the case:
407 : out_cnt >~ in_cnt >~ 1. */
408 :
409 0 : if( FD_LIKELY( event_idx<cons_cnt ) ) { /* cons fctl for cons cons_idx */
410 0 : ulong cons_idx = event_idx;
411 :
412 : /* Receive flow control credits from this out. */
413 0 : cons_seq[ cons_idx ] = __atomic_load_n( cons_fseq[ cons_idx ], __ATOMIC_ACQUIRE );
414 :
415 0 : } else if( FD_LIKELY( event_idx>cons_cnt ) ) { /* in fctl for in in_idx */
416 0 : ulong in_idx = event_idx - cons_cnt - 1UL;
417 :
418 : /* Send flow control credits and drain flow control diagnostics
419 : for in_idx. */
420 :
421 0 : STEM_(in_update)( &in[ in_idx ] );
422 :
423 0 : } else { /* event_idx==cons_cnt, housekeeping event */
424 :
425 : /* Update metrics counters to external viewers */
426 0 : FD_COMPILER_MFENCE();
427 0 : FD_MGAUGE_SET( TILE, HEARTBEAT, (ulong)fd_log_wallclock() );
428 0 : FD_MGAUGE_SET( TILE, IN_BACKPRESSURE, metric_in_backp );
429 0 : FD_MCNT_INC ( TILE, BACKPRESSURE_COUNT, metric_backp_cnt );
430 0 : FD_MCNT_ENUM_COPY( TILE, REGIME_DURATION_NANOS, metric_regime_ticks );
431 : #ifdef STEM_CALLBACK_METRICS_WRITE
432 0 : STEM_CALLBACK_METRICS_WRITE( ctx );
433 : #endif
434 0 : FD_COMPILER_MFENCE();
435 0 : metric_backp_cnt = 0UL;
436 :
437 : /* Receive flow control credits */
438 0 : if( FD_LIKELY( min_cr_avail<cr_max ) ) {
439 0 : ulong slowest_cons = ULONG_MAX;
440 0 : min_cr_avail = cr_max;
441 0 : for( ulong out_idx=0; out_idx<out_cnt; out_idx++ ) {
442 0 : cr_avail[ out_idx ] = out_depth[ out_idx ];
443 0 : }
444 :
445 0 : for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
446 0 : ulong out_idx = cons_out[ cons_idx ];
447 0 : ulong cons_cr_avail = (ulong)fd_long_max( (long)out_depth[ out_idx ]-fd_long_max( fd_seq_diff( out_seq[ out_idx ], cons_seq[ cons_idx ] ), 0L ), 0L );
448 :
449 : /* If a reliable consumer exits, they can set the credit
450 : return fseq to STEM_SHUTDOWN_SEQ to indicate they are no
451 : longer actively consuming. */
452 0 : cons_cr_avail = fd_ulong_if( cons_seq[ cons_idx ]==STEM_SHUTDOWN_SEQ, out_depth[ out_idx ], cons_cr_avail );
453 0 : slowest_cons = fd_ulong_if( cons_cr_avail<min_cr_avail, cons_idx, slowest_cons );
454 :
455 0 : cr_avail[ out_idx ] = fd_ulong_min( cr_avail[ out_idx ], cons_cr_avail );
456 0 : min_cr_avail = fd_ulong_min( cons_cr_avail, min_cr_avail );
457 0 : }
458 :
459 : /* See notes above about use of quasi-atomic diagnostic accum */
460 0 : if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) {
461 0 : FD_COMPILER_MFENCE();
462 0 : (*cons_slow[ slowest_cons ]) += metric_in_backp;
463 0 : FD_COMPILER_MFENCE();
464 0 : }
465 0 : }
466 :
467 : /* Publish producer progress sync word */
468 0 : for( ulong out_idx=0UL; out_idx<out_cnt; out_idx++ ) {
469 0 : fd_mcache_seq_update( fd_mcache_seq_laddr( out_mcache[ out_idx ] ), out_seq[ out_idx ] );
470 0 : }
471 :
472 : #ifdef STEM_CALLBACK_DURING_HOUSEKEEPING
473 0 : STEM_CALLBACK_DURING_HOUSEKEEPING( ctx );
474 : #else
475 : (void)ctx;
476 : #endif
477 0 : }
478 :
479 : /* Select which event to do next (randomized round robin) and
480 : reload the housekeeping timer. */
481 :
482 0 : event_seq++;
483 0 : if( FD_UNLIKELY( event_seq>=event_cnt ) ) {
484 0 : event_seq = 0UL;
485 :
486 : /* Randomize the order of event processing for the next event
487 : event_cnt events to avoid lighthousing effects causing input
488 : credit starvation at extreme fan in/fan out, extreme in load
489 : and high credit return laziness. */
490 :
491 0 : ulong swap_idx = (ulong)fd_rng_uint_roll( rng, (uint)event_cnt );
492 0 : ushort map_tmp = event_map[ swap_idx ];
493 0 : event_map[ swap_idx ] = event_map[ 0 ];
494 0 : event_map[ 0 ] = map_tmp;
495 :
496 : /* We also do the same with the ins to prevent there being a
497 : correlated order frag origins from different inputs
498 : downstream at extreme fan in and extreme in load. */
499 :
500 0 : if( FD_LIKELY( in_cnt>1UL ) ) {
501 0 : swap_idx = (ulong)fd_rng_uint_roll( rng, (uint)in_cnt );
502 0 : fd_stem_tile_in_t in_tmp;
503 0 : in_tmp = in[ swap_idx ];
504 0 : in[ swap_idx ] = in[ 0 ];
505 0 : in[ 0 ] = in_tmp;
506 0 : }
507 0 : }
508 :
509 : /* Reload housekeeping timer */
510 0 : then = now + (long)fd_tempo_async_reload( rng, async_min );
511 0 : long next = fd_tickcount();
512 0 : housekeeping_ticks = (ulong)(next - now);
513 0 : now = next;
514 0 : }
515 :
516 : #if defined(STEM_CALLBACK_BEFORE_CREDIT) || defined(STEM_CALLBACK_AFTER_CREDIT) || defined(STEM_CALLBACK_AFTER_FRAG) || defined(STEM_CALLBACK_RETURNABLE_FRAG)
517 : fd_stem_context_t stem = {
518 : .mcaches = out_mcache,
519 : .depths = out_depth,
520 : .seqs = out_seq,
521 :
522 : .cr_avail = cr_avail,
523 : .min_cr_avail = &min_cr_avail,
524 : .cr_decrement_amount = fd_ulong_if( out_cnt>0UL, 1UL, 0UL ),
525 : .out_reliable = out_reliable,
526 : };
527 : #endif
528 :
529 0 : int charge_busy_before = 0;
530 : #ifdef STEM_CALLBACK_BEFORE_CREDIT
531 0 : STEM_CALLBACK_BEFORE_CREDIT( ctx, &stem, &charge_busy_before );
532 : #endif
533 :
534 : /* Check if we are backpressured. If so, count any transition into
535 : a backpressured regime and spin to wait for flow control credits
536 : to return. We don't do a fully atomic update here as it is only
537 : diagnostic and it will still be correct in the usual case where
538 : individual diagnostic counters aren't used by writers in
539 : different threads of execution. We only count the transition
540 : from not backpressured to backpressured. */
541 :
542 0 : if( FD_UNLIKELY( min_cr_avail<burst ) ) {
543 0 : metric_backp_cnt += (ulong)!metric_in_backp;
544 0 : metric_in_backp = 1UL;
545 0 : FD_SPIN_PAUSE();
546 0 : metric_regime_ticks[2] += housekeeping_ticks;
547 0 : long next = fd_tickcount();
548 0 : metric_regime_ticks[5] += (ulong)(next - now);
549 0 : now = next;
550 0 : continue;
551 0 : }
552 0 : metric_in_backp = 0UL;
553 :
554 0 : int charge_busy_after = 0;
555 : #ifdef STEM_CALLBACK_AFTER_CREDIT
556 : int poll_in = 1;
557 0 : STEM_CALLBACK_AFTER_CREDIT( ctx, &stem, &poll_in, &charge_busy_after );
558 0 : if( FD_UNLIKELY( !poll_in ) ) {
559 0 : metric_regime_ticks[1] += housekeeping_ticks;
560 0 : long next = fd_tickcount();
561 0 : metric_regime_ticks[4] += (ulong)(next - now);
562 0 : now = next;
563 0 : continue;
564 0 : }
565 0 : #endif
566 :
567 : /* Select which in to poll next (randomized round robin) */
568 :
569 0 : if( FD_UNLIKELY( !in_cnt ) ) {
570 0 : int was_busy = charge_busy_before+charge_busy_after;
571 0 : metric_regime_ticks[0] += housekeeping_ticks;
572 0 : long next = fd_tickcount();
573 0 : if( FD_UNLIKELY( was_busy ) ) metric_regime_ticks[3] += (ulong)(next - now);
574 0 : else metric_regime_ticks[6] += (ulong)(next - now);
575 0 : now = next;
576 0 : continue;
577 0 : }
578 :
579 0 : ulong prefrag_ticks = 0UL;
580 : #if defined(STEM_CALLBACK_BEFORE_CREDIT) && defined(STEM_CALLBACK_AFTER_CREDIT)
581 0 : if( FD_LIKELY( charge_busy_before || charge_busy_after ) ) {
582 : #elif defined(STEM_CALLBACK_BEFORE_CREDIT)
583 0 : if( FD_LIKELY( charge_busy_before ) ) {
584 : #elif defined(STEM_CALLBACK_AFTER_CREDIT)
585 0 : if( FD_LIKELY( charge_busy_after ) ) {
586 0 : #endif
587 :
588 : #if defined(STEM_CALLBACK_BEFORE_CREDIT) || defined(STEM_CALLBACK_AFTER_CREDIT)
589 0 : long prefrag_next = fd_tickcount();
590 0 : prefrag_ticks = (ulong)(prefrag_next - now);
591 0 : now = prefrag_next;
592 0 : }
593 : #endif
594 :
595 0 : fd_stem_tile_in_t * this_in = &in[ in_seq ];
596 0 : in_seq++;
597 0 : if( in_seq>=in_cnt ) in_seq = 0UL; /* cmov */
598 :
599 : /* Check if this in has any new fragments to mux */
600 :
601 0 : ulong this_in_seq = this_in->seq;
602 0 : fd_frag_meta_t const * this_in_mline = this_in->mline; /* Already at appropriate line for this_in_seq */
603 :
604 0 : #if FD_HAS_AVX
605 0 : __m256i yline = FD_VOLATILE_CONST( this_in_mline->avx );
606 0 : ulong seq_found = fd_frag_meta_avx_seq( yline );
607 0 : ulong sig = fd_frag_meta_avx_sig( yline );
608 : #elif FD_HAS_SSE
609 : __m128i seq_sig = fd_frag_meta_seq_sig_query( this_in_mline );
610 : ulong seq_found = fd_frag_meta_sse0_seq( seq_sig );
611 : ulong sig = fd_frag_meta_sse0_sig( seq_sig );
612 : #elif FD_HAS_ARM
613 : ulong seq_found, sig;
614 : fd_arm_ldp16_acq_pc( this_in_mline->ul, seq_found, sig );
615 : ulong ul2, ul3;
616 : fd_arm_ldp16( this_in_mline->ul+2, ul2, ul3 );
617 : #else
618 : /* Without 128-bit atomic load, seq and sig might be read from
619 : different frags (due to overrun), which results in a before_frag
620 : and during_frag being issued with incorrect arguments, but not
621 : after_frag. */
622 : ulong seq_found = FD_VOLATILE_CONST( this_in_mline->seq );
623 : ulong sig = FD_VOLATILE_CONST( this_in_mline->sig );
624 : #endif
625 0 : (void)sig;
626 :
627 0 : long diff = fd_seq_diff( this_in_seq, seq_found );
628 0 : if( FD_UNLIKELY( diff ) ) { /* Caught up or overrun, optimize for new frag case */
629 0 : ulong * housekeeping_regime = &metric_regime_ticks[0];
630 0 : ulong * prefrag_regime = &metric_regime_ticks[3];
631 0 : ulong * finish_regime = &metric_regime_ticks[6];
632 0 : if( FD_UNLIKELY( diff<0L ) ) { /* Overrun (impossible if in is honoring our flow control) */
633 0 : this_in->seq = seq_found; /* Resume from here (probably reasonably current, could query in mcache sync directly instead) */
634 0 : housekeeping_regime = &metric_regime_ticks[1];
635 0 : prefrag_regime = &metric_regime_ticks[4];
636 0 : finish_regime = &metric_regime_ticks[7];
637 0 : this_in->accum[ FD_METRICS_COUNTER_LINK_OVERRUN_POLLING_COUNT_OFF ]++;
638 0 : this_in->accum[ FD_METRICS_COUNTER_LINK_OVERRUN_POLLING_FRAG_COUNT_OFF ] += (uint)(-diff);
639 :
640 : #ifdef STEM_CALLBACK_AFTER_POLL_OVERRUN
641 0 : STEM_CALLBACK_AFTER_POLL_OVERRUN( ctx );
642 : #endif
643 0 : }
644 :
645 : /* Don't bother with spin as polling multiple locations */
646 0 : *housekeeping_regime += housekeeping_ticks;
647 0 : *prefrag_regime += prefrag_ticks;
648 0 : long next = fd_tickcount();
649 0 : *finish_regime += (ulong)(next - now);
650 0 : now = next;
651 0 : continue;
652 0 : }
653 :
654 : #ifdef STEM_CALLBACK_BEFORE_FRAG
655 0 : int filter = STEM_CALLBACK_BEFORE_FRAG( ctx, (ulong)this_in->idx, seq_found, sig );
656 0 : if( FD_UNLIKELY( filter<0 ) ) {
657 0 : metric_regime_ticks[1] += housekeeping_ticks;
658 0 : metric_regime_ticks[4] += prefrag_ticks;
659 0 : long next = fd_tickcount();
660 0 : metric_regime_ticks[7] += (ulong)(next - now);
661 0 : now = next;
662 0 : continue;
663 0 : } else if( FD_UNLIKELY( filter>0 ) ) {
664 0 : this_in->accum[ FD_METRICS_COUNTER_LINK_FILTERED_COUNT_OFF ]++;
665 0 : this_in->accum[ FD_METRICS_COUNTER_LINK_FILTERED_SIZE_BYTES_OFF ] += (uint)this_in_mline->sz; /* TODO: This might be overrun ... ? Not loaded atomically */
666 :
667 : this_in_seq = fd_seq_inc( this_in_seq, 1UL );
668 : this_in->seq = this_in_seq;
669 : this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in_seq, this_in->depth );
670 :
671 0 : metric_regime_ticks[1] += housekeeping_ticks;
672 0 : metric_regime_ticks[4] += prefrag_ticks;
673 0 : long next = fd_tickcount();
674 0 : metric_regime_ticks[7] += (ulong)(next - now);
675 0 : now = next;
676 0 : continue;
677 0 : }
678 0 : #endif
679 :
680 : /* We have a new fragment to mux. Try to load it. This attempt
681 : should always be successful if in producers are honoring our flow
682 : control. Since we can cheaply detect if there are
683 : misconfigurations (should be an L1 cache hit / predictable branch
684 : in the properly configured case), we do so anyway. */
685 0 : FD_COMPILER_MFENCE();
686 0 : #if FD_HAS_AVX
687 0 : ulong chunk = fd_frag_meta_avx_chunk ( yline ); (void)chunk;
688 0 : ulong sz = fd_frag_meta_avx_sz ( yline ); (void)sz;
689 0 : ulong ctl = fd_frag_meta_avx_ctl ( yline ); (void)ctl;
690 0 : ulong tsorig = fd_frag_meta_avx_tsorig( yline ); (void)tsorig;
691 0 : ulong tspub = fd_frag_meta_avx_tspub ( yline ); (void)tspub;
692 : #elif FD_HAS_ARM
693 : ulong chunk = fd_frag_meta_ul2_chunk ( ul2 ); (void)chunk;
694 : ulong sz = fd_frag_meta_ul2_sz ( ul2 ); (void)sz;
695 : ulong ctl = fd_frag_meta_ul2_ctl ( ul2 ); (void)ctl;
696 : ulong tsorig = fd_frag_meta_ul3_tsorig( ul3 ); (void)tsorig;
697 : ulong tspub = fd_frag_meta_ul3_tspub ( ul3 ); (void)tspub;
698 : #else
699 : ulong chunk = (ulong)this_in_mline->chunk; (void)chunk;
700 : ulong sz = (ulong)this_in_mline->sz; (void)sz;
701 : ulong ctl = (ulong)this_in_mline->ctl; (void)ctl;
702 : ulong tsorig = (ulong)this_in_mline->tsorig; (void)tsorig;
703 : ulong tspub = (ulong)this_in_mline->tspub; (void)tspub;
704 : #endif
705 :
706 : #ifdef STEM_CALLBACK_DURING_FRAG
707 0 : STEM_CALLBACK_DURING_FRAG( ctx, (ulong)this_in->idx, seq_found, sig, chunk, sz, ctl );
708 : #endif
709 :
710 0 : FD_HW_MFENCE_LD();
711 0 : ulong seq_test = FD_VOLATILE_CONST( this_in_mline->seq );
712 0 : FD_COMPILER_MFENCE();
713 :
714 0 : if( FD_UNLIKELY( fd_seq_ne( seq_test, seq_found ) ) ) { /* Overrun while reading (impossible if this_in honoring our fctl) */
715 0 : this_in->seq = seq_test; /* Resume from here (probably reasonably current, could query in mcache sync instead) */
716 0 : fd_metrics_link_in( fd_metrics_base_tl, this_in->idx )[ FD_METRICS_COUNTER_LINK_OVERRUN_READING_COUNT_OFF ]++; /* No local accum since extremely rare, faster to use smaller cache line */
717 0 : fd_metrics_link_in( fd_metrics_base_tl, this_in->idx )[ FD_METRICS_COUNTER_LINK_OVERRUN_READING_FRAG_COUNT_OFF ] += (uint)fd_seq_diff( seq_test, seq_found ); /* No local accum since extremely rare, faster to use smaller cache line */
718 : /* Don't bother with spin as polling multiple locations */
719 0 : metric_regime_ticks[1] += housekeeping_ticks;
720 0 : metric_regime_ticks[4] += prefrag_ticks;
721 0 : long next = fd_tickcount();
722 0 : metric_regime_ticks[7] += (ulong)(next - now);
723 0 : now = next;
724 0 : continue;
725 0 : }
726 :
727 : #ifdef STEM_CALLBACK_RETURNABLE_FRAG
728 0 : int return_frag = STEM_CALLBACK_RETURNABLE_FRAG( ctx, (ulong)this_in->idx, seq_found, sig, chunk, sz, ctl, tsorig, tspub, &stem );
729 0 : if( FD_UNLIKELY( return_frag ) ) {
730 0 : metric_regime_ticks[1] += housekeeping_ticks;
731 0 : long next = fd_tickcount();
732 0 : metric_regime_ticks[4] += (ulong)(next - now);
733 0 : now = next;
734 0 : continue;
735 0 : }
736 0 : #endif
737 :
738 : #ifdef STEM_CALLBACK_AFTER_FRAG
739 0 : STEM_CALLBACK_AFTER_FRAG( ctx, (ulong)this_in->idx, seq_found, sig, sz, tsorig, tspub, &stem );
740 0 : #endif
741 :
742 : /* Windup for the next in poll and accumulate diagnostics */
743 :
744 0 : this_in_seq = fd_seq_inc( this_in_seq, 1UL );
745 0 : this_in->seq = this_in_seq;
746 0 : this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in_seq, this_in->depth );
747 :
748 0 : this_in->accum[ FD_METRICS_COUNTER_LINK_CONSUMED_COUNT_OFF ]++;
749 0 : this_in->accum[ FD_METRICS_COUNTER_LINK_CONSUMED_SIZE_BYTES_OFF ] += (uint)sz;
750 :
751 0 : metric_regime_ticks[1] += housekeeping_ticks;
752 0 : metric_regime_ticks[4] += prefrag_ticks;
753 0 : long next = fd_tickcount();
754 0 : metric_regime_ticks[7] += (ulong)(next - now);
755 0 : now = next;
756 0 : }
757 0 : }
758 :
759 : FD_FN_UNUSED static void
760 : STEM_(run)( fd_topo_t * topo,
761 0 : fd_topo_tile_t * tile ) {
762 0 : const fd_frag_meta_t * in_mcache[ FD_TOPO_MAX_LINKS ];
763 0 : ulong * in_fseq[ FD_TOPO_MAX_TILE_IN_LINKS ];
764 :
765 0 : ulong polled_in_cnt = 0UL;
766 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
767 0 : if( FD_UNLIKELY( !tile->in_link_poll[ i ] ) ) continue;
768 :
769 0 : in_mcache[ polled_in_cnt ] = topo->links[ tile->in_link_id[ i ] ].mcache;
770 0 : FD_TEST( in_mcache[ polled_in_cnt ] );
771 0 : in_fseq[ polled_in_cnt ] = tile->in_link_fseq[ i ];
772 0 : FD_TEST( in_fseq[ polled_in_cnt ] );
773 0 : polled_in_cnt += 1;
774 0 : }
775 :
776 0 : fd_frag_meta_t * out_mcache[ FD_TOPO_MAX_LINKS ];
777 0 : for( ulong i=0UL; i<tile->out_cnt; i++ ) {
778 0 : out_mcache[ i ] = topo->links[ tile->out_link_id[ i ] ].mcache;
779 0 : FD_TEST( out_mcache[ i ] );
780 0 : }
781 :
782 0 : ulong reliable_cons_cnt = 0UL;
783 0 : ulong cons_out[ FD_TOPO_MAX_LINKS ];
784 0 : ulong * cons_fseq[ FD_TOPO_MAX_LINKS ];
785 0 : volatile ulong * cons_slow[ FD_TOPO_MAX_LINKS ];
786 0 : for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
787 0 : fd_topo_tile_t * consumer_tile = &topo->tiles[ i ];
788 0 : ulong polled_in_idx = 0UL;
789 0 : for( ulong j=0UL; j<consumer_tile->in_cnt; j++ ) {
790 0 : int is_polled = consumer_tile->in_link_poll[ j ];
791 0 : for( ulong k=0UL; k<tile->out_cnt; k++ ) {
792 0 : if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) {
793 0 : cons_out[ reliable_cons_cnt ] = k;
794 0 : cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ];
795 0 : FD_TEST( cons_fseq[ reliable_cons_cnt ] );
796 0 : cons_slow[ reliable_cons_cnt ] = fd_metrics_link_in( consumer_tile->metrics, polled_in_idx ) + FD_METRICS_COUNTER_LINK_SLOW_COUNT_OFF;
797 0 : reliable_cons_cnt++;
798 : /* Need to test this, since each link may connect to many outs,
799 : you could construct a topology which has more than this
800 : consumers of links. */
801 0 : FD_TEST( reliable_cons_cnt<FD_TOPO_MAX_LINKS );
802 0 : }
803 0 : }
804 0 : if( FD_LIKELY( is_polled ) ) polled_in_idx++;
805 0 : }
806 0 : }
807 :
808 : /* The stem rng is only used internally for shuffling event/input
809 : polling ordering and for setting the housekeeping timer. It is
810 : never exposed to tile callbacks. As a result, no cryptographic
811 : quality is needed. fd_tickcount() provides per-run entropy,
812 : tile->id guarantees per-tile uniqueness, and fd_ulong_hash (a
813 : Murmur3 finalizer) gives near-perfect avalanche so even a 1-bit
814 : input difference flips ~50% of output bits. fd_rng_secure is not
815 : used here because STEM_(run) executes after the seccomp sandbox
816 : is applied. */
817 0 : fd_rng_t rng[1];
818 0 : FD_TEST( fd_rng_join( fd_rng_new( rng, (uint)fd_ulong_hash( (ulong)fd_tickcount() + tile->id ), 0UL ) ) );
819 :
820 0 : STEM_CALLBACK_CONTEXT_TYPE * ctx = (STEM_CALLBACK_CONTEXT_TYPE*)fd_ulong_align_up( (ulong)fd_topo_obj_laddr( topo, tile->tile_obj_id ), STEM_CALLBACK_CONTEXT_ALIGN );
821 :
822 0 : uchar __attribute__((aligned(FD_STEM_SCRATCH_ALIGN))) stem_scratch[ STEM_(scratch_footprint)( polled_in_cnt, tile->out_cnt, reliable_cons_cnt ) ];
823 :
824 0 : STEM_(run1)( polled_in_cnt,
825 0 : in_mcache,
826 0 : in_fseq,
827 0 : tile->out_cnt,
828 0 : out_mcache,
829 0 : reliable_cons_cnt,
830 0 : cons_out,
831 0 : cons_fseq,
832 0 : cons_slow,
833 0 : STEM_BURST,
834 0 : STEM_LAZY,
835 0 : rng,
836 0 : stem_scratch,
837 0 : ctx );
838 :
839 : #ifdef STEM_CALLBACK_METRICS_WRITE
840 : /* Write final metrics state before shutting down */
841 0 : FD_COMPILER_MFENCE();
842 0 : STEM_CALLBACK_METRICS_WRITE( ctx );
843 0 : FD_COMPILER_MFENCE();
844 : #endif
845 :
846 0 : if( FD_LIKELY( tile->allow_shutdown ) ) {
847 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
848 0 : if( FD_UNLIKELY( !tile->in_link_poll[ i ] || !tile->in_link_reliable[ i ] ) ) continue;
849 :
850 : /* Return infinite credits on any reliable consumer links so that
851 : producers now no longer expect us to consume. */
852 0 : ulong fseq_id = tile->in_link_fseq_obj_id[ i ];
853 0 : ulong * fseq = fd_fseq_join( fd_topo_obj_laddr( topo, fseq_id ) );
854 0 : FD_TEST( fseq );
855 0 : __atomic_store_n( fseq, STEM_SHUTDOWN_SEQ, __ATOMIC_RELEASE );
856 0 : }
857 0 : }
858 0 : }
859 :
/* Remove every template parameter and callback hook this file consumes.
   A later inclusion of the template then starts from a clean slate, and
   no STEM_* configuration leaks into the includer's namespace. */

/* Template naming, sizing and context parameters */
#undef STEM_NAME
#undef STEM_
#undef STEM_BURST
#undef STEM_LAZY
#undef STEM_CALLBACK_CONTEXT_TYPE
#undef STEM_CALLBACK_CONTEXT_ALIGN

/* Optional run-loop callback hooks */
#undef STEM_CALLBACK_SHOULD_SHUTDOWN
#undef STEM_CALLBACK_DURING_HOUSEKEEPING
#undef STEM_CALLBACK_METRICS_WRITE
#undef STEM_CALLBACK_BEFORE_CREDIT
#undef STEM_CALLBACK_AFTER_CREDIT
#undef STEM_CALLBACK_BEFORE_FRAG
#undef STEM_CALLBACK_DURING_FRAG
#undef STEM_CALLBACK_RETURNABLE_FRAG
#undef STEM_CALLBACK_AFTER_FRAG
#undef STEM_CALLBACK_AFTER_POLL_OVERRUN
|