#if !defined(__linux__)
#error "fd_xsk_aio requires Linux operating system with XDP support"
#endif

#include "../../util/fd_util.h"
#include "fd_xsk_aio_private.h"

/* Forward declarations */

static int
fd_xsk_aio_send( void *                    ctx,
                 fd_aio_pkt_info_t const * batch,
                 ulong                     batch_cnt,
                 ulong *                   opt_batch_idx,
                 int                       flush );

FD_FN_CONST ulong
fd_xsk_aio_align( void ) {
  return FD_XSK_AIO_ALIGN;
}

FD_FN_CONST ulong
fd_xsk_aio_footprint( ulong tx_depth,
                      ulong pkt_cnt ) {
  if( FD_UNLIKELY( tx_depth==0UL ) ) return 0UL;
  if( FD_UNLIKELY( pkt_cnt ==0UL ) ) return 0UL;

  ulong sz = 1UL     *sizeof( fd_xsk_aio_t        )
           + pkt_cnt *sizeof( fd_xsk_frame_meta_t )
           + pkt_cnt *sizeof( fd_aio_pkt_info_t   )
           + tx_depth*sizeof( ulong               );

  sz = fd_ulong_align_up( sz, FD_XSK_AIO_ALIGN );
  /* assert( sz%FD_XSK_AIO_ALIGN==0UL ) */
  return sz;
}
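
/* Worked example (illustrative only; the struct sizes below are
   hypothetical and depend on fd_xsk_aio_private.h): with
   sizeof(fd_xsk_aio_t)==192, sizeof(fd_xsk_frame_meta_t)==16,
   sizeof(fd_aio_pkt_info_t)==16, tx_depth==128 and pkt_cnt==32, the
   raw size is

     192 + 32*16 + 32*16 + 128*8 = 2240 bytes

   which fd_ulong_align_up then rounds up to the next multiple of
   FD_XSK_AIO_ALIGN. */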

void *
fd_xsk_aio_new( void * mem,
                ulong  tx_depth,
                ulong  pkt_cnt ) {

  if( FD_UNLIKELY( !mem ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_xsk_aio_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned mem" ));
    return NULL;
  }

  ulong footprint = fd_xsk_aio_footprint( tx_depth, pkt_cnt );
  if( FD_UNLIKELY( !footprint ) ) {
    FD_LOG_WARNING(( "invalid footprint for tx_depth (%lu), pkt_cnt (%lu)",
                     tx_depth, pkt_cnt ));
    return NULL;
  }

  fd_memset( mem, 0, footprint );

  /* Allocate objects in fd_xsk_aio_t */

  fd_xsk_aio_t * xsk_aio = (fd_xsk_aio_t *)mem;

  /* Assumes alignment of `fd_xsk_aio_t` matches alignment of
     `fd_xsk_frame_meta_t` and `fd_aio_pkt_info_t`. */

  ulong meta_off     = sizeof(fd_xsk_aio_t);
  ulong pkt_off      = meta_off + pkt_cnt*sizeof(fd_xsk_frame_meta_t);
  ulong tx_stack_off = pkt_off  + pkt_cnt*sizeof(fd_aio_pkt_info_t  );

  xsk_aio->pkt_depth    = pkt_cnt;
  xsk_aio->tx_depth     = tx_depth;
  xsk_aio->meta_off     = meta_off;
  xsk_aio->pkt_off      = pkt_off;
  xsk_aio->tx_stack_off = tx_stack_off;

  xsk_aio->metrics.tx_cnt = 0UL;
  xsk_aio->metrics.tx_sz  = 0UL;
  xsk_aio->metrics.rx_cnt = 0UL;
  xsk_aio->metrics.rx_sz  = 0UL;

  /* Mark object as valid */

  FD_COMPILER_MFENCE();
  FD_VOLATILE( xsk_aio->magic ) = FD_XSK_AIO_MAGIC;
  FD_COMPILER_MFENCE();

  return xsk_aio;
}
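
/* A minimal lifecycle sketch (illustrative; mem is a hypothetical
   caller-provided region that satisfies fd_xsk_aio_align() and
   fd_xsk_aio_footprint( tx_depth, pkt_cnt ), and xsk is an established
   fd_xsk_t join):

     fd_xsk_aio_t * xsk_aio = fd_xsk_aio_join(
         fd_xsk_aio_new( mem, tx_depth, pkt_cnt ), xsk );
     ...
     fd_xsk_aio_delete( fd_xsk_aio_leave( xsk_aio ) ); */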


fd_xsk_aio_t *
fd_xsk_aio_join( void *     shxsk_aio,
                 fd_xsk_t * xsk ) {

  if( FD_UNLIKELY( !shxsk_aio ) ) {
    FD_LOG_WARNING(( "NULL shxsk_aio" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shxsk_aio, fd_xsk_aio_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned shxsk_aio" ));
    return NULL;
  }

  if( FD_UNLIKELY( !xsk ) ) {
    FD_LOG_WARNING(( "NULL xsk" ));
    return NULL;
  }

  /* Validate memory layout */

  fd_xsk_aio_t * xsk_aio = (fd_xsk_aio_t *)shxsk_aio;

  if( FD_UNLIKELY( xsk_aio->magic!=FD_XSK_AIO_MAGIC ) ) {
    FD_LOG_WARNING(( "bad magic (not an fd_xsk_aio_t?)" ));
    return NULL;
  }

  if( FD_UNLIKELY( xsk_aio->xsk ) ) {
    FD_LOG_WARNING(( "xsk_aio in an unclean state, resetting" ));
    xsk_aio->xsk = NULL;
    /* continue */
  }

  fd_xsk_params_t const * params = fd_xsk_get_params( xsk );

  if( FD_UNLIKELY( params->tx_depth!=xsk_aio->tx_depth ) ) {
    FD_LOG_WARNING(( "incompatible xsk (tx_depth=%lu) and xsk_aio (tx_depth=%lu)",
                     params->tx_depth, xsk_aio->tx_depth ));
    return NULL;
  }

  /* Reset state */

  xsk_aio->xsk = xsk;
  fd_aio_delete( &xsk_aio->rx );
  fd_aio_delete( &xsk_aio->tx );

  xsk_aio->frame_mem      = fd_xsk_umem_laddr( xsk );
  xsk_aio->frame_sz       = params->frame_sz;
  xsk_aio->rx_off         = 0UL;
  xsk_aio->tx_off         = params->rx_depth;
  xsk_aio->tx_stack       = fd_xsk_aio_tx_stack( xsk_aio );
  xsk_aio->tx_stack_depth = params->tx_depth;
  xsk_aio->tx_top         = 0UL;

  /* Set up local TX */

  fd_aio_t * tx = fd_aio_join( fd_aio_new( &xsk_aio->tx, xsk_aio, fd_xsk_aio_send ) );
  if( FD_UNLIKELY( !tx ) ) {
    FD_LOG_WARNING(( "failed to join local tx aio" ));
    return NULL;
  }

  /* Reset RX callback (laddr pointers to external object) */

  memset( &xsk_aio->rx, 0, sizeof(fd_aio_t) );

  /* Enqueue RX frames to the fill ring so the kernel can use them for
     receive */

  ulong frame_off = xsk_aio->rx_off;
  ulong frame_sz  = params->frame_sz;
  ulong rx_depth  = params->rx_depth;
  ulong tx_depth  = params->tx_depth;

  for( ulong j=0UL; j<rx_depth; j++ ) {
    ulong enq_cnt = fd_xsk_rx_enqueue( xsk, &frame_off, 1UL );
    frame_off += frame_sz;

    if( FD_UNLIKELY( !enq_cnt ) ) {
      FD_LOG_WARNING(( "fd_xsk_rx_enqueue() failed, was fd_xsk_t properly flushed?" ));
      return NULL;
    }
  }

  /* Add all TX frames to the free stack */

  frame_off = xsk_aio->tx_off*frame_sz;
  for( ulong j=0UL; j<tx_depth; j++ ) {
    xsk_aio->tx_stack[ xsk_aio->tx_top ] = frame_off;
    xsk_aio->tx_top++;
    frame_off += frame_sz;
  }

  return xsk_aio;
}
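
/* UMEM layout sketch after a successful join (illustrative, assuming
   frame_sz==2048, rx_depth==8, tx_depth==8): RX frames occupy byte
   offsets [0,8*2048) and are handed to the fill ring one by one, while
   TX frames occupy [8*2048,16*2048), so the TX free stack initially
   holds the offsets 16384, 18432, ..., 30720. */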


void *
fd_xsk_aio_leave( fd_xsk_aio_t * xsk_aio ) {

  if( FD_UNLIKELY( !xsk_aio ) ) {
    FD_LOG_WARNING(( "NULL xsk_aio" ));
    return NULL;
  }

  xsk_aio->xsk = NULL;

  fd_aio_delete( fd_aio_leave( &xsk_aio->rx ) );
  fd_aio_delete( fd_aio_leave( &xsk_aio->tx ) );

  return (void *)xsk_aio;
}

void *
fd_xsk_aio_delete( void * shxsk_aio ) {

  if( FD_UNLIKELY( !shxsk_aio ) ) {
    FD_LOG_WARNING(( "NULL shxsk_aio" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shxsk_aio, fd_xsk_aio_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned shxsk_aio" ));
    return NULL;
  }

  fd_xsk_aio_t * xsk_aio = (fd_xsk_aio_t *)shxsk_aio;

  if( FD_UNLIKELY( xsk_aio->magic!=FD_XSK_AIO_MAGIC ) ) {
    FD_LOG_WARNING(( "bad magic" ));
    return NULL;
  }

  FD_COMPILER_MFENCE();
  FD_VOLATILE( xsk_aio->magic ) = 0UL;
  FD_COMPILER_MFENCE();

  return (void *)xsk_aio;
}


fd_aio_t const *
fd_xsk_aio_get_tx( fd_xsk_aio_t const * xsk_aio ) {
  return &xsk_aio->tx;
}

void
fd_xsk_aio_set_rx( fd_xsk_aio_t *   xsk_aio,
                   fd_aio_t const * aio ) {
  fd_memcpy( &xsk_aio->rx, aio, sizeof(fd_aio_t) );
}
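
/* Wiring sketch (illustrative): an application sends packets through
   the aio handle returned by fd_xsk_aio_get_tx and registers its own
   receive callback via fd_xsk_aio_set_rx.  my_rx_aio, pkts and pkt_cnt
   below are hypothetical caller-owned names:

     fd_xsk_aio_set_rx( xsk_aio, my_rx_aio );
     fd_aio_t const * tx = fd_xsk_aio_get_tx( xsk_aio );
     fd_aio_send( tx, pkts, pkt_cnt, NULL, 1 );  */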


int
fd_xsk_aio_service( fd_xsk_aio_t * xsk_aio ) {
  fd_xsk_t *            xsk         = xsk_aio->xsk;
  fd_aio_t *            ingress     = &xsk_aio->rx;
  fd_xsk_frame_meta_t * meta        = fd_xsk_aio_meta( xsk_aio );
  fd_aio_pkt_info_t *   pkt         = fd_xsk_aio_pkts( xsk_aio );
  ulong                 pkt_depth   = xsk_aio->pkt_depth;
  ulong                 frame_laddr = (ulong)fd_xsk_umem_laddr( xsk_aio->xsk );

  /* Try completing receives */
  ulong rx_avail = fd_xsk_rx_complete( xsk, meta, pkt_depth );

  /* Forward received packets to the aio callback */
  if( rx_avail ) {
    for( ulong j=0UL; j<rx_avail; j++ ) {
      pkt[j] = (fd_aio_pkt_info_t) {
        .buf    = (void *)(frame_laddr + meta[j].off),
        .buf_sz = (ushort)meta[j].sz
      };
    }

    fd_aio_send( ingress, pkt, rx_avail, NULL, 1 );
    /* TODO frames may not all be processed at this point;
       we should count them, and possibly buffer them */

    xsk_aio->metrics.rx_cnt += rx_avail;
    for( ulong j=0UL; j<rx_avail; j++ ) xsk_aio->metrics.rx_sz += meta[j].sz;

    /* Return frames to the fill ring */
    ulong enq_rc = fd_xsk_rx_enqueue2( xsk, meta, rx_avail );
    if( FD_UNLIKELY( enq_rc<rx_avail ) ) {
      /* Keep trying indefinitely */
      /* TODO consider adding a timeout */
      ulong j = enq_rc;
      while( j<rx_avail ) j += fd_xsk_rx_enqueue2( xsk, meta+j, rx_avail-j );
    }
  }

  /* Any TX completions to reap? */
  ulong tx_completed = fd_xsk_tx_complete( xsk,
                                           xsk_aio->tx_stack       + xsk_aio->tx_top,
                                           xsk_aio->tx_stack_depth - xsk_aio->tx_top );
  xsk_aio->tx_top += tx_completed;

  return rx_avail || tx_completed;
}
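
/* A typical driver loop (illustrative sketch; FD_SPIN_PAUSE is the
   fd_util spin hint, and the backoff policy is up to the caller):

     for(;;) {
       int busy = fd_xsk_aio_service( xsk_aio );
       if( FD_UNLIKELY( !busy ) ) FD_SPIN_PAUSE();
     }  */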


void
fd_xsk_aio_tx_complete( fd_xsk_aio_t * xsk_aio ) {
  ulong tx_completed = fd_xsk_tx_complete( xsk_aio->xsk,
                                           xsk_aio->tx_stack       + xsk_aio->tx_top,
                                           xsk_aio->tx_stack_depth - xsk_aio->tx_top );
  xsk_aio->tx_top += tx_completed;
}


/* fd_xsk_aio_send is an aio callback that transmits the given batch of
   packets through the XSK. */

static int
fd_xsk_aio_send( void *                    ctx,
                 fd_aio_pkt_info_t const * pkt,
                 ulong                     pkt_cnt,
                 ulong *                   opt_batch_idx,
                 int                       flush ) {

  fd_xsk_aio_t * xsk_aio = (fd_xsk_aio_t *)ctx;
  fd_xsk_t *     xsk     = xsk_aio->xsk;

  if( FD_UNLIKELY( pkt_cnt==0UL ) ) {
    if( flush ) {
      fd_xsk_frame_meta_t meta[1] = {{0}};
      ulong sent_cnt = fd_xsk_tx_enqueue( xsk, meta, 0UL, 1 );
      (void)sent_cnt;
    }
    return FD_AIO_SUCCESS;
  }

  /* Check whether any previous send operations have completed so we
     can reclaim their transmit frames. */
  fd_xsk_aio_tx_complete( xsk_aio );

  /* Refuse to send more packets than we have metadata frames */
  ulong       batch_cnt = pkt_cnt; /* Number of frames to attempt to send */
  ulong const pkt_depth = xsk_aio->pkt_depth;
  if( FD_UNLIKELY( batch_cnt>pkt_depth ) )
    batch_cnt = pkt_depth;

  /* Find UMEM and meta params */
  uchar *               frame_mem = xsk_aio->frame_mem;         /* UMEM region */
  ulong                 frame_sz  = xsk_aio->frame_sz;          /* UMEM frame sz */
  fd_xsk_frame_meta_t * meta      = fd_xsk_aio_meta( xsk_aio ); /* frame meta heap */

  /* Number of packets pending fd_xsk_tx_enqueue */
  ulong pending_cnt = 0UL;

  /* XSK send prepare loop.  Terminates when the largest possible tx
     batch has been formed.  meta[0..pending_cnt) is populated with
     frames to be handed off to fd_xsk_tx_enqueue. */
  ulong pkt_idx;
  for( pkt_idx=0UL; pkt_idx<batch_cnt; pkt_idx++ ) {
    /* Pop a TX frame from our free stack */
    if( FD_UNLIKELY( !xsk_aio->tx_top ) ) break;
    xsk_aio->tx_top--;
    ulong offset = xsk_aio->tx_stack[ xsk_aio->tx_top ];

    uchar const * data    = pkt[ pkt_idx ].buf;
    ulong         data_sz = pkt[ pkt_idx ].buf_sz;

    /* MTU check */
    if( FD_UNLIKELY( data_sz>frame_sz ) ) {
      FD_LOG_WARNING(( "frame too large for xsk ring (%lu > %lu), aborting send",
                       data_sz, frame_sz ));
      /* Return the popped frame and any frames staged but not yet
         enqueued to the free stack so they are not leaked */
      xsk_aio->tx_stack[ xsk_aio->tx_top++ ] = offset;
      for( ulong j=0UL; j<pending_cnt; j++ )
        xsk_aio->tx_stack[ xsk_aio->tx_top++ ] = meta[j].off;
      if( opt_batch_idx ) *opt_batch_idx = 0UL;
      return FD_AIO_ERR_INVAL;
    }

    /* Copy aio packet payload into the TX frame */
    fd_memcpy( frame_mem + offset, data, data_sz );

    /* Write XSK meta */
    meta[ pending_cnt ] = (fd_xsk_frame_meta_t){
      .off   = offset,
      .sz    = (uint)data_sz,
      .flags = 0U
    };
    pending_cnt++;
  }

  /* Enqueue send */
  ulong sent_cnt = 0UL;
  if( FD_LIKELY( pending_cnt>0UL || flush ) )
    sent_cnt = fd_xsk_tx_enqueue( xsk, meta, pending_cnt, flush );

  xsk_aio->metrics.tx_cnt += sent_cnt;
  for( ulong j=0UL; j<sent_cnt; j++ ) xsk_aio->metrics.tx_sz += meta[j].sz;

  /* Return any popped frames that fd_xsk_tx_enqueue did not accept to
     the free stack so they are not leaked */
  for( ulong j=sent_cnt; j<pending_cnt; j++ )
    xsk_aio->tx_stack[ xsk_aio->tx_top++ ] = meta[j].off;

  /* Sent fewer packets than the caller requested? */
  if( FD_UNLIKELY( sent_cnt<pkt_cnt ) ) {
    if( FD_LIKELY( opt_batch_idx ) ) *opt_batch_idx = sent_cnt;
    return FD_AIO_ERR_AGAIN;
  }

  return FD_AIO_SUCCESS;
}
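
/* Caller-side retry sketch (illustrative; tx, pkts, pkt_cnt and
   xsk_aio are hypothetical caller-scope names, and a real caller
   would bound the retries): on FD_AIO_ERR_AGAIN, *opt_batch_idx holds
   the number of packets actually sent, so the caller can resume from
   that index after servicing completions:

     ulong sent = 0UL;
     while( sent<pkt_cnt ) {
       ulong idx = 0UL;
       int   err = fd_aio_send( tx, pkts+sent, pkt_cnt-sent, &idx, 1 );
       if( err==FD_AIO_SUCCESS    ) break;
       if( err!=FD_AIO_ERR_AGAIN  ) break;
       sent += idx;
       fd_xsk_aio_service( xsk_aio );
     }  */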