Line data Source code
1 : #include "fd_aio_tango.h"
2 :
3 : static void
4 : fd_aio_tango_send1( fd_aio_tango_tx_t * self,
5 0 : fd_aio_pkt_info_t const * pkt ) {
6 :
7 0 : fd_frag_meta_t * mcache = self->mcache;
8 0 : void * base = self->base;
9 0 : ulong depth = self->depth;
10 0 : ulong mtu = self->mtu;
11 0 : ulong orig = self->orig;
12 0 : ulong sig = self->sig;
13 0 : ulong chunk0 = self->chunk0;
14 0 : ulong wmark = self->wmark;
15 0 : uchar const * data = pkt->buf;
16 0 : ulong data_sz = pkt->buf_sz;
17 0 : ulong ts = fd_frag_meta_ts_comp( fd_tickcount() );
18 :
19 0 : int som = 1;
20 0 : int eom = 0;
21 0 : do {
22 0 : ulong frag_sz = fd_ulong_min( data_sz, mtu );
23 0 : uchar * frag = fd_chunk_to_laddr( base, self->chunk );
24 0 : /* */ eom = frag_sz == data_sz;
25 0 : ulong ctl = fd_frag_meta_ctl( orig, som, eom, 0 );
26 :
27 0 : fd_memcpy( frag, data, frag_sz );
28 0 : fd_mcache_publish( mcache, depth, self->seq, sig, self->chunk, frag_sz, ctl, ts, ts );
29 :
30 0 : self->seq = fd_seq_inc( self->seq, 1UL );
31 0 : self->chunk = fd_dcache_compact_next( self->chunk, frag_sz, chunk0, wmark );
32 0 : data += frag_sz;
33 0 : data_sz -= frag_sz;
34 0 : som = 0;
35 0 : } while( FD_UNLIKELY( !eom ) );
36 :
37 0 : }
38 :
39 : static int
40 : fd_aio_tango_send( void * ctx,
41 : fd_aio_pkt_info_t const * batch,
42 : ulong batch_cnt,
43 : ulong * opt_batch_idx,
44 0 : int flush ) {
45 0 : (void)opt_batch_idx; /* only set on failure, but this can't fail */
46 0 : (void)flush; /* always immediately publish to mcache */
47 0 : for( ulong j=0UL; j<batch_cnt; j++ ) {
48 0 : fd_aio_tango_send1( ctx, batch+j );
49 0 : }
50 0 : return FD_AIO_SUCCESS;
51 0 : }
52 :
53 : fd_aio_tango_tx_t *
54 : fd_aio_tango_tx_new( fd_aio_tango_tx_t * self,
55 : fd_frag_meta_t * mcache,
56 : void * dcache,
57 : void * base,
58 : ulong mtu,
59 : ulong orig,
60 0 : ulong sig ) {
61 0 : ulong depth = fd_mcache_depth( mcache );
62 0 : ulong chunk = fd_dcache_compact_chunk0( base, dcache );
63 0 : ulong wmark = fd_dcache_compact_wmark ( base, dcache, mtu );
64 0 : ulong seq = fd_mcache_seq0( mcache );
65 0 : *self = (fd_aio_tango_tx_t) {
66 0 : .mcache = mcache,
67 0 : .dcache = dcache,
68 0 : .base = base,
69 0 : .chunk0 = chunk,
70 0 : .wmark = wmark,
71 0 : .depth = depth,
72 0 : .mtu = mtu,
73 0 : .orig = orig,
74 0 : .sig = sig,
75 0 : .chunk = chunk,
76 0 : .seq = seq,
77 0 : };
78 0 : fd_aio_new( &self->aio, self, fd_aio_tango_send );
79 0 : return self;
80 0 : }
81 :
82 : void *
83 0 : fd_aio_tango_tx_delete( fd_aio_tango_tx_t * self ) {
84 0 : fd_aio_delete( &self->aio );
85 0 : return self;
86 0 : }
87 :
88 :
89 : fd_aio_tango_rx_t *
90 : fd_aio_tango_rx_new( fd_aio_tango_rx_t * self,
91 : fd_aio_t const * aio,
92 : fd_frag_meta_t const * mcache,
93 : ulong seq0,
94 0 : void * base ) {
95 0 : ulong depth = fd_mcache_depth( mcache );
96 0 : *self = (fd_aio_tango_rx_t) {
97 0 : .mcache = mcache,
98 0 : .depth = depth,
99 0 : .base = base,
100 0 : .seq = seq0,
101 0 : .aio = aio,
102 0 : };
103 0 : return self;
104 0 : }
105 :
106 : void *
107 0 : fd_aio_tango_rx_delete( fd_aio_tango_rx_t * self ) {
108 0 : return self;
109 0 : }
110 :
111 : void
112 0 : fd_aio_tango_rx_poll( fd_aio_tango_rx_t * self ) {
113 0 : fd_frag_meta_t const * mcache = self->mcache;
114 0 : ulong depth = self->depth;
115 0 : void * base = self->base;
116 0 : fd_aio_t const * aio = self->aio;
117 :
118 0 : # define RX_BATCH (64UL)
119 0 : fd_aio_pkt_info_t batch[ RX_BATCH ];
120 0 : ulong batch_idx;
121 0 : for( batch_idx=0UL; batch_idx<RX_BATCH; batch_idx++ ) {
122 :
123 : /* Poll next fragment */
124 0 : ulong seq_expected = self->seq;
125 0 : fd_frag_meta_t const * mline = mcache + fd_mcache_line_idx( seq_expected, depth );
126 0 : FD_COMPILER_MFENCE();
127 0 : ulong seq_found = mline->seq;
128 0 : FD_COMPILER_MFENCE();
129 0 : ulong chunk = mline->chunk;
130 0 : ulong sz = mline->sz;
131 0 : ulong ctl = mline->ctl;
132 0 : FD_COMPILER_MFENCE();
133 0 : ulong seq_test = mline->seq;
134 0 : FD_COMPILER_MFENCE();
135 0 : if( !fd_seq_eq( seq_found, seq_test ) ) break; /* overrun */
136 :
137 0 : if( fd_seq_gt( seq_expected, seq_found ) ) {
138 : /* caught up */
139 0 : break;
140 0 : }
141 :
142 0 : if( fd_seq_lt( seq_expected, seq_found ) ) {
143 : /* overrun */
144 0 : self->seq = seq_found;
145 0 : break;
146 0 : }
147 :
148 0 : if( fd_frag_meta_ctl_err( ctl ) || !fd_frag_meta_ctl_som( ctl ) ) {
149 0 : batch_idx--;
150 0 : } else {
151 0 : batch[ batch_idx ].buf = fd_chunk_to_laddr( base, chunk );
152 0 : batch[ batch_idx ].buf_sz = (ushort)sz;
153 0 : }
154 :
155 0 : self->seq = fd_seq_inc( seq_expected, 1UL );
156 :
157 0 : }
158 :
159 0 : ulong batch_cons;
160 0 : fd_aio_send( aio, batch, batch_idx, &batch_cons, 1 );
161 0 : }
|