LCOV - code coverage report
Current view: top level - disco/quic - fd_tpu.h (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 45 53 84.9 %
Date: 2025-01-08 12:08:44 Functions: 10 56 17.9 %

          Line data    Source code
       1             : #ifndef HEADER_fd_src_disco_quic_fd_tpu_h
       2             : #define HEADER_fd_src_disco_quic_fd_tpu_h
       3             : 
       4             : /* fd_tpu provides the server-side of the TPU/QUIC protocol.
       5             : 
       6             :    TPU/QUIC is the protocol used to submit transactions to a block
       7             :    producer.  For each txn to be transferred, the client opens a
       8             :    unidirectional QUIC stream and sends its serialization (see
       9             :    fd_txn_parse).  In the happy case, a txn only requires one packet.
      10             : 
      11             :    For txns exceeding the MTU, the txn is fragmented over multiple
      12             :    packets.  For more information, see the specification:
      13             :    https://github.com/solana-foundation/specs/blob/main/p2p/tpu.md */
      14             : 
      15             : #include "../fd_disco_base.h"
      16             : 
      17             : /* FD_TPU_REASM_MTU is the max tango frag sz sent by an fd_tpu_reasm_t.
      18             :    FD_TPU_REASM_CHUNK_MTU*FD_CHUNK_SZ == FD_TPU_REASM_MTU */
      19             : 
      20      690588 : #define FD_TPU_REASM_CHUNK_MTU (FD_ULONG_ALIGN_UP( FD_TPU_MTU, FD_CHUNK_SZ )>>FD_CHUNK_LG_SZ)
      21      690588 : #define FD_TPU_REASM_MTU       (FD_TPU_REASM_CHUNK_MTU<<FD_CHUNK_LG_SZ)
      22             : 
      23             : #define FD_TPU_REASM_ALIGN FD_CHUNK_ALIGN
      24             : 
      25             : #define FD_TPU_REASM_REQ_DATA_SZ(depth, reasm_max) (((depth)+(reasm_max))*FD_TPU_REASM_MTU)
      26             : 
      27             : /* FD_TPU_REASM_{SUCCESS,ERR_{...}} are error codes.  These values are
      28             :    persisted to logs.  Entries should not be renumbered and numeric
      29             :    values should never be reused. */
      30             : 
      31      131682 : #define FD_TPU_REASM_SUCCESS   (0)
      32           0 : #define FD_TPU_REASM_ERR_SZ    (1)  /* oversz msg */
      33           0 : #define FD_TPU_REASM_ERR_SKIP  (2)  /* out-of-order data within QUIC stream */
      34           0 : #define FD_TPU_REASM_ERR_STATE (3)  /* unexpected slot state */
      35             : 
      36             : /* FD_TPU_REASM_STATE_{...} are reasm slot states */
      37             : 
      38    19470351 : #define FD_TPU_REASM_STATE_FREE ((uchar)0)  /* free */
      39    19470735 : #define FD_TPU_REASM_STATE_BUSY ((uchar)1)  /* active reassembly */
      40    19535877 : #define FD_TPU_REASM_STATE_PUB  ((uchar)2)  /* published */
      41             : 
      42             : /* fd_tpu_reasm_t handles incoming data fragments of TPU/QUIC streams.
      43             :    Frags are expected to be provided via fd_quic callback.  Each
      44             :    tpu_reasm object may only serve a single fd_quic object.  Dispatches
      45             :    reassembled messages to an mcache.  Should not be persisted.
      46             : 
      47             :    ### Flow Control
      48             : 
      49             :    fd_tpu_reasm is wired up as follows:
      50             : 
      51             :      ┌────────┐           ┌───────┐       ┌────────┐
      52             :      │  QUIC  │ callbacks │ tpu_  │ tango │ sig_   │
      53             :      │ Server ├───────────► reasm ├───────► verify │
      54             :      └────────┘           └───────┘       └────────┘
      55             : 
      56             :    Neither of the pictured links applies backpressure.  Packet loss occurs if
      57             :    (1) the QUIC server accepts more concurrent streams than available
      58             :    reassembly slots.  Also if (2) the bank of sig verify tiles is too
      59             :    slow to keep up with incoming transactions.
      60             : 
      61             :    The application should thus adjust the QUIC server to throttle the
      62             :    concurrent stream count and transaction rate to appropriate levels.
      63             :    (Via QUIC connection quotas)
      64             : 
      65             :    The tpu_reasm MUST be the only writer to the mcache.
      66             : 
      67             :    ### Eviction Policy
      68             : 
      69             :    Aforementioned case 1 specifically happens whenever the QUIC server
      70             :    accepts a stream and tpu_reasm doesn't find a free slot.  tpu_reasm
      71             :    hardcodes a FIFO eviction policy to handle this case by cancelling
      72             :    the least recently prepared reassembly.  This also guarantees that
      73             :    unfragmented transactions never get dropped.
      74             : 
      75             :    ### Internals
      76             : 
      77             :    fd_tpu_reasm internally manages an array of message reassembly
      78             :    buffers.  Each of these is called a "slot" (fd_tpu_reasm_slot_t).
      79             : 
      80             :    Slots are either owned by the reassembly fifo (FREE, BUSY states), or
      81             :    the mcache (PUB state).  The ownership separation prevents in-flight
      82             :    reassemblies from thrashing data exposed to consumers via the mcache.
      83             :    (Data races transitioning between reassembly fifo and mcache ownership are
      84             :    handled by the speculative receive pattern.)
      85             : 
      86             :    The lifecycle of a slot is:
      87             : 
      88             :             prepare()  publish()
      89             :      ┌─► FREE ───► BUSY ───► PUB ─┐
      90             :      │              │             │
      91             :      ▲              ▼ cancel()    ▼ implied by a later
      92             :      │              │             │ publish()/cancel()
      93             :      └──────◄───────┴──────◄──────┘
      94             : 
      95             :    prepare: The transition from FREE to BUSY occurs when a new QUIC
      96             :             stream is accepted.
      97             :    cancel:  The transition from BUSY to FREE occurs when stream/txn
      98             :             reassembly is aborted.  This can happen for whatever
      99             :             explicit reason (peer kicked, network error), or implicitly
     100             :             when prepare() is called but no free slot was found.
     101             :    publish: The transition from BUSY to PUB occurs when a slot holding
     102             :             a complete txn is made visible to downstream consumers.
     103             :             This moves a slot from the reassembly fifo to the mcache.
     104             : 
     105             :    The transition from PUB to FREE also occurs at the same time (for a
     106             :    different slot).  This moves the least recently published slot from
     107             :    the mcache into the reassembly fifo.  This keeps the number of slots
     108             :    owned by the mcache at _exactly_ depth at all times and exactly
     109             :    mirroring the set of packets exposed downstream (notwithstanding a
     110             :    startup transient of up to depth packets).  This also guarantees that
     111             :    the number of slots in the FREE and BUSY states is kept at _exactly_
     112             :    reasm_max at all times.
     113             : 
     114             :    In order to support the above, the 'pub_slots' lookup table tracks
     115             :    which published mcache lines (indexed by `seq % depth`) correspond to
     116             :    which slot indexes. */
     117             : 
     118             : 
     119             : /* fd_tpu_reasm_slot_t holds a message reassembly buffer.
     120             :    Carefully tuned to 32 byte size. */
     121             : 
     122             : struct fd_tpu_reasm_key {
     123             :   ulong conn_uid; /* ULONG_MAX means invalid */
     124             :   ulong stream_id : 48;
     125             :   ulong sz        : 14;
     126             :   ulong state     : 2;
     127             : };
     128             : 
     129    19548183 : #define FD_TPU_REASM_SID_MASK (0xffffffffffffUL)
     130       65841 : #define FD_TPU_REASM_SZ_MASK  (0x3fffUL)
     131             : 
     132             : typedef struct fd_tpu_reasm_key fd_tpu_reasm_key_t;
     133             : 
     134             : struct __attribute__((aligned(16))) fd_tpu_reasm_slot {
     135             :   fd_tpu_reasm_key_t k; /* FIXME ugly: the compound key has to be a single struct member */
     136             :   uint lru_prev;
     137             :   uint lru_next;
     138             :   uint chain_next;
     139             :   uint tsorig_comp;
     140             : };
     141             : 
     142             : typedef struct fd_tpu_reasm_slot fd_tpu_reasm_slot_t;
     143             : 
     144             : struct __attribute__((aligned(FD_TPU_REASM_ALIGN))) fd_tpu_reasm {
     145             :   ulong magic;  /* ==FD_TPU_REASM_MAGIC */
     146             : 
     147             :   ulong   slots_off;     /* slots mem     */
     148             :   ulong   pub_slots_off; /* pub_slots mem */
     149             :   ulong   map_off;       /* map mem */
     150             :   uchar * dcache;        /* points to first dcache data byte in local address space */
     151             : 
     152             :   uint   depth;       /* mcache depth */
     153             :   uint   burst;       /* max concurrent reassemblies */
     154             : 
     155             :   uint   head;        /* least recent reassembly */
     156             :   uint   tail;        /* most  recent reassembly */
     157             : 
     158             :   uint   slot_cnt;
     159             :   ushort orig;        /* tango orig */
     160             : };
     161             : 
     162             : typedef struct fd_tpu_reasm fd_tpu_reasm_t;
     163             : 
     164             : FD_PROTOTYPES_BEGIN
     165             : 
     166             : /* Private accessors */
     167             : 
     168             : static inline FD_FN_PURE fd_tpu_reasm_slot_t *
     169    20076942 : fd_tpu_reasm_slots_laddr( fd_tpu_reasm_t * reasm ) {
     170    20076942 :   return (fd_tpu_reasm_slot_t *)( (ulong)reasm + reasm->slots_off );
     171    20076942 : }
     172             : 
     173             : static inline FD_FN_PURE fd_tpu_reasm_slot_t const *
     174      358953 : fd_tpu_reasm_slots_laddr_const( fd_tpu_reasm_t const * reasm ) {
     175      358953 :   return (fd_tpu_reasm_slot_t const *)( (ulong)reasm + reasm->slots_off );
     176      358953 : }
     177             : 
     178             : static inline FD_FN_PURE uint *
     179      216783 : fd_tpu_reasm_pub_slots_laddr( fd_tpu_reasm_t * reasm ) {
     180      216783 :   return (uint *)( (ulong)reasm + reasm->pub_slots_off );
     181      216783 : }
     182             : 
     183             : /* Construction API */
     184             : 
     185             : /* fd_tpu_reasm_{align,footprint} return the required alignment and
     186             :    footprint of a memory region suitable for use as a tpu_reasm that
     187             :    can reassemble up to 'reasm_max' txns concurrently.  'depth' is the
     188             :    entry count of the target mcache.  The max sz of a reassembled txn
     189             :    is fixed at compile time (see FD_TPU_REASM_MTU). */
     190             : 
     191             : FD_FN_CONST ulong
     192             : fd_tpu_reasm_align( void );
     193             : 
     194             : FD_FN_CONST ulong
     195             : fd_tpu_reasm_footprint( ulong depth,       /* Assumed in {2^0,2^1,2^2,...,2^31} */
     196             :                         ulong reasm_max ); /* Assumed in [1,2^31) */
     197             : 
     198             : FD_FN_CONST static inline ulong
     199             : fd_tpu_reasm_req_data_sz( ulong depth,
     200           3 :                           ulong reasm_max ) { /* Assumed in [1,2^31) */
     201           3 :   return (depth+reasm_max) * FD_TPU_REASM_MTU;
     202           3 : }
     203             : 
     204             : /* fd_tpu_reasm_new formats an unused memory region for use as a
     205             :    tpu_reasm.  shmem is a non-NULL pointer to this region in the local
     206             :    address space with the required footprint and alignment.  {depth,
     207             :    reasm_max} as described above.  orig is the Tango origin ID of
     208             :    this tpu_reasm.  dcache is a local join to an fd_dcache that
     209             :    tpu_reasm will write frags to.  dcache should have at least
     210             :    fd_tpu_reasm_req_data_sz() bytes of data_sz.  The dcache app region
     211             :    is ignored and not written to. */
     212             : 
     213             : void *
     214             : fd_tpu_reasm_new( void * shmem,
     215             :                   ulong  depth,     /* Assumed in {2^0,2^1,2^2,...,2^31} */
     216             :                   ulong  reasm_max, /* Assumed in [1,2^31) */
     217             :                   ulong  orig,      /* Assumed in [0,FD_FRAG_META_ORIG_MAX) */
     218             :                   void * dcache );
     219             : 
     220             : fd_tpu_reasm_t *
     221             : fd_tpu_reasm_join( void * shreasm );
     222             : 
     223             : void *
     224             : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm );
     225             : 
     226             : void *
     227             : fd_tpu_reasm_delete( void * shreasm );
     228             : 
     229             : /* Accessor API */
     230             : 
     231             : fd_tpu_reasm_slot_t *
     232             : fd_tpu_reasm_query( fd_tpu_reasm_t * reasm,
     233             :                     ulong            conn_uid,
     234             :                     ulong            stream_id );
     235             : 
     236             : FD_FN_PURE static inline fd_tpu_reasm_slot_t *
     237           0 : fd_tpu_reasm_peek_tail( fd_tpu_reasm_t * reasm ) {
     238           0 :   uint                  tail_idx = reasm->tail;
     239           0 :   fd_tpu_reasm_slot_t * tail     = fd_tpu_reasm_slots_laddr( reasm ) + tail_idx;
     240           0 :   return tail;
     241           0 : }
     242             : 
     243             : fd_tpu_reasm_slot_t *
     244             : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
     245             :                       ulong            conn_uid,
     246             :                       ulong            stream_id,
     247             :                       long             tspub );
     248             : 
     249             : static inline fd_tpu_reasm_slot_t *
     250             : fd_tpu_reasm_acquire( fd_tpu_reasm_t * reasm,
     251             :                       ulong            conn_uid,
     252             :                       ulong            stream_id,
     253       76509 :                       long             tspub ) {
     254       76509 :   fd_tpu_reasm_slot_t * slot = fd_tpu_reasm_query( reasm, conn_uid, stream_id );
     255       76509 :   if( !slot ) {
     256       76125 :     slot = fd_tpu_reasm_prepare( reasm, conn_uid, stream_id, tspub );
     257       76125 :   }
     258       76509 :   return slot;
     259       76509 : }
     260             : 
     261             : /* fd_tpu_reasm_frag appends a new stream frag to the reasm slot.
     262             :    [data,data+sz) is the memory region containing the stream data.
     263             :    off is the stream offset of this data.  The slot reassembly buffer
     264             :    is appended with a copy of [data,data+sz) on success.  On failure,
     265             :    cancels the reassembly.
     266             : 
     267             :    Return values one of:
     268             : 
     269             :      FD_TPU_REASM_SUCCESS:   success, fragment added to reassembly
     270             :      FD_TPU_REASM_ERR_STATE: fail, unexpected slot state
     271             :      FD_TPU_REASM_ERR_SZ:    fail, off + sz exceeds the max txn sz
     272             :      FD_TPU_REASM_ERR_SKIP:  fail, off > slot->k.sz (gap in stream data)
     273             : 
     274             :    Note on SKIP error:  RFC 9000 Section 2.2 specifies "QUIC makes no
     275             :    specific allowances for delivery of stream data out of order." */
     276             : 
     277             : int
     278             : fd_tpu_reasm_frag( fd_tpu_reasm_t *      reasm,
     279             :                    fd_tpu_reasm_slot_t * slot,
     280             :                    uchar const *         data,
     281             :                    ulong                 sz,
     282             :                    ulong                 off );
     283             : 
     284             : /* fd_tpu_reasm_publish completes a stream reassembly and publishes the
     285             :    message to an mcache for downstream consumption.  base is the address
     286             :    of the chunk whose index is 0 (chunk0 param of fd_chunk_to_laddr).
     287             :    {seq,tspub} are mcache frag params.  If slot does not have active
     288             :    reassembly or txn parsing failed, returns an FD_TPU_REASM_ERR_* code.  If base is not valid
     289             :    for tpu_reasm, aborts.  Final msg sz in [0,mtu+FD_CHUNK_SZ). */
     290             : 
     291             : int
     292             : fd_tpu_reasm_publish( fd_tpu_reasm_t *      reasm,
     293             :                       fd_tpu_reasm_slot_t * slot,
     294             :                       fd_frag_meta_t *      mcache,
     295             :                       void *                base,  /* Assumed aligned FD_CHUNK_ALIGN */
     296             :                       ulong                 seq,
     297             :                       long                  tspub );
     298             : 
     299             : /* fd_tpu_reasm_publish_fast is a streamlined version of acquire/frag/
     300             :    publish. */
     301             : 
     302             : int
     303             : fd_tpu_reasm_publish_fast( fd_tpu_reasm_t * reasm,
     304             :                            uchar const *    data,
     305             :                            ulong            sz,
     306             :                            fd_frag_meta_t * mcache,
     307             :                            void *           base,  /* Assumed aligned FD_CHUNK_ALIGN */
     308             :                            ulong            seq,
     309             :                            long             tspub );
     310             : 
     311             : /* fd_tpu_reasm_cancel cancels the given stream reassembly. */
     312             : 
     313             : void
     314             : fd_tpu_reasm_cancel( fd_tpu_reasm_t *      reasm,
     315             :                      fd_tpu_reasm_slot_t * slot );
     316             : 
     317             : /* fd_tpu_reasm_key_hash is an unrolled version of fd_hash (xxhash-r39) */
     318             : 
     319    78230124 : #define C1 (11400714785074694791UL)
     320    58672593 : #define C2 (14029467366897019727UL)
     321    19557531 : #define C3 ( 1609587929392839161UL)
     322    39115062 : #define C4 ( 9650029242287828579UL)
     323    19557531 : #define C5 ( 2870177450012600261UL)
     324             : 
     325             : static inline ulong
     326             : fd_tpu_reasm_key_hash( fd_tpu_reasm_key_t const * k,
     327    19557531 :                        ulong                      seed ) {
     328             : 
     329    19557531 :   ulong h  = seed + C5 + 16UL;
     330    19557531 :   ulong w0 = k->conn_uid;
     331    19557531 :   ulong w1 = k->stream_id;
     332             : 
     333    19557531 :   w0 *= C2; w0 = fd_ulong_rotate_left( w0, 31 ); w0 *= C1; h ^= w0; h = fd_ulong_rotate_left( h, 27 )*C1 + C4;
     334    19557531 :   w1 *= C2; w1 = fd_ulong_rotate_left( w1, 31 ); w1 *= C1; h ^= w1; h = fd_ulong_rotate_left( h, 27 )*C1 + C4;
     335             : 
     336             :   /* Final avalanche */
     337    19557531 :   h ^= h >> 33;
     338    19557531 :   h *= C2;
     339    19557531 :   h ^= h >> 29;
     340    19557531 :   h *= C3;
     341    19557531 :   h ^= h >> 32;
     342             : 
     343    19557531 :   return h;
     344    19557531 : }
     345             : 
     346             : #undef C1
     347             : #undef C2
     348             : #undef C3
     349             : #undef C4
     350             : #undef C5
     351             : 
     352             : FD_PROTOTYPES_END
     353             : 
     354             : #endif /* HEADER_fd_src_disco_quic_fd_tpu_h */

Generated by: LCOV version 1.14