Line data Source code
1 : #include "fd_accdb_pipe.h" 2 : #include "fd_accdb_ref.h" 3 : #include "fd_accdb_sync.h" 4 : 5 : /* ro_pipe state machine 6 : 7 : BATCH: generating request batch 8 : DRAIN: waiting for user to poll responses 9 : 10 : On creation, ro_pipe is in BATCH state. 11 : BATCH->DRAIN transitioned when flush is requested (open_ro_multi) 12 : DRAIN->BATCH transitioned when user processed all completions (close_ro_multi) */ 13 : 14 0 : #define RO_PIPE_STATE_BATCH 0 15 0 : #define RO_PIPE_STATE_DRAIN 1 16 : 17 : fd_accdb_ro_pipe_t * 18 : fd_accdb_ro_pipe_init( fd_accdb_ro_pipe_t * pipe, 19 : fd_accdb_user_t * accdb, 20 0 : fd_funk_txn_xid_t const * xid ) { 21 0 : if( FD_UNLIKELY( !pipe ) ) FD_LOG_CRIT(( "NULL pipe" )); 22 0 : if( FD_UNLIKELY( !accdb ) ) FD_LOG_CRIT(( "NULL accdb" )); 23 0 : if( FD_UNLIKELY( !xid ) ) FD_LOG_CRIT(( "NULL xid" )); 24 : 25 : /* Partial init style because pipe is large O(~64KB) */ 26 0 : pipe->accdb = accdb; 27 0 : pipe->xid = *xid; 28 0 : pipe->batch_idx = 0UL; 29 0 : pipe->req_max = (uint)fd_ulong_min( fd_accdb_batch_max( accdb ), FD_ACCDB_RO_PIPE_MAX ); 30 0 : pipe->req_cnt = 0U; 31 0 : pipe->req_comp = 0U; 32 0 : pipe->state = RO_PIPE_STATE_BATCH; 33 : 34 0 : return pipe; 35 0 : } 36 : 37 : void 38 0 : fd_accdb_ro_pipe_fini( fd_accdb_ro_pipe_t * pipe ) { 39 0 : if( pipe->state==RO_PIPE_STATE_DRAIN ) { 40 0 : fd_accdb_close_ro_multi( pipe->accdb, pipe->ro, pipe->req_cnt ); 41 0 : } 42 0 : pipe->state = RO_PIPE_STATE_BATCH; 43 0 : pipe->req_cnt = 0U; 44 0 : pipe->req_comp = 0U; 45 0 : } 46 : 47 : void 48 : fd_accdb_ro_pipe_enqueue( fd_accdb_ro_pipe_t * pipe, 49 0 : void const * address ) { 50 0 : if( FD_UNLIKELY( pipe->state!=RO_PIPE_STATE_BATCH || 51 0 : pipe->req_cnt>=FD_ACCDB_RO_PIPE_MAX ) ) { 52 0 : FD_LOG_CRIT(( "ro_pipe_enqueue failed: not ready for new requests (poll() required for next request)" )); 53 0 : } 54 0 : if( FD_UNLIKELY( pipe->req_max==0UL ) ) { 55 0 : FD_LOG_CRIT(( "ro_pipe_enqueue failed: req_max is zero" )); 56 0 : } 57 : 58 0 : memcpy( pipe->addr[ pipe->req_cnt ], address, 32UL ); 59 0 : pipe->req_cnt++; 60 0 : FD_CRIT( pipe->req_max<=FD_ACCDB_RO_PIPE_MAX, "req_max corrupt" ); 61 0 : if( pipe->req_cnt>=pipe->req_max ) { 62 0 : fd_accdb_ro_pipe_flush( pipe ); 63 0 : } 64 0 : } 65 : 66 : void 67 0 : fd_accdb_ro_pipe_flush( fd_accdb_ro_pipe_t * pipe ) { 68 0 : if( pipe->state!=RO_PIPE_STATE_BATCH ) return; 69 0 : fd_accdb_open_ro_multi( pipe->accdb, pipe->ro, &pipe->xid, pipe->addr, pipe->req_cnt ); 70 0 : pipe->state = RO_PIPE_STATE_DRAIN; 71 0 : pipe->req_comp = 0U; 72 0 : } 73 : 74 : fd_accdb_ro_t * 75 0 : fd_accdb_ro_pipe_poll( fd_accdb_ro_pipe_t * pipe ) { 76 0 : if( pipe->state!=RO_PIPE_STATE_DRAIN ) return NULL; 77 0 : if( pipe->req_comp==pipe->req_cnt ) { 78 0 : fd_accdb_close_ro_multi( pipe->accdb, pipe->ro, pipe->req_cnt ); 79 0 : pipe->state = RO_PIPE_STATE_BATCH; 80 0 : pipe->req_cnt = 0U; 81 0 : pipe->req_comp = 0U; 82 0 : return NULL; 83 0 : } 84 : 85 0 : ulong idx = pipe->req_comp++; 86 0 : fd_accdb_ro_t * ro = &pipe->ro[ idx ]; 87 0 : if( FD_UNLIKELY( !ro->meta || !ro->meta->lamports ) ) { 88 0 : ro = pipe->ro_nx; 89 0 : return fd_accdb_ro_init_empty( ro, pipe->addr[ idx ] ); 90 0 : } 91 0 : return ro; 92 0 : }