Line data Source code
1 : #include "fd_circq.h" 2 : 3 : #include "../../util/log/fd_log.h" 4 : 5 : struct __attribute__((aligned(8UL))) fd_circq_message_private { 6 : ulong align; 7 : ulong footprint; 8 : 9 : /* Offset withn the circular buffer data region of where the next 10 : message starts, if there is one. This is not always the same as 11 : aligning up this message + footprint, because the next message may 12 : have wrapped around to the start of the buffer. */ 13 : ulong next; 14 : }; 15 : 16 : typedef struct fd_circq_message_private fd_circq_message_t; 17 : 18 : FD_FN_CONST ulong 19 0 : fd_circq_align( void ) { 20 0 : return FD_CIRCQ_ALIGN; 21 0 : } 22 : 23 : FD_FN_CONST ulong 24 0 : fd_circq_footprint( ulong sz ) { 25 0 : return sizeof( fd_circq_t ) + sz; 26 0 : } 27 : 28 : void * 29 : fd_circq_new( void * shmem, 30 36 : ulong sz ) { 31 36 : fd_circq_t * circq = (fd_circq_t *)shmem; 32 36 : circq->cnt = 0UL; 33 36 : circq->head = 0UL; 34 36 : circq->tail = 0UL; 35 36 : circq->size = sz; 36 36 : circq->cursor = ULONG_MAX; 37 36 : circq->cursor_seq = 0UL; 38 36 : circq->cursor_push_seq = 0UL; 39 : 40 36 : memset( &circq->metrics, 0, sizeof( circq->metrics ) ); 41 : 42 36 : return shmem; 43 36 : } 44 : 45 : fd_circq_t * 46 36 : fd_circq_join( void * shbuf ) { 47 36 : return (fd_circq_t *)shbuf; 48 36 : } 49 : 50 : void * 51 0 : fd_circq_leave( fd_circq_t * buf ) { 52 0 : return (void *)buf; 53 0 : } 54 : 55 : void * 56 0 : fd_circq_delete( void * shbuf ) { 57 0 : return shbuf; 58 0 : } 59 : 60 : static inline void FD_FN_UNUSED 61 0 : verify( fd_circq_t * circq ) { 62 0 : FD_TEST( circq->head<circq->size ); 63 0 : FD_TEST( circq->tail<circq->size ); 64 0 : FD_TEST( circq->tail!=circq->head || circq->cnt<=1 ); 65 0 : if( !circq->cnt ) { 66 0 : FD_TEST( circq->head==0UL ); 67 0 : FD_TEST( circq->tail==0UL ); 68 0 : } else if( circq->cnt==1UL ) { 69 0 : FD_TEST( circq->head==circq->tail ); 70 0 : } 71 0 : 72 0 : uchar * buf = (uchar *)(circq+1); 73 0 : 74 0 : ulong current = circq->head; 75 0 : int wrapped = 0; 76 0 : for( ulong i=0UL; i<circq->cnt; i++ ) { 77 0 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+current); 78 0 : ulong start = current; 79 0 : ulong end = fd_ulong_align_up( start+sizeof( fd_circq_message_t ), message->align ) + message->footprint; 80 0 : if( wrapped ) FD_TEST( end<=circq->head ); 81 0 : FD_TEST( start<end ); 82 0 : FD_TEST( end<=circq->size ); 83 0 : current = message->next; 84 0 : if( current<start ) wrapped = 1; 85 0 : } 86 0 : } 87 : 88 : /* Recover from eviction logic removing elements at the cursor */ 89 : 90 : static inline void 91 12583278 : overrun_recover( fd_circq_t * circq ) { 92 12583278 : if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) return; 93 : 94 300 : ulong oldest_seq = circq->cursor_push_seq - circq->cnt; 95 300 : if( FD_UNLIKELY( circq->cursor_seq<=oldest_seq ) ) circq->cursor = ULONG_MAX; 96 300 : } 97 : 98 : static void 99 : evict( fd_circq_t * circq, 100 : ulong from, 101 20642571 : ulong to ) { 102 20642571 : uchar * buf = (uchar *)(circq+1); 103 : 104 33225831 : for(;;) { 105 33225831 : if( FD_UNLIKELY( !circq->cnt ) ) return; 106 : 107 27493179 : fd_circq_message_t * head = (fd_circq_message_t *)(buf+circq->head); 108 : 109 27493179 : ulong start = circq->head; 110 27493179 : ulong end = fd_ulong_align_up( start + sizeof( fd_circq_message_t ), head->align ) + head->footprint; 111 : 112 27493179 : if( FD_UNLIKELY( (start<to && end>from) ) ) { 113 12583260 : circq->cnt--; 114 12583260 : circq->metrics.drop_cnt++; 115 12583260 : if( FD_LIKELY( !circq->cnt ) ) circq->head = circq->tail = 0UL; 116 6850647 : else circq->head = head->next; 117 12583260 : overrun_recover( circq ); 118 14909919 : } else { 119 14909919 : break; 120 14909919 : } 121 27493179 : } 122 20642571 : } 123 : 124 : uchar * 125 : fd_circq_push_back( fd_circq_t * circq, 126 : ulong align, 127 12583410 : ulong footprint ) { 128 12583410 : if( FD_UNLIKELY( !fd_ulong_is_pow2( align ) ) ) { 129 0 : FD_LOG_WARNING(( "align must be a power of 2" )); 130 0 : return NULL; 131 0 : } 132 12583410 : if( FD_UNLIKELY( align>FD_CIRCQ_ALIGN ) ) { 133 0 : FD_LOG_WARNING(( "align must be at most %lu", FD_CIRCQ_ALIGN )); 134 0 : return NULL; 135 0 : } 136 : 137 12583410 : ulong required = fd_ulong_align_up( sizeof( fd_circq_message_t ), align ) + footprint; 138 12583410 : if( FD_UNLIKELY( required>circq->size ) ) { 139 3 : FD_LOG_WARNING(( "tried to push message which was too large %lu>%lu", required, circq->size )); 140 3 : return NULL; 141 3 : } 142 : 143 12583407 : uchar * buf = (uchar *)(circq+1); 144 : 145 12583407 : ulong current = 0UL; 146 12583407 : fd_circq_message_t * message = NULL; 147 12583407 : if( FD_LIKELY( circq->cnt ) ) { 148 12583368 : message = (fd_circq_message_t *)(buf+circq->tail); 149 12583368 : current = fd_ulong_align_up( fd_ulong_align_up( circq->tail+sizeof( fd_circq_message_t ), message->align )+message->footprint, alignof( fd_circq_message_t ) ); 150 12583368 : } 151 : 152 12583407 : if( FD_UNLIKELY( current+required>circq->size ) ) { 153 8059164 : evict( circq, current, circq->size ); 154 8059164 : evict( circq, 0UL, required ); 155 : 156 8059164 : circq->tail = 0UL; 157 8059164 : if( FD_LIKELY( circq->cnt && message ) ) message->next = 0UL; 158 8059164 : } else { 159 4524243 : evict( circq, current, current+required ); 160 : 161 4524243 : circq->tail = current; 162 4524243 : if( FD_LIKELY( circq->cnt && message ) ) message->next = current; 163 4524243 : } 164 : 165 12583407 : circq->cnt++; 166 12583407 : fd_circq_message_t * next_message = (fd_circq_message_t *)(buf+circq->tail); 167 12583407 : next_message->align = align; 168 12583407 : next_message->footprint = footprint; 169 12583407 : next_message->next = ULONG_MAX; 170 12583407 : circq->cursor_push_seq++; 171 12583407 : return (uchar *)(next_message+1); 172 12583410 : } 173 : 174 : void 175 : fd_circq_resize_back( fd_circq_t * circq, 176 0 : ulong new_footprint ) { 177 0 : FD_TEST( circq->cnt ); 178 : 179 0 : uchar * buf = (uchar *)(circq+1); 180 0 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->tail); 181 0 : FD_TEST( new_footprint<=message->footprint ); 182 : 183 0 : message->footprint = new_footprint; 184 0 : } 185 : 186 : uchar const * 187 : fd_circq_cursor_advance( fd_circq_t * circq, 188 441 : ulong * msg_sz ) { 189 : /* First call or after reset - start from head */ 190 441 : if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) { 191 309 : if( FD_UNLIKELY( !circq->cnt ) ) return NULL; 192 303 : circq->cursor = circq->head; 193 303 : circq->cursor_seq = circq->cursor_push_seq - circq->cnt; 194 303 : } else { 195 : /* Already iterating - move to next */ 196 132 : if( FD_UNLIKELY( circq->cursor_seq >= circq->cursor_push_seq ) ) return NULL; 197 : 198 120 : uchar * buf = (uchar *)(circq+1); 199 120 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->cursor); 200 120 : circq->cursor = message->next; 201 120 : } 202 : 203 423 : uchar * buf = (uchar *)(circq+1); 204 423 : fd_circq_message_t * current_msg = (fd_circq_message_t *)(buf+circq->cursor); 205 423 : circq->cursor_seq++; 206 423 : if( FD_LIKELY( msg_sz ) ) *msg_sz = current_msg->footprint; 207 423 : return (uchar *)(current_msg+1); 208 441 : } 209 : 210 : int 211 : fd_circq_pop_until( fd_circq_t * circq, 212 27 : ulong cursor ) { 213 27 : if( FD_UNLIKELY( cursor>=circq->cursor_seq ) ) return -1; 214 : 215 21 : ulong oldest_seq = circq->cursor_push_seq-circq->cnt; 216 21 : if( FD_UNLIKELY( cursor<oldest_seq ) ) return 0; 217 : 218 18 : ulong to_pop = fd_ulong_min( cursor-oldest_seq+1UL, circq->cnt ); 219 : 220 18 : uchar * buf = (uchar *)(circq+1); 221 48 : for( ulong i=0UL; i<to_pop; i++ ) { 222 30 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->head); 223 30 : circq->cnt--; 224 : 225 30 : if( FD_UNLIKELY( !circq->cnt ) ) { 226 6 : circq->head = circq->tail = 0UL; 227 24 : } else { 228 24 : circq->head = message->next; 229 24 : FD_TEST( circq->head<circq->size ); 230 24 : } 231 30 : } 232 : 233 18 : if( FD_UNLIKELY( !circq->cnt ) ) circq->cursor = ULONG_MAX; 234 18 : overrun_recover( circq ); 235 18 : return 0; 236 18 : } 237 : 238 : void 239 63 : fd_circq_reset_cursor( fd_circq_t * circq ) { 240 63 : circq->cursor = ULONG_MAX; 241 63 : } 242 : 243 : ulong 244 0 : fd_circq_bytes_used( fd_circq_t const * circq ) { 245 0 : if( FD_UNLIKELY( !circq->cnt ) ) return 0UL; 246 : 247 0 : uchar const * buf = (uchar const *)(circq+1); 248 : 249 0 : fd_circq_message_t const * tail_msg = (fd_circq_message_t const *)(buf+circq->tail); 250 0 : ulong tail_end = fd_ulong_align_up( circq->tail + sizeof(fd_circq_message_t), tail_msg->align ) + tail_msg->footprint; 251 : 252 0 : if( FD_LIKELY( circq->tail>=circq->head ) ) return tail_end - circq->head; 253 0 : else return (circq->size - circq->head) + tail_end; 254 0 : } 255 : 256 : ulong 257 0 : fd_circq_unsent_cnt( fd_circq_t const * circq ) { 258 0 : if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) return circq->cnt; 259 0 : return fd_ulong_min( circq->cursor_push_seq - circq->cursor_seq, circq->cnt ); 260 0 : }