Line data Source code
1 : #ifndef HEADER_fd_src_disco_quic_fd_tpu_h
2 : #define HEADER_fd_src_disco_quic_fd_tpu_h
3 :
4 : /* fd_tpu provides the server-side of the TPU/QUIC protocol.
5 :
6 : TPU/QUIC is the protocol used to submit transactions to a block
7 : producer. For each txn to be transferred, the client opens a
8 : unidirectional QUIC stream and sends its serialization (see
9 : fd_txn_parse). In the happy case, a txn only requires one packet.
10 :
11 : For txn exceeding MTU size, the txn is fragmented over multiple
12 : packets. For more information, see the specification:
13 : https://github.com/solana-foundation/specs/blob/main/p2p/tpu.md */
14 :
15 : #include "../fd_disco_base.h"
16 :
/* FD_TPU_REASM_MTU is the largest tango frag (in bytes) that an
   fd_tpu_reasm_t publishes: FD_TPU_MTU rounded up to a whole number of
   FD_CHUNK_SZ chunks.  FD_TPU_REASM_CHUNK_MTU is the same size counted
   in chunks, so FD_TPU_REASM_CHUNK_MTU*FD_CHUNK_SZ == FD_TPU_REASM_MTU. */

#define FD_TPU_REASM_CHUNK_MTU (FD_ULONG_ALIGN_UP( FD_TPU_MTU, FD_CHUNK_SZ )>>FD_CHUNK_LG_SZ)
#define FD_TPU_REASM_MTU       (FD_TPU_REASM_CHUNK_MTU<<FD_CHUNK_LG_SZ)

/* FD_TPU_REASM_ALIGN is the alignment of an fd_tpu_reasm_t object. */

#define FD_TPU_REASM_ALIGN FD_CHUNK_ALIGN

/* FD_TPU_REASM_REQ_DATA_SZ is the compile-time form of
   fd_tpu_reasm_req_data_sz: one FD_TPU_REASM_MTU sized buffer for each
   of the 'depth' mcache-owned (published) slots plus each of the
   'reasm_max' fifo-owned (free/busy) slots. */

#define FD_TPU_REASM_REQ_DATA_SZ(depth, reasm_max) (((depth)+(reasm_max))*FD_TPU_REASM_MTU)
26 :
/* FD_TPU_REASM_{SUCCESS,ERR_{...}} are the result codes of the
   tpu_reasm API.  These values are persisted to logs: never renumber an
   existing entry and never reuse a retired numeric value. */

#define FD_TPU_REASM_SUCCESS   (0) /* ok */
#define FD_TPU_REASM_ERR_SZ    (1) /* oversz msg */
#define FD_TPU_REASM_ERR_SKIP  (2) /* out-of-order data within QUIC stream */
#define FD_TPU_REASM_ERR_STATE (3) /* unexpected slot state */
35 :
/* FD_TPU_REASM_STATE_{FREE,BUSY,PUB} are the reasm slot states
   (stored in the 2-bit state field of fd_tpu_reasm_key_t). */

