Line data Source code
1 : #ifndef HEADER_fd_src_disco_stem_fd_stem_h
2 : #define HEADER_fd_src_disco_stem_fd_stem_h
3 :
4 : #include "../fd_disco_base.h"
5 :
6 0 : #define FD_STEM_SCRATCH_ALIGN (128UL)
7 :
8 : struct fd_stem_context {
9 : fd_frag_meta_t ** mcaches;
10 : ulong * seqs;
11 : ulong * depths;
12 :
13 : ulong * cr_avail;
14 : ulong * min_cr_avail;
15 : ulong cr_decrement_amount;
16 : int * out_reliable;
17 : };
18 :
19 : typedef struct fd_stem_context fd_stem_context_t;
20 :
21 : struct __attribute__((aligned(64))) fd_stem_tile_in {
22 : fd_frag_meta_t const * mcache; /* local join to this in's mcache */
23 : uint depth; /* == fd_mcache_depth( mcache ), depth of this in's cache (const) */
24 : uint idx; /* index of this in in the list of providers, [0, in_cnt) */
25 : ulong seq; /* sequence number of next frag expected from the upstream producer,
26 : updated when frag from this in is published */
27 : fd_frag_meta_t const * mline; /* == mcache + fd_mcache_line_idx( seq, depth ), location to poll next */
28 : ulong * fseq; /* local join to the fseq used to return flow control credits to the in */
29 : uint accum[6]; /* local diagnostic accumulators. These are drained during in housekeeping. */
30 : /* Assumes FD_FSEQ_DIAG_{PUB_CNT,PUB_SZ,FILT_CNT,FILT_SZ,OVRNP_CNT,OVRNP_FRAG_CNT} are 0:5 */
31 : };
32 :
33 : typedef struct fd_stem_tile_in fd_stem_tile_in_t;
34 :
35 : static inline void
36 : fd_stem_publish( fd_stem_context_t * stem,
37 : ulong out_idx,
38 : ulong sig,
39 : ulong chunk,
40 : ulong sz,
41 : ulong ctl,
42 : ulong tsorig,
43 63 : ulong tspub ) {
44 63 : fd_frag_meta_t * mcache = stem->mcaches[ out_idx ];
45 63 : ulong depth = stem->depths [ out_idx ];
46 63 : ulong * seqp = &stem->seqs [ out_idx ];
47 63 : ulong seq = *seqp;
48 63 : # if FD_HAS_AVX
49 63 : fd_mcache_publish_avx( mcache, depth, seq, sig, chunk, sz, ctl, tsorig, tspub );
50 : # elif FD_HAS_ARM
51 : fd_mcache_publish_arm( mcache, depth, seq, sig, chunk, sz, ctl, tsorig, tspub );
52 : # else
53 : fd_mcache_publish ( mcache, depth, seq, sig, chunk, sz, ctl, tsorig, tspub );
54 : # endif
55 63 : if( FD_LIKELY( stem->out_reliable[ out_idx ] ) ) {
56 63 : if( FD_UNLIKELY( stem->cr_avail[ out_idx ]<stem->cr_decrement_amount ) ) { /* Ensure producer BURST is set correctly */
57 0 : FD_LOG_ERR(( "BURST underprovisioned out_idx=%lu cr_avail=%lu min_cr_avail=%lu cr_decrement_amount=%lu", out_idx, stem->cr_avail[ out_idx ], *stem->min_cr_avail, stem->cr_decrement_amount ));
58 0 : }
59 63 : stem->cr_avail[ out_idx ] -= stem->cr_decrement_amount;
60 63 : *stem->min_cr_avail = fd_ulong_min( stem->cr_avail[ out_idx ], *stem->min_cr_avail );
61 63 : }
62 63 : *seqp = fd_seq_inc( seq, 1UL );
63 63 : }
64 :
65 : static inline ulong
66 : fd_stem_advance( fd_stem_context_t * stem,
67 0 : ulong out_idx ) {
68 0 : ulong * seqp = &stem->seqs[ out_idx ];
69 0 : ulong seq = *seqp;
70 0 : if( FD_LIKELY( stem->out_reliable[ out_idx ] ) ) {
71 0 : stem->cr_avail[ out_idx ] -= stem->cr_decrement_amount;
72 0 : *stem->min_cr_avail = fd_ulong_min( stem->cr_avail[ out_idx ], *stem->min_cr_avail );
73 0 : }
74 0 : *seqp = fd_seq_inc( seq, 1UL );
75 0 : return seq;
76 0 : }
77 :
78 : #endif /* HEADER_fd_src_disco_stem_fd_stem_h */
|