Line data Source code
1 : #include "../../disco/topo/fd_topo.h" 2 : #include "../../util/wksp/fd_wksp_private.h" /* why does this depend on private APIs? */ 3 : 4 0 : #define SHAM_LINK_(n) FD_EXPAND_THEN_CONCAT3(SHAM_LINK_NAME,_,n) 5 : 6 : struct SHAM_LINK_NAME { 7 : fd_frag_meta_t * mcache; 8 : fd_wksp_t * wksp; 9 : ulong depth; 10 : ulong seq_expect; 11 : }; 12 : 13 : typedef struct SHAM_LINK_NAME SHAM_LINK_(t); 14 : 15 0 : static inline ulong SHAM_LINK_(align)(void) { return alignof(SHAM_LINK_(t)); } 16 0 : static inline ulong SHAM_LINK_(footprint)(void) { return sizeof(SHAM_LINK_(t)); } 17 : 18 : static inline SHAM_LINK_(t) * 19 0 : SHAM_LINK_(new)( void * mem, const char * wksp_name ) { 20 0 : SHAM_LINK_(t) * self = (SHAM_LINK_(t) *)mem; 21 0 : memset( self, 0, sizeof(SHAM_LINK_(t)) ); 22 0 : FD_LOG_NOTICE(( "attaching to workspace \"%s\"", wksp_name )); 23 0 : self->wksp = fd_wksp_attach( wksp_name ); 24 0 : if( FD_UNLIKELY( !self->wksp ) ) 25 0 : FD_LOG_ERR(( "unable to attach to \"%s\"\n\tprobably does not exist or bad permissions", wksp_name )); 26 0 : ulong offset = fd_ulong_align_up( fd_wksp_private_data_off( self->wksp->part_max ), fd_topo_workspace_align() ); 27 0 : self->mcache = fd_mcache_join( (void *)((ulong)self->wksp + offset) ); 28 0 : if( self->mcache == NULL ) { 29 0 : FD_LOG_ERR(( "failed to join a mcache" )); 30 0 : } 31 0 : return self; 32 0 : } 33 : 34 : static inline void 35 0 : SHAM_LINK_(start)( SHAM_LINK_(t) * self ) { 36 0 : fd_frag_meta_t * mcache = self->mcache; 37 0 : self->depth = fd_mcache_depth( mcache ); 38 0 : self->seq_expect = fd_mcache_seq0( mcache ); 39 0 : } 40 : 41 : static void 42 : SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state, void const * msg, int sz ); 43 : 44 : static void 45 : SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ); 46 : 47 : static inline void 48 0 : SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ) { 49 0 : while (1) { 50 0 : fd_frag_meta_t const * mline = self->mcache + fd_mcache_line_idx( self->seq_expect, self->depth ); 51 : 52 0 : ulong seq_found = fd_frag_meta_seq_query( mline ); 53 0 : long diff = fd_seq_diff( seq_found, self->seq_expect ); 54 0 : if( FD_UNLIKELY( diff ) ) { /* caught up or overrun, optimize for expected sequence number ready */ 55 0 : if( FD_UNLIKELY( diff>0L ) ) { 56 0 : FD_LOG_NOTICE(( "overrun: seq=%lu seq_found=%lu diff=%ld", self->seq_expect, seq_found, diff )); 57 0 : self->seq_expect = seq_found; 58 0 : } else { 59 : /* caught up */ 60 0 : break; 61 0 : } 62 0 : continue; 63 0 : } 64 : 65 0 : ulong chunk = mline->chunk; 66 : /* TODO: sanity check chunk,sz */ 67 0 : SHAM_LINK_(during_frag)( ctx, state, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz ); 68 : 69 0 : seq_found = fd_frag_meta_seq_query( mline ); 70 0 : diff = fd_seq_diff( seq_found, self->seq_expect ); 71 0 : if( FD_UNLIKELY( diff ) ) { /* overrun, optimize for expected sequence number ready */ 72 0 : FD_LOG_NOTICE(( "overrun: seq=%lu seq_found=%lu diff=%ld", self->seq_expect, seq_found, diff )); 73 0 : self->seq_expect = seq_found; 74 0 : continue; 75 0 : } 76 : 77 0 : SHAM_LINK_(after_frag)( ctx, state ); 78 : 79 0 : self->seq_expect++; 80 0 : } 81 0 : } 82 : 83 : #undef SHAM_LINK_CONTEXT 84 : #undef SHAM_LINK_STATE 85 : #undef SHAM_LINK_NAME 86 : #undef SHAM_LINK_