#define FD_TPU_REASM_STATE_FREE ((uchar)0) /* free */
#define FD_TPU_REASM_STATE_BUSY ((uchar)1) /* active reassembly */
#define FD_TPU_REASM_STATE_PUB  ((uchar)2) /* published */
41 :
42 : /* fd_tpu_reasm_t handles incoming data fragments of TPU/QUIC streams.
43 : Frags are expected to be provided via fd_quic callback. Each
44 : tpu_reasm object may only serve a single fd_quic object. Dispatches
45 : reassembled messages to an mcache. Should not be persisted.
46 :
47 : ### Flow Control
48 :
49 : fd_tpu_reasm is wired up as follows:
50 :
51 : ┌────────┐ ┌───────┐ ┌────────┐
52 : │ QUIC │ callbacks │ tpu_ │ tango │ sig_ │
53 : │ Server ├───────────► reasm ├───────► verify │
54 : └────────┘ └───────┘ └────────┘
55 :
56 : Neither of the pictured links backpressure. Packet loss occurs if
57 : (1) the QUIC server accepts more concurrent streams than available
58 : reassembly slots. Also if (2) the bank of sig verify tiles is too
59 : slow to keepup with incoming transactions.
60 :
61 : The application should thus adjust the QUIC server to throttle the
62 : concurrent stream count and transaction rate to appropriate levels.
63 : (Via QUIC connection quotas)
64 :
65 : The tpu_reasm MUST be the only writer to the mcache.
66 :
67 : ### Eviction Policy
68 :
69 : Aforementioned case 1 specifically happens whenever the QUIC server
70 : accepts a stream and tpu_reasm doesn't find a free slot. tpu_reasm
71 : hardcodes a FIFO eviction policy to handle this case by cancelling
72 : the least recently prepared reassembly. This also guarantees that
73 : unfragmented transactions never get dropped.
74 :
75 : ### Internals
76 :
77 : fd_tpu_reasm internally manages an array of message reassembly
78 : buffers. Each of these is called a "slot" (fd_tpu_reasm_slot_t).
79 :
80 : Slots are either owned by the reassembly fifo (FREE, BUSY states), or
81 : the mcache (PUB state). The ownership separation prevents in-flight
82 : reassemblies from thrashing data exposed to consumers via the mcache.
83 : (Data races transitioning between fifo and mcache ownership are
84 : handled by the speculative receive pattern.)
85 :
86 : The lifecycle of a slot is:
87 :
88 : prepare() publish()
89 : ┌─► FREE ───► BUSY ───► PUB ─┐
90 : │ │ │
91 : ▲ ▼ cancel() ▼ implied by a later
92 : │ │ │ publish()/cancel()
93 : └──────◄───────┴──────◄──────┘
94 :
95 : prepare: The transition from FREE to BUSY occurs when a new QUIC
96 : stream is accepted.
97 : cancel: The transition from BUSY to FREE occurs when stream/txn
98 : reassembly is aborted. This can happen for whatever
99 : explicit reason (peer kicked, network error), or implicitly
100 : when prepare() is called but no free slot was found.
101 : publish: The transition from BUSY to PUB occurs when a slot holding
102 : a complete txn is made visible to downstream consumers.
103 : This moves a slot from the reassembly fifo to the mcache.
104 :
105 : The transition from PUB to FREE also occurs at the same time (for a
106 : different slot). This moves the least recently published slot from
107 : the mcache into the reassembly fifo. This keeps the number of slots
108 : owned by the mcache at _exactly_ depth at all times and exactly
109 : mirroring the set of packets exposed downstream (notwithstanding a
110 : startup transient of up to depth packets). This also guarantees that
111 : the number of slots in the FREE and BUSY states is kept at _exactly_
112 : reasm_max at all times.
113 :
114 : In order to support the above, the 'pub_slots' lookup table tracks
115 : which published mcache lines (indexed by `seq % depth`) correspond to
116 : which slot indexes. */
117 :
118 :
119 : /* fd_tpu_reasm_slot_t holds a message reassembly buffer.
120 : Carefully tuned to 32 byte size. */
121 :
122 : struct fd_tpu_reasm_key {
123 : ulong conn_uid; /* ULONG_MAX means invalid */
124 : ulong stream_id : 48;
125 : ulong sz : 14;
126 : ulong state : 2;
127 : };
128 :
129 19548183 : #define FD_TPU_REASM_SID_MASK (0xffffffffffffUL)
130 65841 : #define FD_TPU_REASM_SZ_MASK (0x3fffUL)
131 :
132 : typedef struct fd_tpu_reasm_key fd_tpu_reasm_key_t;
133 :
134 : struct __attribute__((aligned(16))) fd_tpu_reasm_slot {
135 : fd_tpu_reasm_key_t k; /* FIXME ugly: the compound key has to be a single struct member */
136 : uint lru_prev;
137 : uint lru_next;
138 : uint chain_next;
139 : uint tsorig_comp;
140 : };
141 :
142 : typedef struct fd_tpu_reasm_slot fd_tpu_reasm_slot_t;
143 :
144 : struct __attribute__((aligned(FD_TPU_REASM_ALIGN))) fd_tpu_reasm {
145 : ulong magic; /* ==FD_TPU_REASM_MAGIC */
146 :
147 : ulong slots_off; /* slots mem */
148 : ulong pub_slots_off; /* pub_slots mem */
149 : ulong map_off; /* map mem */
150 : uchar * dcache; /* points to first dcache data byte in local address space */
151 :
152 : uint depth; /* mcache depth */
153 : uint burst; /* max concurrent reassemblies */
154 :
155 : uint head; /* least recent reassembly */
156 : uint tail; /* most recent reassembly */
157 :
158 : uint slot_cnt;
159 : ushort orig; /* tango orig */
160 : };
161 :
162 : typedef struct fd_tpu_reasm fd_tpu_reasm_t;
163 :
164 : FD_PROTOTYPES_BEGIN
165 :
166 : /* Private accessors */
167 :
168 : static inline FD_FN_PURE fd_tpu_reasm_slot_t *
169 20076942 : fd_tpu_reasm_slots_laddr( fd_tpu_reasm_t * reasm ) {
170 20076942 : return (fd_tpu_reasm_slot_t *)( (ulong)reasm + reasm->slots_off );
171 20076942 : }
172 :
173 : static inline FD_FN_PURE fd_tpu_reasm_slot_t const *
174 358953 : fd_tpu_reasm_slots_laddr_const( fd_tpu_reasm_t const * reasm ) {
175 358953 : return (fd_tpu_reasm_slot_t const *)( (ulong)reasm + reasm->slots_off );
176 358953 : }
177 :
178 : static inline FD_FN_PURE uint *
179 216783 : fd_tpu_reasm_pub_slots_laddr( fd_tpu_reasm_t * reasm ) {
180 216783 : return (uint *)( (ulong)reasm + reasm->pub_slots_off );
181 216783 : }
182 :
183 : /* Construction API */
184 :
185 : /* fd_tpu_reasm_{align,footprint} return the required alignment and
186 : footprint of a memory region suitable for use as a tpu_reasm that
187 : can reassemble up to 'reasm_max' txns concurrently. 'depth' is the
188 : entry count of the target mcache. mtu is the max sz of a serialized
189 : txn (usually FD_TXN_MTU). */
190 :
191 : FD_FN_CONST ulong
192 : fd_tpu_reasm_align( void );
193 :
194 : FD_FN_CONST ulong
195 : fd_tpu_reasm_footprint( ulong depth, /* Assumed in {2^0,2^1,2^2,...,2^31} */
196 : ulong reasm_max ); /* Assumed in [1,2^31) */
197 :
198 : FD_FN_CONST static inline ulong
199 : fd_tpu_reasm_req_data_sz( ulong depth,
200 3 : ulong reasm_max ) { /* Assumed in [1,2^31) */
201 3 : return (depth+reasm_max) * FD_TPU_REASM_MTU;
202 3 : }
203 :
204 : /* fd_tpu_reasm_new formats an unused memory region for use as a
205 : tpu_reasm. shmem is a non-NULL pointer to this region in the local
206 : address space with the required footprint and alignment. {depth,
207 : reasm_max,mtu} as described above. orig is the Tango origin ID of
208 : this tpu_reasm. dcache is a local join to an fd_dcache that
209 : tpu_reasm will write frags to. dcache should have at least
210 : fd_tpu_reasm_req_data_sz() bytes of data_sz. The dcache app region
211 : is ignored and not written to. */
212 :
213 : void *
214 : fd_tpu_reasm_new( void * shmem,
215 : ulong depth, /* Assumed in {2^0,2^1,2^2,...,2^32} */
216 : ulong reasm_max, /* Assumed in [1,2^32) */
217 : ulong orig, /* Assumed in [0,FD_FRAG_META_ORIG_MAX) */
218 : void * dcache );
219 :
220 : fd_tpu_reasm_t *
221 : fd_tpu_reasm_join( void * shreasm );
222 :
223 : void *
224 : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm );
225 :
226 : void *
227 : fd_tpu_reasm_delete( void * shreasm );
228 :
229 : /* Accessor API */
230 :
231 : fd_tpu_reasm_slot_t *
232 : fd_tpu_reasm_query( fd_tpu_reasm_t * reasm,
233 : ulong conn_uid,
234 : ulong stream_id );
235 :
236 : FD_FN_PURE static inline fd_tpu_reasm_slot_t *
237 0 : fd_tpu_reasm_peek_tail( fd_tpu_reasm_t * reasm ) {
238 0 : uint tail_idx = reasm->tail;
239 0 : fd_tpu_reasm_slot_t * tail = fd_tpu_reasm_slots_laddr( reasm ) + tail_idx;
240 0 : return tail;
241 0 : }
242 :
243 : fd_tpu_reasm_slot_t *
244 : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
245 : ulong conn_uid,
246 : ulong stream_id,
247 : long tspub );
248 :
249 : static inline fd_tpu_reasm_slot_t *
250 : fd_tpu_reasm_acquire( fd_tpu_reasm_t * reasm,
251 : ulong conn_uid,
252 : ulong stream_id,
253 76509 : long tspub ) {
254 76509 : fd_tpu_reasm_slot_t * slot = fd_tpu_reasm_query( reasm, conn_uid, stream_id );
255 76509 : if( !slot ) {
256 76125 : slot = fd_tpu_reasm_prepare( reasm, conn_uid, stream_id, tspub );
257 76125 : }
258 76509 : return slot;
259 76509 : }
260 :
261 : /* fd_tpu_reasm_frag appends a new stream frag to the reasm slot.
262 : [data,data+data_sz) is the memory region containing the stream data.
263 : data_off is the offset of this stream data. Slot reassembly buffer
264 : is appended with copy of [data,data+data_sz) on success. On failure,
265 : cancels the reassembly.
266 :
267 : Return values one of:
268 :
269 : FD_TPU_REASM_SUCCESS: success, fragment added to reassembly
270 : FD_TPU_REASM_EAGAIN: incomplete
271 : FD_TPU_REASM_ERR_SZ: fail, data_off + data_sz > mtu
272 : FD_TPU_REASM_ERR_SKIP: fail, data_off - slot->sz > 0
273 :
274 : Note on SKIP error: RFC 9000 Section 2.2 specifies "QUIC makes no
275 : specific allowances for delivery of stream data out of order." */
276 :
277 : int
278 : fd_tpu_reasm_frag( fd_tpu_reasm_t * reasm,
279 : fd_tpu_reasm_slot_t * slot,
280 : uchar const * data,
281 : ulong sz,
282 : ulong off );
283 :
284 : /* fd_tpu_reasm_publish completes a stream reassembly and publishes the
285 : message to an mcache for downstream consumption. base is the address
286 : of the chunk whose index is 0 (chunk0 param of fd_chunk_to_laddr).
287 : {seq,sig,tspub} are mcache frag params. If slot does not have active
288 : reassembly or txn parsing failed, returns NULL. If base is not valid
289 : for tpu_reasm, aborts. Final msg sz in [0,mtu+FD_CHUNK_SZ). */
290 :
291 : int
292 : fd_tpu_reasm_publish( fd_tpu_reasm_t * reasm,
293 : fd_tpu_reasm_slot_t * slot,
294 : fd_frag_meta_t * mcache,
295 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
296 : ulong seq,
297 : long tspub );
298 :
299 : /* fd_tpu_reasm_publish_fast is a streamlined version of acquire/frag/
300 : publish. */
301 :
302 : int
303 : fd_tpu_reasm_publish_fast( fd_tpu_reasm_t * reasm,
304 : uchar const * data,
305 : ulong sz,
306 : fd_frag_meta_t * mcache,
307 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
308 : ulong seq,
309 : long tspub );
310 :
311 : /* fd_tpu_reasm_cancel cancels the given stream reassembly. */
312 :
313 : void
314 : fd_tpu_reasm_cancel( fd_tpu_reasm_t * reasm,
315 : fd_tpu_reasm_slot_t * slot );
316 :
317 : /* fd_tpu_reasm_key_hash is an unrolled version of fd_hash (xxhash-r39) */
318 :
319 78230124 : #define C1 (11400714785074694791UL)
320 58672593 : #define C2 (14029467366897019727UL)
321 19557531 : #define C3 ( 1609587929392839161UL)
322 39115062 : #define C4 ( 9650029242287828579UL)
323 19557531 : #define C5 ( 2870177450012600261UL)
324 :
325 : static inline ulong
326 : fd_tpu_reasm_key_hash( fd_tpu_reasm_key_t const * k,
327 19557531 : ulong seed ) {
328 :
329 19557531 : ulong h = seed + C5 + 16UL;
330 19557531 : ulong w0 = k->conn_uid;
331 19557531 : ulong w1 = k->stream_id;
332 :
333 19557531 : w0 *= C2; w0 = fd_ulong_rotate_left( w0, 31 ); w0 *= C1; h ^= w0; h = fd_ulong_rotate_left( h, 27 )*C1 + C4;
334 19557531 : w1 *= C2; w1 = fd_ulong_rotate_left( w1, 31 ); w1 *= C1; h ^= w1; h = fd_ulong_rotate_left( h, 27 )*C1 + C4;
335 :
336 : /* Final avalanche */
337 19557531 : h ^= h >> 33;
338 19557531 : h *= C2;
339 19557531 : h ^= h >> 29;
340 19557531 : h *= C3;
341 19557531 : h ^= h >> 32;
342 :
343 19557531 : return h;
344 19557531 : }
345 :
346 : #undef C1
347 : #undef C2
348 : #undef C3
349 : #undef C4
350 : #undef C5
351 :
352 : FD_PROTOTYPES_END
353 :
354 : #endif /* HEADER_fd_src_disco_quic_fd_tpu_h */
|