Line data Source code
1 : #include "fd_tpu.h"
2 : #include "fd_tpu_reasm_private.h"
3 : #include "../../tango/dcache/fd_dcache.h"
4 : #include "../../tango/mcache/fd_mcache.h"
5 :
6 : FD_FN_CONST ulong
7 63 : fd_tpu_reasm_align( void ) {
8 63 : return alignof(fd_tpu_reasm_t);
9 63 : }
10 :
11 : FD_FN_CONST ulong
12 : fd_tpu_reasm_footprint( ulong depth,
13 27 : ulong burst ) {
14 :
15 27 : if( FD_UNLIKELY(
16 27 : ( fd_ulong_popcnt( depth )!=1 ) |
17 27 : ( depth>0x7fffffffUL ) |
18 27 : ( burst<2 ) |
19 27 : ( burst>0x7fffffffUL ) ) )
20 15 : return 0UL;
21 :
22 12 : ulong slot_cnt = depth+burst;
23 12 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt_est( slot_cnt );
24 12 : return FD_LAYOUT_FINI( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_INIT,
25 27 : fd_tpu_reasm_align(), sizeof(fd_tpu_reasm_t) ), /* hdr */
26 27 : alignof(uint), depth *sizeof(uint) ), /* pub_slots */
27 27 : alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t) ), /* slots */
28 27 : fd_tpu_reasm_map_align(), fd_tpu_reasm_map_footprint( chain_cnt ) ), /* map */
29 27 : fd_tpu_reasm_align() );
30 :
31 27 : }
32 :
33 : void *
34 : fd_tpu_reasm_new( void * shmem,
35 : ulong depth,
36 : ulong burst,
37 : ulong orig,
38 3 : void * dcache ) {
39 :
40 3 : if( FD_UNLIKELY( !shmem ) ) return NULL;
41 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, FD_TPU_REASM_ALIGN ) ) ) return NULL;
42 3 : if( FD_UNLIKELY( !fd_tpu_reasm_footprint( depth, burst ) ) ) return NULL;
43 3 : if( FD_UNLIKELY( orig > FD_FRAG_META_ORIG_MAX ) ) return NULL;
44 :
45 3 : ulong req_data_sz = fd_tpu_reasm_req_data_sz( depth, burst );
46 3 : if( FD_UNLIKELY( fd_dcache_data_sz( dcache )<req_data_sz ) ) {
47 0 : FD_LOG_WARNING(( "dcache data_sz is too small (need %lu, have %lu)", req_data_sz, fd_dcache_data_sz( dcache ) ));
48 0 : return NULL;
49 0 : }
50 :
51 : /* Memory layout */
52 :
53 3 : ulong slot_cnt = depth+burst;
54 3 : if( FD_UNLIKELY( !slot_cnt ) ) return NULL;
55 3 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt_est( slot_cnt );
56 :
57 3 : FD_SCRATCH_ALLOC_INIT( l, shmem );
58 3 : fd_tpu_reasm_t * reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_tpu_reasm_align(), sizeof(fd_tpu_reasm_t) );
59 3 : ulong * pub_slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), depth*sizeof(uint) );
60 3 : fd_tpu_reasm_slot_t * slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t) );
61 3 : void * map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_tpu_reasm_map_align(), fd_tpu_reasm_map_footprint( chain_cnt ) );
62 3 : FD_SCRATCH_ALLOC_FINI( l, fd_tpu_reasm_align() );
63 :
64 3 : fd_memset( reasm, 0, sizeof(fd_tpu_reasm_t) );
65 3 : fd_memset( slots, 0, burst*sizeof(fd_tpu_reasm_slot_t) );
66 :
67 3 : fd_tpu_reasm_map_t * map = fd_tpu_reasm_map_join( fd_tpu_reasm_map_new( map_mem, chain_cnt, 0UL ) );
68 3 : if( FD_UNLIKELY( !map ) ) {
69 0 : FD_LOG_WARNING(( "fd_tpu_reasm_map_new failed" ));
70 0 : return NULL;
71 0 : }
72 :
73 : /* Initialize reasm object */
74 :
75 3 : reasm->slots_off = (ulong)( (uchar *)slots - (uchar *)reasm );
76 3 : reasm->pub_slots_off = (ulong)( (uchar *)pub_slots - (uchar *)reasm );
77 3 : reasm->map_off = (ulong)( (uchar *)map - (uchar *)reasm );
78 3 : reasm->dcache = dcache;
79 :
80 3 : reasm->depth = (uint)depth;
81 3 : reasm->burst = (uint)burst;
82 3 : reasm->head = (uint)slot_cnt-1U;
83 3 : reasm->tail = (uint)depth;
84 3 : reasm->slot_cnt = (uint)slot_cnt;
85 3 : reasm->orig = (ushort)orig;
86 :
87 : /* Initial slot distribution */
88 :
89 3 : fd_tpu_reasm_reset( reasm );
90 :
91 3 : FD_COMPILER_MFENCE();
92 3 : reasm->magic = FD_TPU_REASM_MAGIC;
93 3 : FD_COMPILER_MFENCE();
94 :
95 3 : return reasm;
96 3 : }
97 :
98 : void
99 6 : fd_tpu_reasm_reset( fd_tpu_reasm_t * reasm ) {
100 :
101 6 : uint depth = reasm->depth;
102 6 : uint burst = reasm->burst;
103 6 : uint node_cnt = depth+burst;
104 :
105 6 : fd_tpu_reasm_slot_t * slots = fd_tpu_reasm_slots_laddr( reasm );
106 6 : uint * pub_slots = fd_tpu_reasm_pub_slots_laddr( reasm );
107 6 : fd_tpu_reasm_map_t * map = fd_tpu_reasm_map_laddr( reasm );
108 :
109 : /* The initial state moves the first 'depth' slots to the mcache (PUB)
110 : and leaves the rest as FREE. */
111 :
112 774 : for( uint j=0U; j<depth; j++ ) {
113 768 : fd_tpu_reasm_slot_t * slot = slots + j;
114 768 : slot->k.state = FD_TPU_REASM_STATE_PUB;
115 768 : slot->k.conn_uid = ULONG_MAX;
116 768 : slot->k.stream_id = 0xffffffffffff;
117 768 : slot->k.sz = 0;
118 768 : slot->chain_next = UINT_MAX;
119 768 : pub_slots[ j ] = j;
120 768 : }
121 774 : for( uint j=depth; j<node_cnt; j++ ) {
122 768 : fd_tpu_reasm_slot_t * slot = slots + j;
123 768 : slot->k.state = FD_TPU_REASM_STATE_FREE;
124 768 : slot->k.conn_uid = ULONG_MAX;
125 768 : slot->k.stream_id = 0xffffffffffff;
126 768 : slot->k.sz = 0;
127 768 : slot->lru_prev = fd_uint_if( j<node_cnt-1U, j+1U, UINT_MAX );
128 768 : slot->lru_next = fd_uint_if( j>depth, j-1U, UINT_MAX );
129 768 : slot->chain_next = UINT_MAX;
130 768 : }
131 :
132 : /* Clear the entire hash map */
133 :
134 6 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt( map );
135 6 : uint * chains = fd_tpu_reasm_map_private_chain( map );
136 774 : for( uint j=0U; j<chain_cnt; j++ ) {
137 768 : chains[ j ] = UINT_MAX;
138 768 : }
139 6 : }
140 :
141 : fd_tpu_reasm_t *
142 3 : fd_tpu_reasm_join( void * shreasm ) {
143 3 : fd_tpu_reasm_t * reasm = shreasm;
144 3 : if( FD_UNLIKELY( reasm->magic != FD_TPU_REASM_MAGIC ) ) {
145 0 : FD_LOG_WARNING(( "bad magic" ));
146 0 : return NULL;
147 0 : }
148 3 : return reasm;
149 3 : }
150 :
151 : void *
152 3 : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm ) {
153 3 : return reasm;
154 3 : }
155 :
156 : void *
157 3 : fd_tpu_reasm_delete( void * shreasm ) {
158 3 : fd_tpu_reasm_t * reasm = shreasm;
159 3 : if( FD_UNLIKELY( !reasm ) ) return NULL;
160 3 : reasm->magic = 0UL;
161 3 : return shreasm;
162 3 : }
163 :
164 : fd_tpu_reasm_slot_t *
165 : fd_tpu_reasm_query( fd_tpu_reasm_t * reasm,
166 : ulong conn_uid,
167 76509 : ulong stream_id ) {
168 76509 : return smap_query( reasm, conn_uid, stream_id );
169 76509 : }
170 :
171 : fd_tpu_reasm_slot_t *
172 : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
173 : ulong conn_uid,
174 : ulong stream_id,
175 76125 : long tsorig ) {
176 76125 : fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
177 76125 : smap_remove( reasm, slot );
178 76125 : slot_begin( slot );
179 76125 : slotq_push_head( reasm, slot );
180 76125 : slot->k.conn_uid = conn_uid;
181 76125 : slot->k.stream_id = stream_id & FD_TPU_REASM_SID_MASK;
182 76125 : smap_insert( reasm, slot );
183 76125 : slot->tsorig_comp = (uint)fd_frag_meta_ts_comp( tsorig );
184 76125 : return slot;
185 76125 : }
186 :
187 : int
188 : fd_tpu_reasm_frag( fd_tpu_reasm_t * reasm,
189 : fd_tpu_reasm_slot_t * slot,
190 : uchar const * data,
191 : ulong data_sz,
192 65841 : ulong data_off ) {
193 :
194 65841 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) )
195 0 : return FD_TPU_REASM_ERR_STATE;
196 :
197 65841 : ulong slot_idx = slot_get_idx( reasm, slot );
198 65841 : ulong mtu = FD_TPU_REASM_MTU;
199 65841 : ulong sz0 = slot->k.sz;
200 :
201 65841 : if( FD_UNLIKELY( data_off>sz0 ) ) {
202 0 : fd_tpu_reasm_cancel( reasm, slot );
203 0 : return FD_TPU_REASM_ERR_SKIP;
204 0 : }
205 :
206 65841 : if( FD_UNLIKELY( data_off<sz0 ) ) {
207 : /* Fragment partially known ... should not happen */
208 0 : ulong skip = sz0 - data_off;
209 0 : if( skip>data_sz ) return FD_TPU_REASM_SUCCESS;
210 0 : data_off += skip;
211 0 : data_sz -= skip;
212 0 : data += skip;
213 0 : }
214 :
215 65841 : ulong sz1 = sz0 + data_sz;
216 65841 : if( FD_UNLIKELY( (sz1<sz0)|(sz1>mtu) ) ) {
217 0 : fd_tpu_reasm_cancel( reasm, slot );
218 0 : return FD_TPU_REASM_ERR_SZ;
219 0 : }
220 :
221 65841 : uchar * msg = slot_get_data_pkt_payload( reasm, slot_idx );
222 65841 : fd_memcpy( msg+sz0, data, data_sz );
223 :
224 65841 : slot->k.sz = (ushort)( sz1 & FD_TPU_REASM_SZ_MASK );
225 65841 : return FD_TPU_REASM_SUCCESS;
226 65841 : }
227 :
228 : int
229 : fd_tpu_reasm_publish( fd_tpu_reasm_t * reasm,
230 : fd_tpu_reasm_slot_t * slot,
231 : fd_frag_meta_t * mcache,
232 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
233 : ulong seq,
234 : long tspub,
235 : uint source_ipv4,
236 65841 : uchar source_tpu ) {
237 :
238 65841 : ulong depth = reasm->depth;
239 :
240 65841 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) )
241 0 : return FD_TPU_REASM_ERR_STATE;
242 :
243 : /* Derive chunk index */
244 65841 : uint slot_idx = slot_get_idx( reasm, slot );
245 65841 : uchar * buf = slot_get_data( reasm, slot_idx );
246 65841 : ulong chunk = fd_laddr_to_chunk( base, buf );
247 65841 : if( FD_UNLIKELY( ( (ulong)buf<(ulong)base ) |
248 65841 : ( chunk>UINT_MAX ) ) ) {
249 0 : FD_LOG_CRIT(( "invalid base %p for slot %p in tpu_reasm %p",
250 0 : base, (void *)slot, (void *)reasm ));
251 0 : }
252 :
253 : /* Find least recently published slot. This is our "freed slot".
254 : (Every time a new slot is published, another slot is simultaneously
255 : freed) */
256 65841 : uint * pub_slot = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
257 65841 : uint freed_slot_idx = *pub_slot;
258 65841 : if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
259 : /* mcache corruption */
260 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
261 0 : freed_slot_idx, reasm->slot_cnt ));
262 0 : fd_tpu_reasm_reset( reasm );
263 0 : return FD_TPU_REASM_ERR_STATE;
264 0 : }
265 :
266 : /* Publish to mcache */
267 65841 : ulong sz = slot->k.sz;
268 65841 : ulong ctl = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
269 65841 : ulong tsorig_comp = slot->tsorig_comp;
270 65841 : ulong tspub_comp = fd_frag_meta_ts_comp( tspub );
271 :
272 65841 : fd_txn_m_t * txnm = (fd_txn_m_t *)buf;
273 65841 : *txnm = (fd_txn_m_t) { 0UL };
274 65841 : txnm->payload_sz = (ushort)sz;
275 65841 : txnm->source_ipv4 = source_ipv4;
276 65841 : txnm->source_tpu = source_tpu;
277 :
278 65841 : # if FD_HAS_AVX
279 65841 : fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
280 : # elif FD_HAS_SSE
281 : fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
282 : # else
283 : fd_mcache_publish ( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
284 : # endif
285 :
286 : /* Mark new slot as published */
287 65841 : slotq_remove( reasm, slot );
288 65841 : slot->k.state = FD_TPU_REASM_STATE_PUB;
289 65841 : *pub_slot = slot_idx;
290 :
291 : /* Free oldest published slot */
292 65841 : fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
293 65841 : uint free_slot_state = free_slot->k.state;
294 65841 : if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
295 : /* mcache/slots out of sync (memory leak) */
296 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
297 0 : seq, freed_slot_idx, free_slot_state ));
298 0 : fd_tpu_reasm_reset( reasm );
299 0 : return FD_TPU_REASM_ERR_STATE;
300 0 : }
301 65841 : free_slot->k.state = FD_TPU_REASM_STATE_FREE;
302 65841 : slotq_push_tail( reasm, free_slot );
303 :
304 65841 : return FD_TPU_REASM_SUCCESS;
305 65841 : }
306 :
307 : void
308 : fd_tpu_reasm_cancel( fd_tpu_reasm_t * reasm,
309 9348 : fd_tpu_reasm_slot_t * slot ) {
310 9348 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) ) return;
311 9348 : slotq_remove( reasm, slot );
312 9348 : smap_remove( reasm, slot );
313 9348 : slot->k.state = FD_TPU_REASM_STATE_FREE;
314 9348 : slot->k.conn_uid = ULONG_MAX;
315 9348 : slot->k.stream_id = 0UL;
316 9348 : slotq_push_tail( reasm, slot );
317 9348 : }
318 :
319 : int
320 : fd_tpu_reasm_publish_fast( fd_tpu_reasm_t * reasm,
321 : uchar const * data,
322 : ulong sz,
323 : fd_frag_meta_t * mcache,
324 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
325 : ulong seq,
326 : long tspub,
327 : uint source_ipv4,
328 0 : uchar source_tpu ) {
329 :
330 0 : ulong depth = reasm->depth;
331 0 : if( FD_UNLIKELY( sz>FD_TPU_REASM_MTU ) ) return FD_TPU_REASM_ERR_SZ;
332 :
333 : /* Acquire least recent slot. This is our "new slot" */
334 0 : fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
335 0 : smap_remove( reasm, slot );
336 0 : slot_begin( slot );
337 :
338 : /* Derive buffer address of new slot */
339 0 : uint slot_idx = slot_get_idx( reasm, slot );
340 0 : uchar * buf = slot_get_data( reasm, slot_idx );
341 0 : ulong chunk = fd_laddr_to_chunk( base, buf );
342 0 : if( FD_UNLIKELY( ( (ulong)buf<(ulong)base ) |
343 0 : ( chunk>UINT_MAX ) ) ) {
344 0 : FD_LOG_ERR(( "Computed invalid chunk index (base=%p buf=%p chunk=%lx)",
345 0 : base, (void *)buf, chunk ));
346 0 : }
347 :
348 : /* Find least recently published slot. This is our "freed slot".
349 : (Every time a new slot is published, another slot is simultaneously
350 : freed) */
351 0 : uint * pub_slot = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
352 0 : uint freed_slot_idx = *pub_slot;
353 0 : if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
354 : /* mcache corruption */
355 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
356 0 : freed_slot_idx, reasm->slot_cnt ));
357 0 : fd_tpu_reasm_reset( reasm );
358 0 : return FD_TPU_REASM_ERR_STATE;
359 0 : }
360 :
361 : /* Copy data into new slot */
362 0 : FD_COMPILER_MFENCE();
363 0 : slot->k.sz = sz & FD_TPU_REASM_SZ_MASK;
364 0 : fd_txn_m_t * txnm = (fd_txn_m_t *)buf;
365 0 : *txnm = (fd_txn_m_t) { 0UL };
366 0 : txnm->payload_sz = (ushort)slot->k.sz,
367 0 : txnm->source_ipv4 = source_ipv4;
368 0 : txnm->source_tpu = source_tpu;
369 0 : fd_memcpy( buf + sizeof(fd_txn_m_t), data, sz );
370 0 : FD_COMPILER_MFENCE();
371 0 : slot->k.state = FD_TPU_REASM_STATE_PUB;
372 0 : FD_COMPILER_MFENCE();
373 :
374 : /* Publish new slot, while simultaneously removing all references to
375 : the old slot */
376 0 : *pub_slot = slot_idx;
377 0 : ulong ctl = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
378 0 : uint tsorig_comp = slot->tsorig_comp;
379 0 : uint tspub_comp = (uint)fd_frag_meta_ts_comp( tspub );
380 0 : # if FD_HAS_AVX
381 0 : fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
382 : # elif FD_HAS_SSE
383 : fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
384 : # else
385 : fd_mcache_publish ( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
386 : # endif
387 :
388 : /* Free old slot */
389 0 : fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
390 0 : uint free_slot_state = free_slot->k.state;
391 0 : if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
392 : /* mcache/slots out of sync (memory leak) */
393 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
394 0 : seq, freed_slot_idx, free_slot_state ));
395 0 : fd_tpu_reasm_reset( reasm );
396 0 : return FD_TPU_REASM_ERR_STATE;
397 0 : }
398 0 : free_slot->k.state = FD_TPU_REASM_STATE_FREE;
399 0 : slotq_push_tail( reasm, free_slot );
400 0 : return FD_TPU_REASM_SUCCESS;
401 0 : }
|