Line data Source code
1 : #include "fd_tpu_reasm_private.h"
2 :
3 : FD_FN_CONST ulong
4 6 : fd_tpu_reasm_align( void ) {
5 6 : return alignof(fd_tpu_reasm_t);
6 6 : }
7 :
8 : FD_FN_CONST ulong
9 : fd_tpu_reasm_footprint( ulong depth,
10 21 : ulong burst ) {
11 :
12 21 : if( FD_UNLIKELY(
13 21 : ( fd_ulong_popcnt( depth )!=1 ) |
14 21 : ( depth>0x7fffffffUL ) |
15 21 : ( burst<2 ) |
16 21 : ( burst>0x7fffffffUL ) ) )
17 15 : return 0UL;
18 :
19 6 : return FD_TPU_REASM_FOOTPRINT( depth, burst );
20 21 : }
21 :
22 : void *
23 : fd_tpu_reasm_new( void * shmem,
24 : ulong depth,
25 : ulong burst,
26 3 : ulong orig ) {
27 :
28 3 : if( FD_UNLIKELY( !shmem ) ) return NULL;
29 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, FD_TPU_REASM_ALIGN ) ) ) return NULL;
30 3 : if( FD_UNLIKELY( !fd_tpu_reasm_footprint( depth, burst ) ) ) return NULL;
31 3 : if( FD_UNLIKELY( orig > FD_FRAG_META_ORIG_MAX ) ) return NULL;
32 :
33 : /* Memory layout */
34 :
35 3 : ulong slot_cnt = depth+burst;
36 3 : if( FD_UNLIKELY( !slot_cnt ) ) return NULL;
37 :
38 3 : FD_SCRATCH_ALLOC_INIT( l, shmem );
39 3 : fd_tpu_reasm_t * reasm = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_tpu_reasm_t), sizeof(fd_tpu_reasm_t) );
40 3 : ulong * pub_slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), depth*sizeof(uint) );
41 3 : fd_tpu_reasm_slot_t * slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t) );
42 3 : uchar * chunks = FD_SCRATCH_ALLOC_APPEND( l, FD_CHUNK_ALIGN, slot_cnt*FD_TPU_REASM_MTU );
43 3 : FD_SCRATCH_ALLOC_FINI( l, alignof(fd_tpu_reasm_t) );
44 :
45 3 : fd_memset( reasm, 0, sizeof(fd_tpu_reasm_t) );
46 3 : fd_memset( slots, 0, burst*sizeof(fd_tpu_reasm_slot_t) );
47 :
48 : /* Initialize reasm object */
49 :
50 3 : reasm->slots_off = (ulong)( (uchar *)slots - (uchar *)reasm );
51 3 : reasm->pub_slots_off = (ulong)( (uchar *)pub_slots - (uchar *)reasm );
52 3 : reasm->chunks_off = (ulong)( (uchar *)chunks - (uchar *)reasm );
53 :
54 3 : reasm->depth = (uint)depth;
55 3 : reasm->burst = (uint)burst;
56 3 : reasm->head = (uint)slot_cnt-1U;
57 3 : reasm->tail = (uint)depth;
58 3 : reasm->slot_cnt = (uint)slot_cnt;
59 3 : reasm->orig = (ushort)orig;
60 :
61 : /* Initial slot distribution */
62 :
63 3 : fd_tpu_reasm_reset( reasm );
64 :
65 3 : FD_COMPILER_MFENCE();
66 3 : reasm->magic = FD_TPU_REASM_MAGIC;
67 3 : FD_COMPILER_MFENCE();
68 :
69 3 : return reasm;
70 3 : }
71 :
72 : void
73 6 : fd_tpu_reasm_reset( fd_tpu_reasm_t * reasm ) {
74 :
75 6 : uint depth = reasm->depth;
76 6 : uint burst = reasm->burst;
77 6 : uint node_cnt = depth+burst;
78 6 : fd_tpu_reasm_slot_t * slots = fd_tpu_reasm_slots_laddr( reasm );
79 6 : uint * pub_slots = fd_tpu_reasm_pub_slots_laddr( reasm );
80 :
81 : /* The initial state moves the first 'depth' slots to the mcache (PUB)
82 : and leaves the rest as FREE. */
83 :
84 774 : for( uint j=0U; j<depth; j++ ) {
85 768 : slots [ j ].state = FD_TPU_REASM_STATE_PUB;
86 768 : pub_slots[ j ] = j;
87 768 : }
88 774 : for( uint j=depth; j<node_cnt; j++ ) {
89 768 : fd_tpu_reasm_slot_t * slot = slots + j;
90 768 : slot->state = FD_TPU_REASM_STATE_FREE;
91 768 : slot->prev_idx = fd_uint_if( j<node_cnt-1U, j+1U, UINT_MAX );
92 768 : slot->next_idx = fd_uint_if( j>depth, j-1U, UINT_MAX );
93 768 : }
94 6 : }
95 :
96 : fd_tpu_reasm_t *
97 3 : fd_tpu_reasm_join( void * shreasm ) {
98 3 : fd_tpu_reasm_t * reasm = shreasm;
99 3 : if( FD_UNLIKELY( reasm->magic != FD_TPU_REASM_MAGIC ) ) {
100 0 : FD_LOG_WARNING(( "bad magic" ));
101 0 : return NULL;
102 0 : }
103 3 : return reasm;
104 3 : }
105 :
106 : void *
107 3 : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm ) {
108 3 : return reasm;
109 3 : }
110 :
111 : void *
112 3 : fd_tpu_reasm_delete( void * shreasm ) {
113 3 : fd_tpu_reasm_t * reasm = shreasm;
114 3 : if( FD_UNLIKELY( !reasm ) ) return NULL;
115 3 : reasm->magic = 0UL;
116 3 : return shreasm;
117 3 : }
118 :
119 : FD_FN_CONST ulong
120 : fd_tpu_reasm_chunk0( fd_tpu_reasm_t const * reasm,
121 3 : void const * base ) {
122 3 : return fd_laddr_to_chunk( base, slot_get_data_const( reasm, 0UL ) );
123 3 : }
124 :
125 :
126 : FD_FN_CONST ulong
127 : fd_tpu_reasm_wmark( fd_tpu_reasm_t const * reasm,
128 3 : void const * base ) {
129 : /* U.B. if slot_cnt==0, but this is checked in fd_tpu_reasm_new */
130 3 : return fd_laddr_to_chunk( base, slot_get_data_const( reasm, reasm->slot_cnt - 1UL ) );
131 3 : }
132 :
133 :
134 : fd_tpu_reasm_slot_t *
135 : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
136 76203 : ulong tsorig ) {
137 76203 : fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
138 76203 : slot_begin( slot );
139 76203 : slotq_push_head( reasm, slot );
140 76203 : slot->tsorig = (uint)tsorig;
141 76203 : return slot;
142 76203 : }
143 :
144 : int
145 : fd_tpu_reasm_append( fd_tpu_reasm_t * reasm,
146 : fd_tpu_reasm_slot_t * slot,
147 : uchar const * data,
148 : ulong data_sz,
149 65874 : ulong data_off ) {
150 :
151 65874 : if( FD_UNLIKELY( slot->state != FD_TPU_REASM_STATE_BUSY ) )
152 0 : return FD_TPU_REASM_ERR_STATE;
153 :
154 65874 : ulong slot_idx = slot_get_idx( reasm, slot );
155 65874 : ulong mtu = FD_TXN_MTU;
156 65874 : ulong sz0 = slot->sz;
157 :
158 65874 : if( FD_UNLIKELY( data_off>sz0 ) ) {
159 0 : fd_tpu_reasm_cancel( reasm, slot );
160 0 : return FD_TPU_REASM_ERR_SKIP;
161 0 : }
162 :
163 65874 : if( FD_UNLIKELY( data_off<sz0 ) ) {
164 : /* Fragment partially known ... should not happen */
165 0 : ulong skip = sz0 - data_off;
166 0 : if( skip>data_sz ) return FD_TPU_REASM_SUCCESS;
167 0 : data_off += skip;
168 0 : data_sz -= skip;
169 0 : data += skip;
170 0 : }
171 :
172 65874 : ulong sz1 = sz0 + data_sz;
173 65874 : if( FD_UNLIKELY( (sz1<sz0)|(sz1>mtu) ) ) {
174 0 : fd_tpu_reasm_cancel( reasm, slot );
175 0 : return FD_TPU_REASM_ERR_SZ;
176 0 : }
177 :
178 65874 : uchar * msg = slot_get_data( reasm, slot_idx );
179 65874 : fd_memcpy( msg+sz0, data, data_sz );
180 :
181 65874 : slot->sz = (ushort)sz1;
182 65874 : return FD_TPU_REASM_SUCCESS;
183 65874 : }
184 :
185 : int
186 : fd_tpu_reasm_publish( fd_tpu_reasm_t * reasm,
187 : fd_tpu_reasm_slot_t * slot,
188 : fd_frag_meta_t * mcache,
189 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
190 : ulong seq,
191 65874 : ulong tspub ) {
192 :
193 65874 : if( FD_UNLIKELY( slot->state != FD_TPU_REASM_STATE_BUSY ) )
194 0 : return FD_TPU_REASM_ERR_STATE;
195 :
196 : /* Derive chunk index */
197 65874 : uint slot_idx = slot_get_idx( reasm, slot );
198 65874 : uchar * data = slot_get_data( reasm, slot_idx );
199 65874 : ulong data_laddr = (ulong)data;
200 65874 : ulong chunk = fd_laddr_to_chunk( base, (void *)data_laddr );
201 65874 : if( FD_UNLIKELY( ( data_laddr<(ulong)base ) |
202 65874 : ( chunk>UINT_MAX ) ) ) {
203 0 : FD_LOG_CRIT(( "invalid base %p for slot %p in tpu_reasm %p",
204 0 : base, (void *)slot, (void *)reasm ));
205 0 : }
206 :
207 : /* Acquire mcache line */
208 65874 : ulong depth = reasm->depth;
209 :
210 : /* Detect which slot this message belongs to */
211 65874 : uint * pub_slot = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
212 65874 : uint freed_slot_idx = *pub_slot;
213 :
214 65874 : if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
215 : /* mcache corruption */
216 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
217 0 : freed_slot_idx, reasm->slot_cnt ));
218 0 : fd_tpu_reasm_reset( reasm );
219 0 : return FD_TPU_REASM_ERR_STATE;
220 0 : }
221 :
222 : /* Mark new slot as published */
223 65874 : slotq_remove( reasm, slot );
224 65874 : slot->state = FD_TPU_REASM_STATE_PUB;
225 65874 : *pub_slot = slot_idx;
226 :
227 : /* Free oldest published slot */
228 65874 : fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
229 65874 : uint free_slot_state = free_slot->state;
230 65874 : if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
231 : /* mcache/slots out of sync (memory leak) */
232 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
233 0 : seq, freed_slot_idx, free_slot_state ));
234 0 : fd_tpu_reasm_reset( reasm );
235 0 : return FD_TPU_REASM_ERR_STATE;
236 0 : }
237 65874 : free_slot->state = FD_TPU_REASM_STATE_FREE;
238 65874 : slotq_push_tail( reasm, free_slot );
239 :
240 : /* Publish to mcache */
241 65874 : ulong sz = slot->sz;
242 65874 : ulong ctl = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
243 65874 : ulong tsorig = slot->tsorig;
244 :
245 65874 : # if FD_HAS_AVX
246 65874 : fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig, tspub );
247 : # elif FD_HAS_SSE
248 : fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig, tspub );
249 : # else
250 : fd_frag_meta_t * meta = mcache + fd_mcache_line_idx( seq, depth );
251 : FD_COMPILER_MFENCE();
252 : meta->seq = fd_seq_dec( seq, 1UL );
253 : FD_COMPILER_MFENCE();
254 : meta->chunk = (uint )chunk;
255 : meta->sz = (ushort)sz;
256 : meta->ctl = (ushort)ctl;
257 : meta->tsorig = (uint )tsorig;
258 : meta->tspub = (uint )tspub;
259 : FD_COMPILER_MFENCE();
260 : meta->seq = seq;
261 : FD_COMPILER_MFENCE();
262 : # endif
263 :
264 65874 : return FD_TPU_REASM_SUCCESS;
265 65874 : }
266 :
267 : void
268 : fd_tpu_reasm_cancel( fd_tpu_reasm_t * reasm,
269 9393 : fd_tpu_reasm_slot_t * slot ) {
270 9393 : if( FD_UNLIKELY( slot->state == FD_TPU_REASM_STATE_FREE ) ) return;
271 9393 : slotq_remove( reasm, slot );
272 9393 : slot->state = FD_TPU_REASM_STATE_FREE;
273 9393 : slot->conn_id = 0UL;
274 9393 : slot->stream_id = 0UL;
275 9393 : slotq_push_tail( reasm, slot );
276 9393 : }
|