Line data Source code
1 0 : #define SHAM_LINK_(n) FD_EXPAND_THEN_CONCAT3(SHAM_LINK_NAME,_,n) 2 : 3 : struct SHAM_LINK_NAME { 4 : fd_frag_meta_t * mcache; 5 : fd_wksp_t * wksp; 6 : ulong depth; 7 : ulong seq_expect; 8 : }; 9 : 10 : typedef struct SHAM_LINK_NAME SHAM_LINK_(t); 11 : 12 0 : static inline ulong SHAM_LINK_(align)(void) { return alignof(SHAM_LINK_(t)); } 13 0 : static inline ulong SHAM_LINK_(footprint)(void) { return sizeof(SHAM_LINK_(t)); } 14 : 15 : static inline SHAM_LINK_(t) * 16 0 : SHAM_LINK_(new)( void * mem, const char * wksp_name ) { 17 0 : SHAM_LINK_(t) * self = (SHAM_LINK_(t) *)mem; 18 0 : memset( self, 0, sizeof(SHAM_LINK_(t)) ); 19 0 : FD_LOG_NOTICE(( "attaching to workspace \"%s\"", wksp_name )); 20 0 : self->wksp = fd_wksp_attach( wksp_name ); 21 0 : if( FD_UNLIKELY( !self->wksp ) ) 22 0 : FD_LOG_ERR(( "unable to attach to \"%s\"\n\tprobably does not exist or bad permissions", wksp_name )); 23 0 : ulong offset = fd_ulong_align_up( fd_wksp_private_data_off( self->wksp->part_max ), fd_topo_workspace_align() ); 24 0 : self->mcache = fd_mcache_join( (void *)((ulong)self->wksp + offset) ); 25 0 : if( self->mcache == NULL ) { 26 0 : FD_LOG_ERR(( "failed to join a mcache" )); 27 0 : } 28 0 : return self; 29 0 : } 30 : 31 : static inline void 32 0 : SHAM_LINK_(start)( SHAM_LINK_(t) * self ) { 33 0 : fd_frag_meta_t * mcache = self->mcache; 34 0 : self->depth = fd_mcache_depth( mcache ); 35 0 : self->seq_expect = fd_mcache_seq0( mcache ); 36 0 : } 37 : 38 : static void 39 : SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state, void const * msg, int sz ); 40 : 41 : static void 42 : SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ); 43 : 44 : static inline void 45 0 : SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ) { 46 0 : while (1) { 47 0 : fd_frag_meta_t const * mline = self->mcache + fd_mcache_line_idx( self->seq_expect, self->depth ); 48 : 49 0 : ulong seq_found = fd_frag_meta_seq_query( mline ); 50 0 : long diff = fd_seq_diff( seq_found, self->seq_expect ); 51 0 : if( FD_UNLIKELY( diff ) ) { /* caught up or overrun, optimize for expected sequence number ready */ 52 0 : if( FD_UNLIKELY( diff>0L ) ) { 53 0 : FD_LOG_NOTICE(( "overrun: seq=%lu seq_found=%lu diff=%ld", self->seq_expect, seq_found, diff )); 54 0 : self->seq_expect = seq_found; 55 0 : } else { 56 : /* caught up */ 57 0 : break; 58 0 : } 59 0 : continue; 60 0 : } 61 : 62 0 : ulong chunk = mline->chunk; 63 : /* TODO: sanity check chunk,sz */ 64 0 : SHAM_LINK_(during_frag)( ctx, state, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz ); 65 : 66 0 : seq_found = fd_frag_meta_seq_query( mline ); 67 0 : diff = fd_seq_diff( seq_found, self->seq_expect ); 68 0 : if( FD_UNLIKELY( diff ) ) { /* overrun, optimize for expected sequence number ready */ 69 0 : FD_LOG_NOTICE(( "overrun: seq=%lu seq_found=%lu diff=%ld", self->seq_expect, seq_found, diff )); 70 0 : self->seq_expect = seq_found; 71 0 : continue; 72 0 : } 73 : 74 0 : SHAM_LINK_(after_frag)( ctx, state ); 75 : 76 0 : self->seq_expect++; 77 0 : } 78 0 : } 79 : 80 : #undef SHAM_LINK_CONTEXT 81 : #undef SHAM_LINK_STATE 82 : #undef SHAM_LINK_NAME 83 : #undef SHAM_LINK_