LCOV - code coverage report
Current view: top level - disco/quic - fd_tpu.h (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 45 53 84.9 %
Date: 2025-09-18 04:41:32 Functions: 11 91 12.1 %

          Line data    Source code
       1             : #ifndef HEADER_fd_src_disco_quic_fd_tpu_h
       2             : #define HEADER_fd_src_disco_quic_fd_tpu_h
       3             : 
       4             : /* fd_tpu provides the server-side of the TPU/QUIC protocol.
       5             : 
       6             :    TPU/QUIC is the protocol used to submit transactions to a block
       7             :    producer.  For each txn to be transferred, the client opens a
       8             :    unidirectional QUIC stream and sends its serialization (see
       9             :    fd_txn_parse).  In the happy case, a txn only requires one packet.
      10             : 
      11             :    For txn exceeding MTU size, the txn is fragmented over multiple
      12             :    packets.  For more information, see the specification:
      13             :    https://github.com/solana-foundation/specs/blob/main/p2p/tpu.md */
      14             : 
      15             : #include "../fd_disco_base.h"
      16             : #include "../fd_txn_m_t.h"
      17             : 
      18             : /* FD_TPU_REASM_MTU is the max tango frag sz (in bytes) sent by an
      19             :    fd_tpu_reasm_t.  FD_TPU_REASM_CHUNK_MTU*FD_CHUNK_SZ == FD_TPU_REASM_MTU */
      20             : /* CHUNK_MTU is FD_TPU_RAW_MTU rounded up to whole chunks, counted in chunks */
      21      715182 : #define FD_TPU_REASM_CHUNK_MTU (FD_ULONG_ALIGN_UP( FD_TPU_RAW_MTU, FD_CHUNK_SZ )>>FD_CHUNK_LG_SZ)
      22      715182 : #define FD_TPU_REASM_MTU       (FD_TPU_REASM_CHUNK_MTU<<FD_CHUNK_LG_SZ)
      23             : 
      24             : #define FD_TPU_REASM_ALIGN FD_CHUNK_ALIGN
      25             : /* (depth+reasm_max) buffers of FD_TPU_REASM_MTU bytes each; see fd_tpu_reasm_req_data_sz */
      26             : #define FD_TPU_REASM_REQ_DATA_SZ(depth, reasm_max) (((depth)+(reasm_max))*FD_TPU_REASM_MTU)
      27             : 
      28             : /* FD_TPU_REASM_{SUCCESS,ERR_{...}} are return codes: SUCCESS is zero,
      29             :    errors are positive.  These values are persisted to logs.  Entries
      30             :    should not be renumbered and numeric values should never be reused. */
      31             : 
      32      131682 : #define FD_TPU_REASM_SUCCESS   (0)
      33           0 : #define FD_TPU_REASM_ERR_SZ    (1)  /* oversz msg */
      34           0 : #define FD_TPU_REASM_ERR_SKIP  (2)  /* out-of-order data within QUIC stream */
      35           0 : #define FD_TPU_REASM_ERR_STATE (3)  /* unexpected slot state */
      36             : 
      37             : /* FD_TPU_REASM_STATE_{...} are reasm slot states */
      38             : /* (each value fits the 2 bit 'state' field of fd_tpu_reasm_key_t) */
      39    19470351 : #define FD_TPU_REASM_STATE_FREE ((uchar)0)  /* free, owned by the reassembly fifo */
      40    19470735 : #define FD_TPU_REASM_STATE_BUSY ((uchar)1)  /* active reassembly, owned by the reassembly fifo */
      41    19535877 : #define FD_TPU_REASM_STATE_PUB  ((uchar)2)  /* published, owned by the mcache */
      42             : 
      43             : /* fd_tpu_reasm_t handles incoming data fragments of TPU/QUIC streams.
      44             :    Frags are expected to be provided via fd_quic callback.  Each
      45             :    tpu_reasm object may only serve a single fd_quic object.  Dispatches
      46             :    reassembled messages to an mcache.  Should not be persisted.
      47             : 
      48             :    ### Flow Control
      49             : 
      50             :    fd_tpu_reasm is wired up as follows:
      51             : 
      52             :      ┌────────┐           ┌───────┐       ┌────────┐
      53             :      │  QUIC  │ callbacks │ tpu_  │ tango │ sig_   │
      54             :      │ Server ├───────────► reasm ├───────► verify │
      55             :      └────────┘           └───────┘       └────────┘
      56             : 
      57             :    Neither of the pictured links applies backpressure.  Packet loss
      58             :    occurs if (1) the QUIC server accepts more concurrent streams than
      59             :    available reassembly slots, or (2) the bank of sig verify tiles is
      60             :    too slow to keep up with incoming transactions.
      61             : 
      62             :    The application should thus adjust the QUIC server to throttle the
      63             :    concurrent stream count and transaction rate to appropriate levels.
      64             :    (Via QUIC connection quotas)
      65             : 
      66             :    The tpu_reasm MUST be the only writer to the mcache.
      67             : 
      68             :    ### Eviction Policy
      69             : 
      70             :    Aforementioned case 1 specifically happens whenever the QUIC server
      71             :    accepts a stream and tpu_reasm doesn't find a free slot.  tpu_reasm
      72             :    hardcodes a FIFO eviction policy to handle this case by cancelling
      73             :    the least recently prepared reassembly.  This also guarantees that
      74             :    unfragmented transactions never get dropped.
      75             : 
      76             :    ### Internals
      77             : 
      78             :    fd_tpu_reasm internally manages an array of message reassembly
      79             :    buffers.  Each of these is called a "slot" (fd_tpu_reasm_slot_t).
      80             : 
      81             :    Slots are either owned by the reassembly fifo (FREE, BUSY states), or
      82             :    the mcache (PUB state).  The ownership separation prevents in-flight
      83             :    reassemblies from thrashing data exposed to consumers via the mcache.
      84             :    (Data races transitioning between reassembly fifo and mcache
      85             :    ownership are handled by the speculative receive pattern.)
      86             : 
      87             :    The lifecycle of a slot is:
      88             : 
      89             :             prepare()  publish()
      90             :      ┌─► FREE ───► BUSY ───► PUB ─┐
      91             :      │              │             │
      92             :      ▲              ▼ cancel()    ▼ implied by a later
      93             :      │              │             │ publish()/cancel()
      94             :      └──────◄───────┴──────◄──────┘
      95             : 
      96             :    prepare: The transition from FREE to BUSY occurs when a new QUIC
      97             :             stream is accepted.
      98             :    cancel:  The transition from BUSY to FREE occurs when stream/txn
      99             :             reassembly is aborted.  This can happen for whatever
     100             :             explicit reason (peer kicked, network error), or implicitly
     101             :             when prepare() is called but no free slot was found.
     102             :    publish: The transition from BUSY to PUB occurs when a slot holding
     103             :             a complete txn is made visible to downstream consumers.
     104             :             This moves a slot from the reassembly fifo to the mcache.
     105             : 
     106             :    The transition from PUB to FREE also occurs at the same time (for a
     107             :    different slot).  This moves the least recently published slot from
     108             :    the mcache into the reassembly fifo.  This keeps the number of slots
     109             :    owned by the mcache at _exactly_ depth at all times and exactly
     110             :    mirroring the set of packets exposed downstream (notwithstanding a
     111             :    startup transient of up to depth packets).  This also guarantees that
     112             :    the number of slots in the FREE and BUSY states is kept at _exactly_
     113             :    reasm_max at all times.
     114             : 
     115             :    In order to support the above, the 'pub_slots' lookup table tracks
     116             :    which published mcache lines (indexed by `seq % depth`) correspond to
     117             :    which slot indexes. */
     118             : 
     119             : 
     120             : /* fd_tpu_reasm_slot_t holds a message reassembly buffer.
     121             :    Carefully tuned to 32 byte size. */
     122             : /* fd_tpu_reasm_key_t identifies a slot; only conn_uid and stream_id are hashed */
     123             : struct fd_tpu_reasm_key {
     124             :   ulong conn_uid; /* ULONG_MAX means invalid */
     125             :   ulong stream_id : 48; /* same width as FD_TPU_REASM_SID_MASK */
     126             :   ulong sz        : 14; /* size of the txn payload data.  does not
     127             :                            include the sizeof(fd_txn_m_t) bytes that
     128             :                            precede the payload in each slot. */
     129             :   ulong state     : 2;  /* one of FD_TPU_REASM_STATE_{...} */
     130             : };
     131             : /* Masks as wide as the stream_id (48 bit) and sz (14 bit) bit-fields */
     132    19548183 : #define FD_TPU_REASM_SID_MASK (0xffffffffffffUL)
     133       65841 : #define FD_TPU_REASM_SZ_MASK  (0x3fffUL)
     134             : 
     135             : typedef struct fd_tpu_reasm_key fd_tpu_reasm_key_t;
     136             : 
     137             : struct __attribute__((aligned(16))) fd_tpu_reasm_slot {
     138             :   fd_tpu_reasm_key_t k; /* FIXME ugly: the compound key has to be a single struct member */
     139             :   uint lru_prev;    /* presumably slot idx of the previous entry in the reassembly fifo -- TODO confirm */
     140             :   uint lru_next;    /* presumably slot idx of the next entry in the reassembly fifo -- TODO confirm */
     141             :   uint chain_next;  /* presumably next slot idx in a hash map chain -- TODO confirm */
     142             :   uint tsorig_comp; /* presumably a compressed tango tsorig timestamp -- TODO confirm */
     143             : };
     144             : 
     145             : typedef struct fd_tpu_reasm_slot fd_tpu_reasm_slot_t;
     146             : 
     147             : struct __attribute__((aligned(FD_TPU_REASM_ALIGN))) fd_tpu_reasm {
     148             :   ulong magic;  /* ==FD_TPU_REASM_MAGIC */
     149             : /* slots_off and pub_slots_off are byte offsets from the address of this struct */
     150             :   ulong   slots_off;     /* slots mem     */
     151             :   ulong   pub_slots_off; /* pub_slots mem */
     152             :   ulong   map_off;       /* map mem (presumably also a byte offset -- TODO confirm) */
     153             :   uchar * dcache;        /* points to first dcache data byte in local address space */
     154             : 
     155             :   uint   depth;       /* mcache depth */
     156             :   uint   burst;       /* max concurrent reassemblies */
     157             : 
     158             :   uint   head;        /* least recent reassembly */
     159             :   uint   tail;        /* most  recent reassembly (slot idx, see fd_tpu_reasm_peek_tail) */
     160             : 
     161             :   uint   slot_cnt;    /* NOTE(review): presumably depth+burst -- confirm */
     162             :   ushort orig;        /* tango orig */
     163             : };
     164             : 
     165             : typedef struct fd_tpu_reasm fd_tpu_reasm_t;
     166             : 
     167             : FD_PROTOTYPES_BEGIN
     168             : 
     169             : /* Private accessors */
     170             : /* Returns a pointer to the memory slots_off bytes past reasm, typed as slots */
     171             : static inline FD_FN_PURE fd_tpu_reasm_slot_t *
     172    20228256 : fd_tpu_reasm_slots_laddr( fd_tpu_reasm_t * reasm ) {
     173    20228256 :   return (fd_tpu_reasm_slot_t *)( (ulong)reasm + reasm->slots_off );
     174    20228256 : }
     175             : /* Const flavor of fd_tpu_reasm_slots_laddr: same address, const qualified */
     176             : static inline FD_FN_PURE fd_tpu_reasm_slot_t const *
     177      358953 : fd_tpu_reasm_slots_laddr_const( fd_tpu_reasm_t const * reasm ) {
     178      358953 :   return (fd_tpu_reasm_slot_t const *)( (ulong)reasm + reasm->slots_off );
     179      358953 : }
     180             : /* Returns a pointer to the uint array pub_slots_off bytes past reasm */
     181             : static inline FD_FN_PURE uint *
     182      216783 : fd_tpu_reasm_pub_slots_laddr( fd_tpu_reasm_t * reasm ) {
     183      216783 :   return (uint *)( (ulong)reasm + reasm->pub_slots_off );
     184      216783 : }
     185             : 
     186             : /* Construction API */
     187             : 
     188             : /* fd_tpu_reasm_{align,footprint} return the required alignment and
     189             :    footprint of a memory region suitable for use as a tpu_reasm that
     190             :    can reassemble up to 'reasm_max' txns concurrently.  'depth' is the
     191             :    entry count of the target mcache.  The max msg sz is not a parameter;
     192             :    it is fixed at compile time (see FD_TPU_REASM_MTU). */
     193             : 
     194             : FD_FN_CONST ulong
     195             : fd_tpu_reasm_align( void );
     196             : 
     197             : FD_FN_CONST ulong
     198             : fd_tpu_reasm_footprint( ulong depth,       /* Assumed in {2^0,2^1,2^2,...,2^31} */
     199             :                         ulong reasm_max ); /* Assumed in [1,2^31) */
     200             : /* Returns (depth+reasm_max)*FD_TPU_REASM_MTU, the dcache data_sz fd_tpu_reasm_new asks for */
     201             : FD_FN_CONST static inline ulong
     202             : fd_tpu_reasm_req_data_sz( ulong depth,
     203           6 :                           ulong reasm_max ) { /* Assumed in [1,2^31) */
     204           6 :   return (depth+reasm_max) * FD_TPU_REASM_MTU;
     205           6 : }
     206             : 
     207             : /* fd_tpu_reasm_new formats an unused memory region for use as a
     208             :    tpu_reasm.  shmem is a non-NULL pointer to this region in the local
     209             :    address space with the required footprint and alignment.  {depth,
     210             :    reasm_max} as described above.  orig is the Tango origin ID of
     211             :    this tpu_reasm.  dcache is a local join to an fd_dcache that
     212             :    tpu_reasm will write frags to.  dcache should have at least
     213             :    fd_tpu_reasm_req_data_sz() bytes of data_sz.  The dcache app region
     214             :    is ignored and not written to. */
     215             : /* NOTE(review): ranges below (2^32) differ from fd_tpu_reasm_footprint (2^31) -- confirm */
     216             : void *
     217             : fd_tpu_reasm_new( void * shmem,
     218             :                   ulong  depth,     /* Assumed in {2^0,2^1,2^2,...,2^32} */
     219             :                   ulong  reasm_max, /* Assumed in [1,2^32) */
     220             :                   ulong  orig,      /* Assumed in [0,FD_FRAG_META_ORIG_MAX) */
     221             :                   void * dcache );
     222             : /* join/leave/delete: presumably the usual object lifecycle calls -- TODO confirm in implementation */
     223             : fd_tpu_reasm_t *
     224             : fd_tpu_reasm_join( void * shreasm );
     225             : 
     226             : void *
     227             : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm );
     228             : 
     229             : void *
     230             : fd_tpu_reasm_delete( void * shreasm );
     231             : 
     232             : /* Accessor API */
     233             : /* Queries for the slot of (conn_uid,stream_id).  fd_tpu_reasm_acquire treats NULL as a miss */
     234             : fd_tpu_reasm_slot_t *
     235             : fd_tpu_reasm_query( fd_tpu_reasm_t * reasm,
     236             :                     ulong            conn_uid,
     237             :                     ulong            stream_id );
     238             : /* Returns the slot at index reasm->tail (the most recent reassembly).  No bounds check */
     239             : FD_FN_PURE static inline fd_tpu_reasm_slot_t *
     240           0 : fd_tpu_reasm_peek_tail( fd_tpu_reasm_t * reasm ) {
     241           0 :   uint                  tail_idx = reasm->tail;
     242           0 :   fd_tpu_reasm_slot_t * tail     = fd_tpu_reasm_slots_laddr( reasm ) + tail_idx;
     243           0 :   return tail;
     244           0 : }
     245             : /* Starts a reassembly for a newly accepted stream (FREE to BUSY, see lifecycle notes) */
     246             : fd_tpu_reasm_slot_t *
     247             : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
     248             :                       ulong            conn_uid,
     249             :                       ulong            stream_id,
     250             :                       long             tspub );
     251             : /* Returns the slot that query finds for (conn_uid,stream_id), else the result of prepare */
     252             : static inline fd_tpu_reasm_slot_t *
     253             : fd_tpu_reasm_acquire( fd_tpu_reasm_t * reasm,
     254             :                       ulong            conn_uid,
     255             :                       ulong            stream_id,
     256       76509 :                       long             tspub ) {
     257       76509 :   fd_tpu_reasm_slot_t * slot = fd_tpu_reasm_query( reasm, conn_uid, stream_id );
     258       76509 :   if( !slot ) { /* no slot for this stream yet */
     259       76125 :     slot = fd_tpu_reasm_prepare( reasm, conn_uid, stream_id, tspub );
     260       76125 :   }
     261       76509 :   return slot;
     262       76509 : }
     263             : 
     264             : /* fd_tpu_reasm_frag appends a new stream frag to the reasm slot.
     265             :    [data,data+sz) is the memory region containing the stream data.
     266             :    off is the offset of this stream data.  Slot reassembly buffer
     267             :    is appended with copy of [data,data+sz) on success.  On failure,
     268             :    cancels the reassembly.
     269             : 
     270             :    Return values one of:
     271             : 
     272             :      FD_TPU_REASM_SUCCESS:   success, fragment added to reassembly
     273             :      FD_TPU_REASM_EAGAIN:    incomplete (NOTE(review): not defined in this header)
     274             :      FD_TPU_REASM_ERR_SZ:    fail, off + sz          > mtu
     275             :      FD_TPU_REASM_ERR_SKIP:  fail, off - slot->k.sz > 0
     276             : 
     277             :    Note on SKIP error:  RFC 9000 Section 2.2 specifies "QUIC makes no
     278             :    specific allowances for delivery of stream data out of order." */
     279             : 
     280             : int
     281             : fd_tpu_reasm_frag( fd_tpu_reasm_t *      reasm,
     282             :                    fd_tpu_reasm_slot_t * slot,
     283             :                    uchar const *         data,
     284             :                    ulong                 sz,
     285             :                    ulong                 off );
     286             : 
     287             : /* fd_tpu_reasm_publish completes a stream reassembly and publishes the
     288             :    message to an mcache for downstream consumption.  base is the address
     289             :    of the chunk whose index is 0 (chunk0 param of fd_chunk_to_laddr).
     290             :    {seq,tspub} are mcache frag params.  If slot does not have active
     291             :    reassembly or txn parsing failed, fails (NOTE(review): documented as
     292             :    returning NULL, but the return type is int -- confirm the error code). */
     293             : /* If base is not valid for tpu_reasm, aborts.  Final msg sz in [0,mtu+FD_CHUNK_SZ). */
     294             : int
     295             : fd_tpu_reasm_publish( fd_tpu_reasm_t *      reasm,
     296             :                       fd_tpu_reasm_slot_t * slot,
     297             :                       fd_frag_meta_t *      mcache,
     298             :                       void *                base,  /* Assumed aligned FD_CHUNK_ALIGN */
     299             :                       ulong                 seq,
     300             :                       long                  tspub,
     301             :                       uint                  source_ipv4,
     302             :                       uchar                 source_tpu );
     303             : 
     304             : /* fd_tpu_reasm_publish_fast is a streamlined version of acquire/frag/
     305             :    publish.  Presumably [data,data+sz) is the complete message -- confirm. */
     306             : 
     307             : int
     308             : fd_tpu_reasm_publish_fast( fd_tpu_reasm_t * reasm,
     309             :                            uchar const *    data,
     310             :                            ulong            sz,
     311             :                            fd_frag_meta_t * mcache,
     312             :                            void *           base,  /* Assumed aligned FD_CHUNK_ALIGN */
     313             :                            ulong            seq,
     314             :                            long             tspub,
     315             :                            uint             source_ipv4,
     316             :                            uchar            source_tpu );
     317             : 
     318             : /* fd_tpu_reasm_cancel cancels the given stream reassembly (BUSY to FREE). */
     319             : 
     320             : void
     321             : fd_tpu_reasm_cancel( fd_tpu_reasm_t *      reasm,
     322             :                      fd_tpu_reasm_slot_t * slot );
     323             : 
     324             : /* fd_tpu_reasm_key_hash is an unrolled version of fd_hash (xxhash-r39) */
     325             : /* C1..C5 are the hash multiplier constants; they are #undef'd after the function */
     326    78230124 : #define C1 (11400714785074694791UL)
     327    58672593 : #define C2 (14029467366897019727UL)
     328    19557531 : #define C3 ( 1609587929392839161UL)
     329    39115062 : #define C4 ( 9650029242287828579UL)
     330    19557531 : #define C5 ( 2870177450012600261UL)
     331             : /* Hashes only k->conn_uid and k->stream_id; k->sz and k->state are not read */
     332             : FD_FN_PURE static inline ulong
     333             : fd_tpu_reasm_key_hash( fd_tpu_reasm_key_t const * k,
     334    19557531 :                        ulong                      seed ) {
     335             : /* 16UL is presumably the hashed input length in bytes (two ulong words) -- confirm */
     336    19557531 :   ulong h  = seed + C5 + 16UL;
     337    19557531 :   ulong w0 = k->conn_uid;
     338    19557531 :   ulong w1 = k->stream_id;
     339             : /* Scramble each word, fold it into h, then rotate and rescale h */
     340    19557531 :   w0 *= C2; w0 = fd_ulong_rotate_left( w0, 31 ); w0 *= C1; h ^= w0; h = fd_ulong_rotate_left( h, 27 )*C1 + C4;
     341    19557531 :   w1 *= C2; w1 = fd_ulong_rotate_left( w1, 31 ); w1 *= C1; h ^= w1; h = fd_ulong_rotate_left( h, 27 )*C1 + C4;
     342             : 
     343             :   /* Final avalanche */
     344    19557531 :   h ^= h >> 33;
     345    19557531 :   h *= C2;
     346    19557531 :   h ^= h >> 29;
     347    19557531 :   h *= C3;
     348    19557531 :   h ^= h >> 32;
     349             : 
     350    19557531 :   return h;
     351    19557531 : }
     352             : 
     353             : #undef C1
     354             : #undef C2
     355             : #undef C3
     356             : #undef C4
     357             : #undef C5
     358             : 
     359             : FD_PROTOTYPES_END
     360             : 
     361             : #endif /* HEADER_fd_src_disco_quic_fd_tpu_h */

Generated by: LCOV version 1.14