Line data Source code
1 : #include "fd_circq.h" 2 : 3 : #include "../../util/log/fd_log.h" 4 : 5 : struct __attribute__((aligned(8UL))) fd_circq_message_private { 6 : ulong align; 7 : ulong footprint; 8 : 9 : /* Offset withn the circular buffer data region of where the next 10 : message starts, if there is one. This is not always the same as 11 : aligning up this message + footprint, because the next message may 12 : have wrapped around to the start of the buffer. */ 13 : ulong next; 14 : }; 15 : 16 : typedef struct fd_circq_message_private fd_circq_message_t; 17 : 18 : FD_FN_CONST ulong 19 0 : fd_circq_align( void ) { 20 0 : return FD_CIRCQ_ALIGN; 21 0 : } 22 : 23 : FD_FN_CONST ulong 24 0 : fd_circq_footprint( ulong sz ) { 25 0 : return sizeof( fd_circq_t ) + sz; 26 0 : } 27 : 28 : void * 29 : fd_circq_new( void * shmem, 30 27 : ulong sz ) { 31 27 : fd_circq_t * circq = (fd_circq_t *)shmem; 32 27 : circq->cnt = 0UL; 33 27 : circq->head = 0UL; 34 27 : circq->tail = 0UL; 35 27 : circq->size = sz; 36 27 : circq->cursor = ULONG_MAX; 37 27 : circq->cursor_seq = 0UL; 38 27 : circq->cursor_push_seq = 0UL; 39 : 40 27 : memset( &circq->metrics, 0, sizeof( circq->metrics ) ); 41 : 42 27 : return shmem; 43 27 : } 44 : 45 : fd_circq_t * 46 27 : fd_circq_join( void * shbuf ) { 47 27 : return (fd_circq_t *)shbuf; 48 27 : } 49 : 50 : void * 51 0 : fd_circq_leave( fd_circq_t * buf ) { 52 0 : return (void *)buf; 53 0 : } 54 : 55 : void * 56 0 : fd_circq_delete( void * shbuf ) { 57 0 : return shbuf; 58 0 : } 59 : 60 : static inline void FD_FN_UNUSED 61 0 : verify( fd_circq_t * circq ) { 62 0 : FD_TEST( circq->head<circq->size ); 63 0 : FD_TEST( circq->tail<circq->size ); 64 0 : FD_TEST( circq->tail!=circq->head || circq->cnt<=1 ); 65 0 : if( !circq->cnt ) { 66 0 : FD_TEST( circq->head==0UL ); 67 0 : FD_TEST( circq->tail==0UL ); 68 0 : } else if( circq->cnt==1UL ) { 69 0 : FD_TEST( circq->head==circq->tail ); 70 0 : } 71 0 : 72 0 : uchar * buf = (uchar *)(circq+1); 73 0 : 74 0 : ulong current = circq->head; 75 0 : int wrapped = 0; 76 0 : for( ulong i=0UL; i<circq->cnt; i++ ) { 77 0 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+current); 78 0 : ulong start = current; 79 0 : ulong end = fd_ulong_align_up( start+sizeof( fd_circq_message_t ), message->align ) + message->footprint; 80 0 : if( wrapped ) FD_TEST( end<=circq->head ); 81 0 : FD_TEST( start<end ); 82 0 : FD_TEST( end<=circq->size ); 83 0 : current = message->next; 84 0 : if( current<start ) wrapped = 1; 85 0 : } 86 0 : } 87 : 88 : static void 89 : evict( fd_circq_t * circq, 90 : ulong from, 91 330266850 : ulong to ) { 92 330266850 : uchar * buf = (uchar *)(circq+1); 93 : 94 531593781 : for(;;) { 95 531593781 : if( FD_UNLIKELY( !circq->cnt ) ) return; 96 : 97 439888764 : fd_circq_message_t * head = (fd_circq_message_t *)(buf+circq->head); 98 : 99 439888764 : ulong start = circq->head; 100 439888764 : ulong end = fd_ulong_align_up( start + sizeof( fd_circq_message_t ), head->align ) + head->footprint; 101 : 102 439888764 : if( FD_UNLIKELY( (start<to && end>from) ) ) { 103 201326931 : circq->cnt--; 104 201326931 : circq->metrics.drop_cnt++; 105 201326931 : if( FD_LIKELY( !circq->cnt ) ) circq->head = circq->tail = 0UL; 106 109621944 : else circq->head = head->next; 107 238561833 : } else { 108 238561833 : break; 109 238561833 : } 110 439888764 : } 111 330266850 : } 112 : 113 : uchar * 114 : fd_circq_push_back( fd_circq_t * circq, 115 : ulong align, 116 201327063 : ulong footprint ) { 117 201327063 : if( FD_UNLIKELY( !fd_ulong_is_pow2( align ) ) ) { 118 0 : FD_LOG_WARNING(( "align must be a power of 2" )); 119 0 : return NULL; 120 0 : } 121 201327063 : if( FD_UNLIKELY( align>FD_CIRCQ_ALIGN ) ) { 122 0 : FD_LOG_WARNING(( "align must be at most %lu", FD_CIRCQ_ALIGN )); 123 0 : return NULL; 124 0 : } 125 : 126 201327063 : ulong required = fd_ulong_align_up( sizeof( fd_circq_message_t ), align ) + footprint; 127 201327063 : if( FD_UNLIKELY( required>circq->size ) ) { 128 3 : FD_LOG_WARNING(( "tried to push message which was too large %lu>%lu", required, circq->size )); 129 3 : return NULL; 130 3 : } 131 : 132 201327060 : uchar * buf = (uchar *)(circq+1); 133 : 134 201327060 : ulong current = 0UL; 135 201327060 : fd_circq_message_t * message = NULL; 136 201327060 : if( FD_LIKELY( circq->cnt ) ) { 137 201327030 : message = (fd_circq_message_t *)(buf+circq->tail); 138 201327030 : current = fd_ulong_align_up( fd_ulong_align_up( circq->tail+sizeof( fd_circq_message_t ), message->align )+message->footprint, alignof( fd_circq_message_t ) ); 139 201327030 : } 140 : 141 201327060 : if( FD_UNLIKELY( current+required>circq->size ) ) { 142 128939790 : evict( circq, current, circq->size ); 143 128939790 : evict( circq, 0UL, required ); 144 : 145 128939790 : circq->tail = 0UL; 146 128939790 : if( FD_LIKELY( circq->cnt && message ) ) message->next = 0UL; 147 128939790 : } else { 148 72387270 : evict( circq, current, current+required ); 149 : 150 72387270 : circq->tail = current; 151 72387270 : if( FD_LIKELY( circq->cnt && message ) ) message->next = current; 152 72387270 : } 153 : 154 201327060 : circq->cnt++; 155 201327060 : fd_circq_message_t * next_message = (fd_circq_message_t *)(buf+circq->tail); 156 201327060 : next_message->align = align; 157 201327060 : next_message->footprint = footprint; 158 201327060 : circq->cursor_push_seq++; 159 201327060 : return (uchar *)(next_message+1); 160 201327063 : } 161 : 162 : void 163 : fd_circq_resize_back( fd_circq_t * circq, 164 0 : ulong new_footprint ) { 165 0 : FD_TEST( circq->cnt ); 166 : 167 0 : uchar * buf = (uchar *)(circq+1); 168 0 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->tail); 169 0 : FD_TEST( new_footprint<=message->footprint ); 170 : 171 0 : message->footprint = new_footprint; 172 0 : } 173 : 174 : uchar const * 175 : fd_circq_cursor_advance( fd_circq_t * circq, 176 414 : ulong * msg_sz ) { 177 : /* First call or after reset - start from head */ 178 414 : if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) { 179 72 : if( FD_UNLIKELY( !circq->cnt ) ) return NULL; 180 66 : circq->cursor = circq->head; 181 66 : circq->cursor_seq = circq->cursor_push_seq - circq->cnt; 182 342 : } else { 183 : /* Already iterating - move to next */ 184 342 : if( FD_UNLIKELY( circq->cursor_seq >= circq->cursor_push_seq ) ) return NULL; 185 : 186 333 : uchar * buf = (uchar *)(circq+1); 187 333 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->cursor); 188 333 : circq->cursor = message->next; 189 333 : } 190 : 191 399 : uchar * buf = (uchar *)(circq+1); 192 399 : fd_circq_message_t * current_msg = (fd_circq_message_t *)(buf+circq->cursor); 193 399 : circq->cursor_seq++; 194 399 : if( FD_LIKELY( msg_sz ) ) *msg_sz = current_msg->footprint; 195 399 : return (uchar *)(current_msg+1); 196 414 : } 197 : 198 : int 199 : fd_circq_pop_until( fd_circq_t * circq, 200 18 : ulong cursor ) { 201 18 : if( FD_UNLIKELY( cursor>=circq->cursor_push_seq ) ) return -1; 202 : 203 15 : ulong oldest_seq = circq->cursor_push_seq-circq->cnt; 204 15 : if( FD_UNLIKELY( cursor<oldest_seq ) ) return 0; 205 : 206 12 : ulong to_pop = fd_ulong_min( cursor-oldest_seq+1UL, circq->cnt ); 207 : 208 12 : uchar * buf = (uchar *)(circq+1); 209 36 : for( ulong i=0UL; i<to_pop; i++ ) { 210 24 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->head); 211 24 : circq->cnt--; 212 : 213 24 : if( FD_UNLIKELY( !circq->cnt ) ) { 214 6 : circq->head = circq->tail = 0UL; 215 18 : } else { 216 18 : circq->head = message->next; 217 18 : FD_TEST( circq->head<circq->size ); 218 18 : } 219 24 : } 220 : 221 12 : if( FD_UNLIKELY( !circq->cnt ) ) circq->cursor = ULONG_MAX; 222 12 : return 0; 223 12 : } 224 : 225 : void 226 60 : fd_circq_reset_cursor( fd_circq_t * circq ) { 227 60 : circq->cursor = ULONG_MAX; 228 60 : } 229 : 230 : ulong 231 0 : fd_circq_bytes_used( fd_circq_t const * circq ) { 232 0 : if( FD_UNLIKELY( !circq->cnt ) ) return 0UL; 233 : 234 0 : uchar const * buf = (uchar const *)(circq+1); 235 : 236 0 : fd_circq_message_t const * tail_msg = (fd_circq_message_t const *)(buf+circq->tail); 237 0 : ulong tail_end = fd_ulong_align_up( circq->tail + sizeof(fd_circq_message_t), tail_msg->align ) + tail_msg->footprint; 238 : 239 0 : if( FD_LIKELY( circq->tail>=circq->head ) ) return tail_end - circq->head; 240 0 : else return (circq->size - circq->head) + tail_end; 241 0 : }