Line data Source code
1 : #ifndef HEADER_fd_src_disco_quic_fd_tpu_h
2 : #define HEADER_fd_src_disco_quic_fd_tpu_h
3 :
4 : /* fd_tpu provides the server-side of the TPU/QUIC protocol.
5 :
6 : TPU/QUIC is the protocol used to submit transactions to a block
7 : producer. For each txn to be transferred, the client opens a
8 : unidirectional QUIC stream and sends its serialization (see
9 : fd_txn_parse). In the happy case, a txn only requires one packet.
10 :
11 : For txn exceeding MTU size, the txn is fragmented over multiple
12 : packets. For more information, see the specification:
13 : https://github.com/solana-foundation/specs/blob/main/p2p/tpu.md */
14 :
15 : #include "../fd_disco_base.h"
16 : #include "../fd_txn_m_t.h"
17 :
18 : /* FD_TPU_REASM_MTU is the max tango frag sz sent by an fd_tpu_reasm_t.
19 : FD_TPU_REASM_CHUNK_MTU*FD_CHUNK_SZ == FD_TPU_REASM_MTU */
20 :
21 715182 : #define FD_TPU_REASM_CHUNK_MTU (FD_ULONG_ALIGN_UP( FD_TPU_RAW_MTU, FD_CHUNK_SZ )>>FD_CHUNK_LG_SZ)
22 715182 : #define FD_TPU_REASM_MTU (FD_TPU_REASM_CHUNK_MTU<<FD_CHUNK_LG_SZ)
23 :
24 : #define FD_TPU_REASM_ALIGN FD_CHUNK_ALIGN
25 :
26 : #define FD_TPU_REASM_REQ_DATA_SZ(depth, reasm_max) (((depth)+(reasm_max))*FD_TPU_REASM_MTU)
27 :
28 : /* FD_TPU_REASM_{SUCCESS,ERR_{...}} are error codes. These values are
29 : persisted to logs. Entries should not be renumbered and numeric
30 : values should never be reused. */
31 :
32 131682 : #define FD_TPU_REASM_SUCCESS (0)
33 0 : #define FD_TPU_REASM_ERR_SZ (1) /* oversz msg */
34 0 : #define FD_TPU_REASM_ERR_SKIP (2) /* out-of-order data within QUIC stream */
35 0 : #define FD_TPU_REASM_ERR_STATE (3) /* unexpected slot state */
36 :
37 : /* FD_TPU_REASM_STATE_{...} are reasm slot states */
38 :
39 19470351 : #define FD_TPU_REASM_STATE_FREE ((uchar)0) /* free */
40 19470735 : #define FD_TPU_REASM_STATE_BUSY ((uchar)1) /* active reassembly */
41 19535877 : #define FD_TPU_REASM_STATE_PUB ((uchar)2) /* published */
42 :
43 : /* fd_tpu_reasm_t handles incoming data fragments of TPU/QUIC streams.
44 : Frags are expected to be provided via fd_quic callback. Each
45 : tpu_reasm object may only serve a single fd_quic object. Dispatches
46 : reassembled messages to an mcache.) Should not be persisted.
47 :
48 : ### Flow Control
49 :
50 : fd_tpu_reasm is wired up as follows:
51 :
52 : ┌────────┐ ┌───────┐ ┌────────┐
53 : │ QUIC │ callbacks │ tpu_ │ tango │ sig_ │
54 : │ Server ├───────────► reasm ├───────► verify │
55 : └────────┘ └───────┘ └────────┘
56 :
57 : Neither of the pictured links backpressure. Packet loss occurs if
58 : (1) the QUIC server accepts more concurrent streams than available
59 : reassembly slots. Also if (2) the bank of sig verify tiles is too
60 : slow to keepup with incoming transactions.
61 :
62 : The application should thus adjust the QUIC server to throttle the
63 : concurrent stream count and transaction rate to appropriate levels.
64 : (Via QUIC connection quotas)
65 :
66 : The tpu_reasm MUST be the only writer to the mcache.
67 :
68 : ### Eviction Policy
69 :
70 : Aforementioned case 1 specifically happens whenever the QUIC server
71 : accepts a stream and tpu_reasm doesn't find a free slot. tpu_reasm
72 : hardcodes a FIFO eviction policy to handle this case by cancelling
73 : the least recently prepared reassembly. This also guarantees that
74 : unfragmented transaction never get dropped.
75 :
76 : ### Internals
77 :
78 : fd_tpu_reasm internally manages an array of message reassembly
79 : buffers. Each of these is called a "slot" (fd_tpu_reasm_slot_t).
80 :
81 : Slots are either owned by the reassembly fifo (FREE, BUSY states), or
82 : the mcache (PUB state). The ownership separation prevents in-flight
83 : reassemblies from thrashing data exposed to consumers via the mcache.
84 : (Data races transitioning between reassembly and fifo ownership are
85 : handled by the speculative receive pattern.)
86 :
87 : The lifecycle of a slot is:
88 :
89 : prepare() publish()
90 : ┌─► FREE ───► BUSY ───► PUB ─┐
91 : │ │ │
92 : ▲ ▼ cancel() ▼ implied by a later
93 : │ │ │ publish()/cancel()
94 : └──────◄───────┴──────◄──────┘
95 :
96 : prepare: The transition from FREE to BUSY occurs when a new QUIC
97 : stream is accepted.
98 : cancel: The transition from BUSY to FREE occurs when stream/txn
99 : reassembly is aborted. This can happen for whatever
100 : explicit reason (peer kicked, network error), or implicitly
101 : when prepare() is called but no free slot was found.
102 : publish: The transition from BUSY to PUB occurs when a slot holding
103 : a complete txn is made visible to downstream consumers.
104 : This moves a slot from the reassembly fifo to the mcache.
105 :
106 : The transition from PUB to FREE also occurs at the same time (for a
107 : different slot). This moves the least recently published slot from
108 : the mcache into the reassembly fifo. This keeps the number of slots
109 : owned by the mcache at _exactly_ depth at all times and exactly
110 : mirroring the set of packets exposed downstream (notwithstanding a
111 : startup transient of up to depth packets). This also guarantees that
112 : the number of slots in the FREE and BUSY states is kept at _exactly_
113 : reasm_max at all times.
114 :
115 : In order to support the above, the 'pub_slots' lookup table tracks
116 : which published mcache lines (indexed by `seq % depth`) correspond to
117 : which slot indexes. */
118 :
119 :
120 : /* fd_tpu_reasm_slot_t holds a message reassembly buffer.
121 : Carefully tuned to 32 byte size. */
122 :
123 : struct fd_tpu_reasm_key {
124 : ulong conn_uid; /* ULONG_MAX means invalid */
125 : ulong stream_id : 48;
126 : ulong sz : 14; /* size of the txn payload data. does not
127 : include the sizeof(fd_txn_m_t) bytes that
128 : precedes the payload in each slot. */
129 : ulong state : 2;
130 : };
131 :
132 19548183 : #define FD_TPU_REASM_SID_MASK (0xffffffffffffUL)
133 65841 : #define FD_TPU_REASM_SZ_MASK (0x3fffUL)
134 :
135 : typedef struct fd_tpu_reasm_key fd_tpu_reasm_key_t;
136 :
137 : struct __attribute__((aligned(16))) fd_tpu_reasm_slot {
138 : fd_tpu_reasm_key_t k; /* FIXME ugly: the compound key has to be a single struct member */
139 : uint lru_prev;
140 : uint lru_next;
141 : uint chain_next;
142 : uint tsorig_comp;
143 : };
144 :
145 : typedef struct fd_tpu_reasm_slot fd_tpu_reasm_slot_t;
146 :
147 : struct __attribute__((aligned(FD_TPU_REASM_ALIGN))) fd_tpu_reasm {
148 : ulong magic; /* ==FD_TPU_REASM_MAGIC */
149 :
150 : ulong slots_off; /* slots mem */
151 : ulong pub_slots_off; /* pub_slots mem */
152 : ulong map_off; /* map mem */
153 : uchar * dcache; /* points to first dcache data byte in local address space */
154 :
155 : uint depth; /* mcache depth */
156 : uint burst; /* max concurrent reassemblies */
157 :
158 : uint head; /* least recent reassembly */
159 : uint tail; /* most recent reassembly */
160 :
161 : uint slot_cnt;
162 : ushort orig; /* tango orig */
163 : };
164 :
165 : typedef struct fd_tpu_reasm fd_tpu_reasm_t;
166 :
167 : FD_PROTOTYPES_BEGIN
168 :
169 : /* Private accessors */
170 :
171 : static inline FD_FN_PURE fd_tpu_reasm_slot_t *
172 20228256 : fd_tpu_reasm_slots_laddr( fd_tpu_reasm_t * reasm ) {
173 20228256 : return (fd_tpu_reasm_slot_t *)( (ulong)reasm + reasm->slots_off );
174 20228256 : }
175 :
176 : static inline FD_FN_PURE fd_tpu_reasm_slot_t const *
177 358953 : fd_tpu_reasm_slots_laddr_const( fd_tpu_reasm_t const * reasm ) {
178 358953 : return (fd_tpu_reasm_slot_t const *)( (ulong)reasm + reasm->slots_off );
179 358953 : }
180 :
181 : static inline FD_FN_PURE uint *
182 216783 : fd_tpu_reasm_pub_slots_laddr( fd_tpu_reasm_t * reasm ) {
183 216783 : return (uint *)( (ulong)reasm + reasm->pub_slots_off );
184 216783 : }
185 :
186 : /* Construction API */
187 :
188 : /* fd_tpu_reasm_{align,footprint} return the required alignment and
189 : footprint of a memory region suitable for use as a tpu_reasm that
190 : can reassemble up to 'reasm_max' txns concurrently. 'depth' is the
191 : entry count of the target mcache. mtu is the max sz of a serialized
192 : txn (usually FD_TXN_MTU). */
193 :
194 : FD_FN_CONST ulong
195 : fd_tpu_reasm_align( void );
196 :
197 : FD_FN_CONST ulong
198 : fd_tpu_reasm_footprint( ulong depth, /* Assumed in {2^0,2^1,2^2,...,2^31} */
199 : ulong reasm_max ); /* Assumed in [1,2^31) */
200 :
201 : FD_FN_CONST static inline ulong
202 : fd_tpu_reasm_req_data_sz( ulong depth,
203 6 : ulong reasm_max ) { /* Assumed in [1,2^31) */
204 6 : return (depth+reasm_max) * FD_TPU_REASM_MTU;
205 6 : }
206 :
207 : /* fd_tpu_reasm_new formats an unused memory region for use as a
208 : tpu_reasm. shmem is a non-NULL pointer to this region in the local
209 : address space with the required footprint and alignment. {depth,
210 : reasm_max,mtu} as described above. orig is the Tango origin ID of
211 : this tpu_reasm. dcache is a local join to an fd_dcache that
212 : tpu_reasm will write frags to. dcache should have at least
213 : fd_tpu_reasm_req_data_sz() bytes of data_sz. The dcache app region
214 : is ignored and not written to. */
215 :
216 : void *
217 : fd_tpu_reasm_new( void * shmem,
218 : ulong depth, /* Assumed in {2^0,2^1,2^2,...,2^32} */
219 : ulong reasm_max, /* Assumed in [1,2^32) */
220 : ulong orig, /* Assumed in [0,FD_FRAG_META_ORIG_MAX) */
221 : void * dcache );
222 :
223 : fd_tpu_reasm_t *
224 : fd_tpu_reasm_join( void * shreasm );
225 :
226 : void *
227 : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm );
228 :
229 : void *
230 : fd_tpu_reasm_delete( void * shreasm );
231 :
232 : /* Accessor API */
233 :
234 : fd_tpu_reasm_slot_t *
235 : fd_tpu_reasm_query( fd_tpu_reasm_t * reasm,
236 : ulong conn_uid,
237 : ulong stream_id );
238 :
239 : FD_FN_PURE static inline fd_tpu_reasm_slot_t *
240 0 : fd_tpu_reasm_peek_tail( fd_tpu_reasm_t * reasm ) {
241 0 : uint tail_idx = reasm->tail;
242 0 : fd_tpu_reasm_slot_t * tail = fd_tpu_reasm_slots_laddr( reasm ) + tail_idx;
243 0 : return tail;
244 0 : }
245 :
246 : fd_tpu_reasm_slot_t *
247 : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
248 : ulong conn_uid,
249 : ulong stream_id,
250 : long tspub );
251 :
252 : static inline fd_tpu_reasm_slot_t *
253 : fd_tpu_reasm_acquire( fd_tpu_reasm_t * reasm,
254 : ulong conn_uid,
255 : ulong stream_id,
256 76509 : long tspub ) {
257 76509 : fd_tpu_reasm_slot_t * slot = fd_tpu_reasm_query( reasm, conn_uid, stream_id );
258 76509 : if( !slot ) {
259 76125 : slot = fd_tpu_reasm_prepare( reasm, conn_uid, stream_id, tspub );
260 76125 : }
261 76509 : return slot;
262 76509 : }
263 :
264 : /* fd_tpu_reasm_frag appends a new stream frag to the reasm slot.
265 : [data,data+data_sz) is the memory region containing the stream data.
266 : data_off is the offset of this stream data. Slot reassembly buffer
267 : is appended with copy of [data,data+data_sz) on success. On failure,
268 : cancels the reassembly.
269 :
270 : Return values one of:
271 :
272 : FD_TPU_REASM_SUCCESS: success, fragment added to reassembly
273 : FD_TPU_REASM_EAGAIN: incomplete
274 : FD_TPU_REASM_ERR_SZ: fail, data_off + data_sz > mtu
275 : FD_TPU_REASM_ERR_SKIP: fail, data_off - slot->sz > 0
276 :
277 : Note on SKIP error: RFC 9000 Section 2.2 specifies "QUIC makes no
278 : specific allowances for delivery of stream data out of order." */
279 :
280 : int
281 : fd_tpu_reasm_frag( fd_tpu_reasm_t * reasm,
282 : fd_tpu_reasm_slot_t * slot,
283 : uchar const * data,
284 : ulong sz,
285 : ulong off );
286 :
287 : /* fd_tpu_reasm_publish completes a stream reassembly and publishes the
288 : message to an mcache for downstream consumption. base is the address
289 : of the chunk whose index is 0 (chunk0 param of fd_chunk_to_laddr).
290 : {seq,sig,tspub} are mcache frag params. If slot does not have active
291 : reassembly or txn parsing failed, returns NULL. If base is not valid
292 : for tpu_reasm, aborts. Final msg sz in [0,mtu+FD_CHUNK_SZ). */
293 :
294 : int
295 : fd_tpu_reasm_publish( fd_tpu_reasm_t * reasm,
296 : fd_tpu_reasm_slot_t * slot,
297 : fd_frag_meta_t * mcache,
298 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
299 : ulong seq,
300 : long tspub,
301 : uint source_ipv4,
302 : uchar source_tpu );
303 :
304 : /* fd_tpu_reasm_publish_fast is a streamlined version of acquire/frag/
305 : publish. */
306 :
307 : int
308 : fd_tpu_reasm_publish_fast( fd_tpu_reasm_t * reasm,
309 : uchar const * data,
310 : ulong sz,
311 : fd_frag_meta_t * mcache,
312 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
313 : ulong seq,
314 : long tspub,
315 : uint source_ipv4,
316 : uchar source_tpu );
317 :
318 : /* fd_tpu_reasm_cancel cancels the given stream reassembly. */
319 :
320 : void
321 : fd_tpu_reasm_cancel( fd_tpu_reasm_t * reasm,
322 : fd_tpu_reasm_slot_t * slot );
323 :
324 : /* fd_tpu_reasm_key_hash is an unrolled version of fd_hash (xxhash-r39) */
325 :
326 78230124 : #define C1 (11400714785074694791UL)
327 58672593 : #define C2 (14029467366897019727UL)
328 19557531 : #define C3 ( 1609587929392839161UL)
329 39115062 : #define C4 ( 9650029242287828579UL)
330 19557531 : #define C5 ( 2870177450012600261UL)
331 :
332 : FD_FN_PURE static inline ulong
333 : fd_tpu_reasm_key_hash( fd_tpu_reasm_key_t const * k,
334 19557531 : ulong seed ) {
335 :
336 19557531 : ulong h = seed + C5 + 16UL;
337 19557531 : ulong w0 = k->conn_uid;
338 19557531 : ulong w1 = k->stream_id;
339 :
340 19557531 : w0 *= C2; w0 = fd_ulong_rotate_left( w0, 31 ); w0 *= C1; h ^= w0; h = fd_ulong_rotate_left( h, 27 )*C1 + C4;
341 19557531 : w1 *= C2; w1 = fd_ulong_rotate_left( w1, 31 ); w1 *= C1; h ^= w1; h = fd_ulong_rotate_left( h, 27 )*C1 + C4;
342 :
343 : /* Final avalanche */
344 19557531 : h ^= h >> 33;
345 19557531 : h *= C2;
346 19557531 : h ^= h >> 29;
347 19557531 : h *= C3;
348 19557531 : h ^= h >> 32;
349 :
350 19557531 : return h;
351 19557531 : }
352 :
353 : #undef C1
354 : #undef C2
355 : #undef C3
356 : #undef C4
357 : #undef C5
358 :
359 : FD_PROTOTYPES_END
360 :
361 : #endif /* HEADER_fd_src_disco_quic_fd_tpu_h */
|