LCOV - code coverage report
Current view: top level - disco/quic - fd_tpu.h (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 12 15 80.0 %
Date: 2024-11-13 11:58:15 Functions: 0 0 -

          Line data    Source code
       1             : #ifndef HEADER_fd_src_disco_quic_fd_tpu_h
       2             : #define HEADER_fd_src_disco_quic_fd_tpu_h
       3             : 
       4             : /* fd_tpu provides the server-side of the TPU/QUIC protocol.
       5             : 
       6             :    TPU/QUIC is the protocol used to submit transactions to a block
       7             :    producer.  For each txn to be transferred, the client opens a
       8             :    unidirectional QUIC stream and sends its serialization (see
       9             :    fd_txn_parse).  In the happy case, a txn only requires one packet.
      10             : 
      11             :    For txn exceeding MTU size, the txn is fragmented over multiple
      12             :    packets.  For more information, see the specification:
      13             :    https://github.com/solana-foundation/specs/blob/main/p2p/tpu.md */
      14             : 
      15             : #include "../fd_disco_base.h"
      16             : 
      17             : /* FD_TPU_REASM_MTU is the max tango frag sz sent by an fd_tpu_reasm_t.
      18             :    FD_TPU_REASM_CHUNK_MTU*FD_CHUNK_SZ == FD_TPU_REASM_MTU */
      19             : 
      20      624429 : #define FD_TPU_REASM_CHUNK_MTU (FD_ULONG_ALIGN_UP( FD_TPU_MTU, FD_CHUNK_SZ )>>FD_CHUNK_LG_SZ)
      21      624429 : #define FD_TPU_REASM_MTU       (FD_TPU_REASM_CHUNK_MTU<<FD_CHUNK_LG_SZ)
      22             : 
      23             : #define FD_TPU_REASM_ALIGN FD_CHUNK_ALIGN
      24             : #define FD_TPU_REASM_FOOTPRINT( depth, burst )                                                              \
      25           6 :   FD_LAYOUT_FINI( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND ( FD_LAYOUT_APPEND ( FD_LAYOUT_INIT, \
      26           6 :     FD_TPU_REASM_ALIGN,           sizeof(fd_tpu_reasm_t)                        ), /* hdr       */           \
      27           6 :     alignof(uint),                 (depth)         *sizeof(uint)                ), /* pub_slots */           \
      28           6 :     alignof(fd_tpu_reasm_slot_t), ((depth)+(burst))*sizeof(fd_tpu_reasm_slot_t) ), /* slots     */           \
      29           6 :     FD_CHUNK_ALIGN,               ((depth)+(burst))*FD_TPU_REASM_MTU            ), /* chunks    */           \
      30           6 :     FD_TPU_REASM_ALIGN )
      31             : 
      32             : /* FD_TPU_REASM_{SUCCESS,ERR_{...}} are error codes.  These values are
      33             :    persisted to logs.  Entries should not be renumbered and numeric
      34             :    values should never be reused. */
      35             : 
      36      131748 : #define FD_TPU_REASM_SUCCESS   (0)
      37           0 : #define FD_TPU_REASM_ERR_SZ    (1)  /* oversz msg */
      38           0 : #define FD_TPU_REASM_ERR_SKIP  (2)  /* out-of-order data within QUIC stream */
      39           0 : #define FD_TPU_REASM_ERR_STATE (3)  /* unexpected slot state */
      40             : 
      41             : /* FD_TPU_REASM_STATE_{...} are reasm slot states */
      42             : 
      43    19490475 : #define FD_TPU_REASM_STATE_FREE ((uchar)0)  /* free */
      44      151467 : #define FD_TPU_REASM_STATE_BUSY ((uchar)1)  /* active reassembly */
      45    19555722 : #define FD_TPU_REASM_STATE_PUB  ((uchar)2)  /* published */
      46             : 
      47             : /* fd_tpu_reasm_t handles incoming data fragments of TPU/QUIC streams.
      48             :    Frags are expected to be provided via fd_quic callback.  Each
      49             :    tpu_reasm object may only serve a single fd_quic object.  Dispatches
      50             :    reassembled messages to an mcache.  Should not be persisted.
      51             : 
      52             :    ### Flow Control
      53             : 
      54             :    fd_tpu_reasm is wired up as follows:
      55             : 
      56             :      ┌────────┐           ┌───────┐       ┌────────┐
      57             :      │  QUIC  │ callbacks │ tpu_  │ tango │ sig_   │
      58             :      │ Server ├───────────► reasm ├───────► verify │
      59             :      └────────┘           └───────┘       └────────┘
      60             : 
      61             :    Neither of the pictured links backpressure.  Packet loss occurs if
      62             :    (1) the QUIC server accepts more concurrent streams than available
      63             :    reassembly slots.  Also if (2) the bank of sig verify tiles is too
      64             :    slow to keep up with incoming transactions.
      65             : 
      66             :    The application should thus adjust the QUIC server to throttle the
      67             :    concurrent stream count and transaction rate to appropriate levels.
      68             :    (Via QUIC connection quotas)
      69             : 
      70             :    The tpu_reasm MUST be the only writer to the mcache.
      71             : 
      72             :    ### Eviction Policy
      73             : 
      74             :    Aforementioned case 1 specifically happens whenever the QUIC server
      75             :    accepts a stream and tpu_reasm doesn't find a free slot.  tpu_reasm
      76             :    hardcodes a FIFO eviction policy to handle this case by cancelling
      77             :    the least recently prepared reassembly.  This also guarantees that
      78             :    unfragmented transactions never get dropped.
      79             : 
      80             :    ### Internals
      81             : 
      82             :    fd_tpu_reasm internally manages an array of message reassembly
      83             :    buffers.  Each of these is called a "slot" (fd_tpu_reasm_slot_t).
      84             : 
      85             :    Slots are either owned by the reassembly fifo (FREE, BUSY states), or
      86             :    the mcache (PUB state).  The ownership separation prevents in-flight
      87             :    reassemblies from thrashing data exposed to consumers via the mcache.
      88             :    (Data races transitioning between reassembly fifo and mcache
      89             :    ownership are handled by the speculative receive pattern.)
      90             : 
      91             :    The lifecycle of a slot is:
      92             : 
      93             :             prepare()  publish()
      94             :      ┌─► FREE ───► BUSY ───► PUB ─┐
      95             :      │              │             │
      96             :      ▲              ▼ cancel()    ▼ implied by a later
      97             :      │              │             │ publish()/cancel()
      98             :      └──────◄───────┴──────◄──────┘
      99             : 
     100             :    prepare: The transition from FREE to BUSY occurs when a new QUIC
     101             :             stream is accepted.
     102             :    cancel:  The transition from BUSY to FREE occurs when stream/txn
     103             :             reassembly is aborted.  This can happen for whatever
     104             :             explicit reason (peer kicked, network error), or implicitly
     105             :             when prepare() is called but no free slot was found.
     106             :    publish: The transition from BUSY to PUB occurs when a slot holding
     107             :             a complete txn is made visible to downstream consumers.
     108             :             This moves a slot from the reassembly fifo to the mcache.
     109             : 
     110             :    The transition from PUB to FREE also occurs at the same time (for a
     111             :    different slot).  This moves the least recently published slot from
     112             :    the mcache into the reassembly fifo.  This keeps the number of slots
     113             :    owned by the mcache at _exactly_ depth at all times and exactly
     114             :    mirroring the set of packets exposed downstream (notwithstanding a
     115             :    startup transient of up to depth packets).  This also guarantees that
     116             :    the number of slots in the FREE and BUSY states is kept at _exactly_
     117             :    burst at all times.
     118             : 
     119             :    In order to support the above, the 'pub_slots' lookup table tracks
     120             :    which published mcache lines (indexed by `seq % depth`) correspond to
     121             :    which slot indexes. */
     122             : 
     123             : struct fd_tpu_reasm_slot;
     124             : typedef struct fd_tpu_reasm_slot fd_tpu_reasm_slot_t;
     125             : 
     126             : struct __attribute__((aligned(FD_TPU_REASM_ALIGN))) fd_tpu_reasm {
     127             :   ulong magic;  /* ==FD_TPU_REASM_MAGIC */
     128             : 
     129             :   ulong slots_off;     /* slots mem     */
     130             :   ulong pub_slots_off; /* pub_slots mem */
     131             :   ulong chunks_off;    /* payload mem   */
     132             : 
     133             :   uint   depth;       /* mcache depth */
     134             :   uint   burst;       /* max concurrent reassemblies */
     135             : 
     136             :   uint   head;        /* least recent reassembly */
     137             :   uint   tail;        /* most  recent reassembly */
     138             : 
     139             :   uint   slot_cnt;
     140             :   ushort orig;        /* tango orig */
     141             : };
     142             : 
     143             : typedef struct fd_tpu_reasm fd_tpu_reasm_t;
     144             : 
     145             : /* fd_tpu_reasm_slot_t holds a message reassembly buffer. */
     146             : 
     147             : struct __attribute__((aligned(16UL))) fd_tpu_reasm_slot {
     148             : 
     149             :   ulong  conn_id;
     150             :   ulong  stream_id;
     151             : 
     152             :   /* Private fields ... */
     153             : 
     154             :   uint   prev_idx;  /* unused for now */
     155             :   uint   next_idx;
     156             : 
     157             :   uint   tsorig;
     158             :   ushort sz;
     159             :   uchar  state;
     160             : };
     161             : 
     162             : FD_PROTOTYPES_BEGIN
     163             : 
     164             : /* Construction API */
     165             : 
     166             : /* fd_tpu_reasm_{align,footprint} return the required alignment and
     167             :    footprint of a memory region suitable for use as a tpu_reasm that
     168             :    can reassemble up to 'burst' txns concurrently.  'depth' is the
     169             :    entry count of the target mcache.  There is no mtu param; payload
     170             :    buffers are sized by the compile-time FD_TPU_REASM_MTU. */
     171             : 
     172             : FD_FN_CONST ulong
     173             : fd_tpu_reasm_align( void );
     174             : 
     175             : FD_FN_CONST ulong
     176             : fd_tpu_reasm_footprint( ulong depth,  /* Assumed in {2^0,2^1,2^2,...,2^31} */
     177             :                         ulong burst   /* Assumed in [1,2^31) */ );
     178             : 
     179             : /* fd_tpu_reasm_new formats an unused memory region for use as a
     180             :    tpu_reasm.  shmem is a non-NULL pointer to this region in the local
     181             :    address space with the required footprint and alignment.  {depth,
     182             :    burst} as described above.  orig is the Tango origin ID of this
     183             :    tpu_reasm. */
     184             : 
     185             : void *
     186             : fd_tpu_reasm_new( void * shmem,
     187             :                   ulong  depth,  /* Assumed in {2^0,2^1,2^2,...,2^31} */
     188             :                   ulong  burst,  /* Assumed in [1,2^31) */
     189             :                   ulong  orig    /* Assumed in [0,FD_FRAG_META_ORIG_MAX) */ );
     190             : 
     191             : fd_tpu_reasm_t *
     192             : fd_tpu_reasm_join( void * shreasm );
     193             : 
     194             : void *
     195             : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm );
     196             : 
     197             : void *
     198             : fd_tpu_reasm_delete( void * shreasm );
     199             : 
     200             : /* fd_tpu_reasm_{chunk0,wmark} returns the chunk index of the {lowest,
     201             :    highest} possible chunk value that fd_tpu_reasm_publish will write to
     202             :    an mcache. */
     203             : 
     204             : FD_FN_CONST ulong
     205             : fd_tpu_reasm_chunk0( fd_tpu_reasm_t const * reasm,
     206             :                      void const *           base );
     207             : 
     208             : 
     209             : FD_FN_CONST ulong
     210             : fd_tpu_reasm_wmark( fd_tpu_reasm_t const * reasm,
     211             :                     void const *           base );
     212             : 
     213             : /* Accessor API */
     214             : 
     215             : /* fd_tpu_reasm_prepare starts a new stream reassembly.  If 'burst'
     216             :    reassemblies are already active (no free slot), cancels the oldest
     217             :    active.  Returns a pointer to the acquired slot.  User is expected
     218             :    to set conn_id and stream_id of slot. */
     219             : 
     220             : fd_tpu_reasm_slot_t *
     221             : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
     222             :                       ulong            tsorig );
     223             : 
     224             : /* fd_tpu_reasm_append appends a new stream frag to the reasm slot.
     225             :    [data,data+data_sz) is the memory region containing the stream data.
     226             :    data_off is the offset of this stream data.  Slot reassembly buffer
     227             :    is appended with copy of [data,data+data_sz) on success.  On failure,
     228             :    cancels the reassembly.
     229             : 
     230             :    Return values one of:
     231             : 
     232             :      FD_TPU_REASM_SUCCESS:   success, fragment added to reassembly
     233             :      FD_TPU_REASM_ERR_SZ:    fail, data_off + data_sz  > mtu
     234             :      FD_TPU_REASM_ERR_SKIP:  fail, data_off - slot->sz > 0
     235             : 
     236             :    Note on SKIP error:  RFC 9000 Section 2.2 specifies "QUIC makes no
     237             :    specific allowances for delivery of stream data out of order." */
     238             : 
     239             : int
     240             : fd_tpu_reasm_append( fd_tpu_reasm_t *      reasm,
     241             :                      fd_tpu_reasm_slot_t * slot,
     242             :                      uchar const *         data,
     243             :                      ulong                 data_sz,
     244             :                      ulong                 data_off );
     245             : 
     246             : /* fd_tpu_reasm_publish completes a stream reassembly and publishes the
     247             :    message to an mcache for downstream consumption.  base is the address
     248             :    of the chunk whose index is 0 (chunk0 param of fd_chunk_to_laddr).
     249             :    {seq,tspub} are mcache frag params.  Returns an error code (int) if
     250             :    slot has no active reassembly or txn parsing failed.  If base is not
     251             :    valid for tpu_reasm, aborts.  Final msg sz in [0,mtu+FD_CHUNK_SZ). */
     252             : 
     253             : int
     254             : fd_tpu_reasm_publish( fd_tpu_reasm_t *      reasm,
     255             :                       fd_tpu_reasm_slot_t * slot,
     256             :                       fd_frag_meta_t *      mcache,
     257             :                       void *                base,  /* Assumed aligned FD_CHUNK_ALIGN */
     258             :                       ulong                 seq,
     259             :                       ulong                 tspub );
     260             : 
     261             : /* fd_tpu_reasm_cancel cancels the given stream reassembly. */
     262             : 
     263             : void
     264             : fd_tpu_reasm_cancel( fd_tpu_reasm_t *      reasm,
     265             :                      fd_tpu_reasm_slot_t * slot );
     266             : 
     267             : FD_PROTOTYPES_END
     268             : 
     269             : #endif /* HEADER_fd_src_disco_quic_fd_tpu_h */

Generated by: LCOV version 1.14