Line data Source code
1 0 : #define SHAM_LINK_(n) FD_EXPAND_THEN_CONCAT3(SHAM_LINK_NAME,_,n) 2 : 3 : struct SHAM_LINK_NAME { 4 : fd_frag_meta_t * mcache; 5 : fd_wksp_t * wksp; 6 : ulong depth; 7 : ulong seq_expect; 8 : }; 9 : 10 : typedef struct SHAM_LINK_NAME SHAM_LINK_(t); 11 : 12 0 : static inline ulong SHAM_LINK_(align)(void) { return alignof(SHAM_LINK_(t)); } 13 0 : static inline ulong SHAM_LINK_(footprint)(void) { return sizeof(SHAM_LINK_(t)); } 14 : 15 : static inline SHAM_LINK_(t) * 16 0 : SHAM_LINK_(new)( void * mem, const char * wksp_name ) { 17 0 : SHAM_LINK_(t) * self = (SHAM_LINK_(t) *)mem; 18 0 : memset( self, 0, sizeof(SHAM_LINK_(t)) ); 19 0 : FD_LOG_NOTICE(( "attaching to workspace \"%s\"", wksp_name )); 20 0 : self->wksp = fd_wksp_attach( wksp_name ); 21 0 : if( FD_UNLIKELY( !self->wksp ) ) 22 0 : FD_LOG_ERR(( "unable to attach to \"%s\"\n\tprobably does not exist or bad permissions", wksp_name )); 23 0 : ulong offset = fd_ulong_align_up( fd_wksp_private_data_off( self->wksp->part_max ), fd_topo_workspace_align() ); 24 0 : self->mcache = fd_mcache_join( (void *)((ulong)self->wksp + offset) ); 25 0 : if( self->mcache == NULL ) { 26 0 : FD_LOG_ERR(( "failed to join a mcache" )); 27 0 : } 28 0 : return self; 29 0 : } 30 : 31 : static inline void 32 0 : SHAM_LINK_(start)( SHAM_LINK_(t) * self ) { 33 0 : fd_frag_meta_t * mcache = self->mcache; 34 0 : self->depth = fd_mcache_depth( mcache ); 35 0 : self->seq_expect = fd_mcache_seq0( mcache ); 36 0 : } 37 : 38 : void SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state, void const * msg, int sz ); 39 : 40 : void SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ); 41 : 42 : static inline void 43 0 : SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ) { 44 0 : while (1) { 45 0 : fd_frag_meta_t const * mline = self->mcache + fd_mcache_line_idx( self->seq_expect, self->depth ); 46 : 47 0 : ulong seq_found = fd_frag_meta_seq_query( mline ); 48 0 : long diff = fd_seq_diff( seq_found, self->seq_expect ); 49 0 : if( FD_UNLIKELY( diff ) ) { /* caught up or overrun, optimize for expected sequence number ready */ 50 0 : if( FD_UNLIKELY( diff>0L ) ) { 51 0 : FD_LOG_NOTICE(( "overrun: seq=%lu seq_found=%lu diff=%ld", self->seq_expect, seq_found, diff )); 52 0 : self->seq_expect = seq_found; 53 0 : } else { 54 : /* caught up */ 55 0 : break; 56 0 : } 57 0 : continue; 58 0 : } 59 : 60 0 : ulong chunk = mline->chunk; 61 : /* TODO: sanity check chunk,sz */ 62 0 : SHAM_LINK_(during_frag)( ctx, state, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz ); 63 : 64 0 : seq_found = fd_frag_meta_seq_query( mline ); 65 0 : diff = fd_seq_diff( seq_found, self->seq_expect ); 66 0 : if( FD_UNLIKELY( diff ) ) { /* overrun, optimize for expected sequence number ready */ 67 0 : FD_LOG_NOTICE(( "overrun: seq=%lu seq_found=%lu diff=%ld", self->seq_expect, seq_found, diff )); 68 0 : self->seq_expect = seq_found; 69 0 : continue; 70 0 : } 71 : 72 0 : SHAM_LINK_(after_frag)( ctx, state ); 73 : 74 0 : self->seq_expect++; 75 0 : } 76 0 : } 77 : 78 : #undef SHAM_LINK_CONTEXT 79 : #undef SHAM_LINK_STATE 80 : #undef SHAM_LINK_NAME 81 : #undef SHAM_LINK_