Line data Source code
1 : #include "fd_tpu.h"
2 : #include "fd_tpu_reasm_private.h"
3 :
4 : FD_FN_CONST ulong
5 6 : fd_tpu_reasm_align( void ) {
6 6 : return alignof(fd_tpu_reasm_t);
7 6 : }
8 :
9 : FD_FN_CONST ulong
10 : fd_tpu_reasm_footprint( ulong depth,
11 24 : ulong burst ) {
12 :
13 24 : if( FD_UNLIKELY(
14 24 : ( fd_ulong_popcnt( depth )!=1 ) |
15 24 : ( depth>0x7fffffffUL ) |
16 24 : ( burst<2 ) |
17 24 : ( burst>0x7fffffffUL ) ) )
18 15 : return 0UL;
19 :
20 9 : ulong slot_cnt = depth+burst;
21 9 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt_est( slot_cnt );
22 9 : return FD_LAYOUT_FINI( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_INIT,
23 24 : fd_tpu_reasm_align(), sizeof(fd_tpu_reasm_t) ), /* hdr */
24 24 : alignof(uint), depth *sizeof(uint) ), /* pub_slots */
25 24 : alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t) ), /* slots */
26 24 : fd_tpu_reasm_map_align(), fd_tpu_reasm_map_footprint( chain_cnt ) ), /* map */
27 24 : fd_tpu_reasm_align() );
28 :
29 24 : }
30 :
31 : void *
32 : fd_tpu_reasm_new( void * shmem,
33 : ulong depth,
34 : ulong burst,
35 : ulong orig,
36 3 : void * dcache ) {
37 :
38 3 : if( FD_UNLIKELY( !shmem ) ) return NULL;
39 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, FD_TPU_REASM_ALIGN ) ) ) return NULL;
40 3 : if( FD_UNLIKELY( !fd_tpu_reasm_footprint( depth, burst ) ) ) return NULL;
41 3 : if( FD_UNLIKELY( orig > FD_FRAG_META_ORIG_MAX ) ) return NULL;
42 :
43 3 : ulong req_data_sz = fd_tpu_reasm_req_data_sz( depth, burst );
44 3 : if( FD_UNLIKELY( fd_dcache_data_sz( dcache )<req_data_sz ) ) {
45 0 : FD_LOG_WARNING(( "dcache data_sz is too small (need %lu, have %lu)", req_data_sz, fd_dcache_data_sz( dcache ) ));
46 0 : return NULL;
47 0 : }
48 :
49 : /* Memory layout */
50 :
51 3 : ulong slot_cnt = depth+burst;
52 3 : if( FD_UNLIKELY( !slot_cnt ) ) return NULL;
53 3 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt_est( slot_cnt );
54 :
55 3 : FD_SCRATCH_ALLOC_INIT( l, shmem );
56 3 : fd_tpu_reasm_t * reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_tpu_reasm_align(), sizeof(fd_tpu_reasm_t) );
57 3 : ulong * pub_slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), depth*sizeof(uint) );
58 3 : fd_tpu_reasm_slot_t * slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t) );
59 3 : void * map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_tpu_reasm_map_align(), fd_tpu_reasm_map_footprint( chain_cnt ) );
60 3 : FD_SCRATCH_ALLOC_FINI( l, fd_tpu_reasm_align() );
61 :
62 3 : fd_memset( reasm, 0, sizeof(fd_tpu_reasm_t) );
63 3 : fd_memset( slots, 0, burst*sizeof(fd_tpu_reasm_slot_t) );
64 :
65 3 : fd_tpu_reasm_map_t * map = fd_tpu_reasm_map_join( fd_tpu_reasm_map_new( map_mem, chain_cnt, 0UL ) );
66 3 : if( FD_UNLIKELY( !map ) ) {
67 0 : FD_LOG_WARNING(( "fd_tpu_reasm_map_new failed" ));
68 0 : return NULL;
69 0 : }
70 :
71 : /* Initialize reasm object */
72 :
73 3 : reasm->slots_off = (ulong)( (uchar *)slots - (uchar *)reasm );
74 3 : reasm->pub_slots_off = (ulong)( (uchar *)pub_slots - (uchar *)reasm );
75 3 : reasm->map_off = (ulong)( (uchar *)map - (uchar *)reasm );
76 3 : reasm->dcache = dcache;
77 :
78 3 : reasm->depth = (uint)depth;
79 3 : reasm->burst = (uint)burst;
80 3 : reasm->head = (uint)slot_cnt-1U;
81 3 : reasm->tail = (uint)depth;
82 3 : reasm->slot_cnt = (uint)slot_cnt;
83 3 : reasm->orig = (ushort)orig;
84 :
85 : /* Initial slot distribution */
86 :
87 3 : fd_tpu_reasm_reset( reasm );
88 :
89 3 : FD_COMPILER_MFENCE();
90 3 : reasm->magic = FD_TPU_REASM_MAGIC;
91 3 : FD_COMPILER_MFENCE();
92 :
93 3 : return reasm;
94 3 : }
95 :
96 : void
97 6 : fd_tpu_reasm_reset( fd_tpu_reasm_t * reasm ) {
98 :
99 6 : uint depth = reasm->depth;
100 6 : uint burst = reasm->burst;
101 6 : uint node_cnt = depth+burst;
102 :
103 6 : fd_tpu_reasm_slot_t * slots = fd_tpu_reasm_slots_laddr( reasm );
104 6 : uint * pub_slots = fd_tpu_reasm_pub_slots_laddr( reasm );
105 6 : fd_tpu_reasm_map_t * map = fd_tpu_reasm_map_laddr( reasm );
106 :
107 : /* The initial state moves the first 'depth' slots to the mcache (PUB)
108 : and leaves the rest as FREE. */
109 :
110 774 : for( uint j=0U; j<depth; j++ ) {
111 768 : fd_tpu_reasm_slot_t * slot = slots + j;
112 768 : slot->k.state = FD_TPU_REASM_STATE_PUB;
113 768 : slot->k.conn_uid = ULONG_MAX;
114 768 : slot->k.stream_id = 0xffffffffffff;
115 768 : slot->k.sz = 0;
116 768 : slot->chain_next = UINT_MAX;
117 768 : pub_slots[ j ] = j;
118 768 : }
119 774 : for( uint j=depth; j<node_cnt; j++ ) {
120 768 : fd_tpu_reasm_slot_t * slot = slots + j;
121 768 : slot->k.state = FD_TPU_REASM_STATE_FREE;
122 768 : slot->k.conn_uid = ULONG_MAX;
123 768 : slot->k.stream_id = 0xffffffffffff;
124 768 : slot->k.sz = 0;
125 768 : slot->lru_prev = fd_uint_if( j<node_cnt-1U, j+1U, UINT_MAX );
126 768 : slot->lru_next = fd_uint_if( j>depth, j-1U, UINT_MAX );
127 768 : slot->chain_next = UINT_MAX;
128 768 : }
129 :
130 : /* Clear the entire hash map */
131 :
132 6 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt( map );
133 6 : uint * chains = fd_tpu_reasm_map_private_chain( map );
134 774 : for( uint j=0U; j<chain_cnt; j++ ) {
135 768 : chains[ j ] = UINT_MAX;
136 768 : }
137 6 : }
138 :
139 : fd_tpu_reasm_t *
140 3 : fd_tpu_reasm_join( void * shreasm ) {
141 3 : fd_tpu_reasm_t * reasm = shreasm;
142 3 : if( FD_UNLIKELY( reasm->magic != FD_TPU_REASM_MAGIC ) ) {
143 0 : FD_LOG_WARNING(( "bad magic" ));
144 0 : return NULL;
145 0 : }
146 3 : return reasm;
147 3 : }
148 :
149 : void *
150 3 : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm ) {
151 3 : return reasm;
152 3 : }
153 :
154 : void *
155 3 : fd_tpu_reasm_delete( void * shreasm ) {
156 3 : fd_tpu_reasm_t * reasm = shreasm;
157 3 : if( FD_UNLIKELY( !reasm ) ) return NULL;
158 3 : reasm->magic = 0UL;
159 3 : return shreasm;
160 3 : }
161 :
162 : fd_tpu_reasm_slot_t *
163 : fd_tpu_reasm_query( fd_tpu_reasm_t * reasm,
164 : ulong conn_uid,
165 76509 : ulong stream_id ) {
166 76509 : return smap_query( reasm, conn_uid, stream_id );
167 76509 : }
168 :
169 : fd_tpu_reasm_slot_t *
170 : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
171 : ulong conn_uid,
172 : ulong stream_id,
173 76125 : long tsorig ) {
174 76125 : fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
175 76125 : smap_remove( reasm, slot );
176 76125 : slot_begin( slot );
177 76125 : slotq_push_head( reasm, slot );
178 76125 : slot->k.conn_uid = conn_uid;
179 76125 : slot->k.stream_id = stream_id & FD_TPU_REASM_SID_MASK;
180 76125 : smap_insert( reasm, slot );
181 76125 : slot->tsorig_comp = (uint)fd_frag_meta_ts_comp( tsorig );
182 76125 : return slot;
183 76125 : }
184 :
185 : int
186 : fd_tpu_reasm_frag( fd_tpu_reasm_t * reasm,
187 : fd_tpu_reasm_slot_t * slot,
188 : uchar const * data,
189 : ulong data_sz,
190 65841 : ulong data_off ) {
191 :
192 65841 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) )
193 0 : return FD_TPU_REASM_ERR_STATE;
194 :
195 65841 : ulong slot_idx = slot_get_idx( reasm, slot );
196 65841 : ulong mtu = FD_TPU_REASM_MTU;
197 65841 : ulong sz0 = slot->k.sz;
198 :
199 65841 : if( FD_UNLIKELY( data_off>sz0 ) ) {
200 0 : fd_tpu_reasm_cancel( reasm, slot );
201 0 : return FD_TPU_REASM_ERR_SKIP;
202 0 : }
203 :
204 65841 : if( FD_UNLIKELY( data_off<sz0 ) ) {
205 : /* Fragment partially known ... should not happen */
206 0 : ulong skip = sz0 - data_off;
207 0 : if( skip>data_sz ) return FD_TPU_REASM_SUCCESS;
208 0 : data_off += skip;
209 0 : data_sz -= skip;
210 0 : data += skip;
211 0 : }
212 :
213 65841 : ulong sz1 = sz0 + data_sz;
214 65841 : if( FD_UNLIKELY( (sz1<sz0)|(sz1>mtu) ) ) {
215 0 : fd_tpu_reasm_cancel( reasm, slot );
216 0 : return FD_TPU_REASM_ERR_SZ;
217 0 : }
218 :
219 65841 : uchar * msg = slot_get_data( reasm, slot_idx );
220 65841 : fd_memcpy( msg+sz0, data, data_sz );
221 :
222 65841 : slot->k.sz = (ushort)( sz1 & FD_TPU_REASM_SZ_MASK );
223 65841 : return FD_TPU_REASM_SUCCESS;
224 65841 : }
225 :
226 : int
227 : fd_tpu_reasm_publish( fd_tpu_reasm_t * reasm,
228 : fd_tpu_reasm_slot_t * slot,
229 : fd_frag_meta_t * mcache,
230 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
231 : ulong seq,
232 65841 : long tspub ) {
233 :
234 65841 : ulong depth = reasm->depth;
235 :
236 65841 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) )
237 0 : return FD_TPU_REASM_ERR_STATE;
238 :
239 : /* Derive chunk index */
240 65841 : uint slot_idx = slot_get_idx( reasm, slot );
241 65841 : uchar * data = slot_get_data( reasm, slot_idx );
242 65841 : ulong chunk = fd_laddr_to_chunk( base, data );
243 65841 : if( FD_UNLIKELY( ( (ulong)data<(ulong)base ) |
244 65841 : ( chunk>UINT_MAX ) ) ) {
245 0 : FD_LOG_CRIT(( "invalid base %p for slot %p in tpu_reasm %p",
246 0 : base, (void *)slot, (void *)reasm ));
247 0 : }
248 :
249 : /* Find least recently published slot. This is our "freed slot".
250 : (Every time a new slot is published, another slot is simultaneously
251 : freed) */
252 65841 : uint * pub_slot = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
253 65841 : uint freed_slot_idx = *pub_slot;
254 65841 : if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
255 : /* mcache corruption */
256 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
257 0 : freed_slot_idx, reasm->slot_cnt ));
258 0 : fd_tpu_reasm_reset( reasm );
259 0 : return FD_TPU_REASM_ERR_STATE;
260 0 : }
261 :
262 : /* Publish to mcache */
263 65841 : ulong sz = slot->k.sz;
264 65841 : ulong ctl = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
265 65841 : ulong tsorig_comp = slot->tsorig_comp;
266 65841 : ulong tspub_comp = fd_frag_meta_ts_comp( tspub );
267 :
268 65841 : # if FD_HAS_AVX
269 65841 : fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
270 : # elif FD_HAS_SSE
271 : fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
272 : # else
273 : fd_mcache_publish ( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
274 : # endif
275 :
276 : /* Mark new slot as published */
277 65841 : slotq_remove( reasm, slot );
278 65841 : slot->k.state = FD_TPU_REASM_STATE_PUB;
279 65841 : *pub_slot = slot_idx;
280 :
281 : /* Free oldest published slot */
282 65841 : fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
283 65841 : uint free_slot_state = free_slot->k.state;
284 65841 : if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
285 : /* mcache/slots out of sync (memory leak) */
286 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
287 0 : seq, freed_slot_idx, free_slot_state ));
288 0 : fd_tpu_reasm_reset( reasm );
289 0 : return FD_TPU_REASM_ERR_STATE;
290 0 : }
291 65841 : free_slot->k.state = FD_TPU_REASM_STATE_FREE;
292 65841 : slotq_push_tail( reasm, free_slot );
293 :
294 65841 : return FD_TPU_REASM_SUCCESS;
295 65841 : }
296 :
297 : void
298 : fd_tpu_reasm_cancel( fd_tpu_reasm_t * reasm,
299 9348 : fd_tpu_reasm_slot_t * slot ) {
300 9348 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) ) return;
301 9348 : slotq_remove( reasm, slot );
302 9348 : smap_remove( reasm, slot );
303 9348 : slot->k.state = FD_TPU_REASM_STATE_FREE;
304 9348 : slot->k.conn_uid = ULONG_MAX;
305 9348 : slot->k.stream_id = 0UL;
306 9348 : slotq_push_tail( reasm, slot );
307 9348 : }
308 :
309 : int
310 : fd_tpu_reasm_publish_fast( fd_tpu_reasm_t * reasm,
311 : uchar const * data,
312 : ulong sz,
313 : fd_frag_meta_t * mcache,
314 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
315 : ulong seq,
316 0 : long tspub ) {
317 :
318 0 : ulong depth = reasm->depth;
319 0 : if( FD_UNLIKELY( sz>FD_TPU_REASM_MTU ) ) return FD_TPU_REASM_ERR_SZ;
320 :
321 : /* Acquire least recent slot. This is our "new slot" */
322 0 : fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
323 0 : smap_remove( reasm, slot );
324 0 : slot_begin( slot );
325 :
326 : /* Derive buffer address of new slot */
327 0 : uint slot_idx = slot_get_idx( reasm, slot );
328 0 : uchar * buf = slot_get_data( reasm, slot_idx );
329 0 : ulong chunk = fd_laddr_to_chunk( base, buf );
330 0 : if( FD_UNLIKELY( ( (ulong)buf<(ulong)base ) |
331 0 : ( chunk>UINT_MAX ) ) ) {
332 0 : FD_LOG_ERR(( "Computed invalid chunk index (base=%p buf=%p chunk=%lx)",
333 0 : base, (void *)buf, chunk ));
334 0 : }
335 :
336 : /* Find least recently published slot. This is our "freed slot".
337 : (Every time a new slot is published, another slot is simultaneously
338 : freed) */
339 0 : uint * pub_slot = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
340 0 : uint freed_slot_idx = *pub_slot;
341 0 : if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
342 : /* mcache corruption */
343 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
344 0 : freed_slot_idx, reasm->slot_cnt ));
345 0 : fd_tpu_reasm_reset( reasm );
346 0 : return FD_TPU_REASM_ERR_STATE;
347 0 : }
348 :
349 : /* Copy data into new slot */
350 0 : FD_COMPILER_MFENCE();
351 0 : slot->k.sz = sz & FD_TPU_REASM_SZ_MASK;
352 0 : fd_memcpy( buf, data, sz );
353 0 : FD_COMPILER_MFENCE();
354 0 : slot->k.state = FD_TPU_REASM_STATE_PUB;
355 0 : FD_COMPILER_MFENCE();
356 :
357 : /* Publish new slot, while simultaneously removing all references to
358 : the old slot */
359 0 : *pub_slot = slot_idx;
360 0 : ulong ctl = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
361 0 : uint tsorig_comp = slot->tsorig_comp;
362 0 : uint tspub_comp = (uint)fd_frag_meta_ts_comp( tspub );
363 0 : # if FD_HAS_AVX
364 0 : fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
365 : # elif FD_HAS_SSE
366 : fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
367 : # else
368 : fd_mcache_publish ( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
369 : # endif
370 :
371 : /* Free old slot */
372 0 : fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
373 0 : uint free_slot_state = free_slot->k.state;
374 0 : if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
375 : /* mcache/slots out of sync (memory leak) */
376 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
377 0 : seq, freed_slot_idx, free_slot_state ));
378 0 : fd_tpu_reasm_reset( reasm );
379 0 : return FD_TPU_REASM_ERR_STATE;
380 0 : }
381 0 : free_slot->k.state = FD_TPU_REASM_STATE_FREE;
382 0 : slotq_push_tail( reasm, free_slot );
383 0 : return FD_TPU_REASM_SUCCESS;
384 0 : }
|