Line data Source code
1 : #include "fd_pending_slots.h"
2 :
3 : void *
4 0 : fd_pending_slots_new( void * mem, ulong lo_wmark ) {
5 :
6 0 : if( FD_UNLIKELY( !mem ) ) {
7 0 : FD_LOG_WARNING( ( "NULL mem" ) );
8 0 : return NULL;
9 0 : }
10 :
11 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_pending_slots_align() ) ) ) {
12 0 : FD_LOG_WARNING( ( "misaligned mem" ) );
13 0 : return NULL;
14 0 : }
15 :
16 0 : ulong footprint = fd_pending_slots_footprint();
17 :
18 0 : fd_memset( mem, 0, footprint );
19 0 : ulong laddr = (ulong)mem;
20 0 : fd_pending_slots_t * pending_slots = (void *)laddr;
21 0 : pending_slots->lo_wmark = lo_wmark;
22 0 : pending_slots->start = 0;
23 0 : pending_slots->end = 0;
24 0 : pending_slots->lock = 0;
25 :
26 0 : laddr += sizeof( fd_pending_slots_t );
27 0 : pending_slots->pending = (void *)laddr;
28 :
29 0 : laddr += sizeof(long) * FD_PENDING_MAX;
30 :
31 0 : FD_TEST( laddr == (ulong)mem + footprint );
32 :
33 0 : return mem;
34 0 : }
35 :
36 : /* TODO only safe for local joins */
37 : fd_pending_slots_t *
38 0 : fd_pending_slots_join( void * pending_slots ) {
39 0 : if( FD_UNLIKELY( !pending_slots ) ) {
40 0 : FD_LOG_WARNING( ( "NULL pending_slots" ) );
41 0 : return NULL;
42 0 : }
43 :
44 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)pending_slots, fd_pending_slots_align() ) ) ) {
45 0 : FD_LOG_WARNING( ( "misaligned pending_slots" ) );
46 0 : return NULL;
47 0 : }
48 :
49 0 : fd_pending_slots_t * pending_slots_ = (fd_pending_slots_t *)pending_slots;
50 :
51 0 : return pending_slots_;
52 0 : }
53 :
54 : void *
55 0 : fd_pending_slots_leave( fd_pending_slots_t const * pending_slots ) {
56 0 : if( FD_UNLIKELY( !pending_slots ) ) {
57 0 : FD_LOG_WARNING( ( "NULL pending_slots" ) );
58 0 : return NULL;
59 0 : }
60 :
61 0 : return (void *)pending_slots;
62 0 : }
63 :
64 : void *
65 0 : fd_pending_slots_delete( void * pending_slots ) {
66 0 : if( FD_UNLIKELY( !pending_slots ) ) {
67 0 : FD_LOG_WARNING( ( "NULL pending_slots" ) );
68 0 : return NULL;
69 0 : }
70 :
71 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)pending_slots, fd_pending_slots_align() ) ) ) {
72 0 : FD_LOG_WARNING( ( "misaligned pending_slots" ) );
73 0 : return NULL;
74 0 : }
75 :
76 0 : return pending_slots;
77 0 : }
78 :
79 : static void
80 0 : fd_pending_slots_lock( fd_pending_slots_t * pending_slots ) {
81 0 : # if FD_HAS_THREADS
82 0 : for(;;) {
83 0 : if( FD_LIKELY( !FD_ATOMIC_CAS( &pending_slots->lock, 0UL, 1UL) ) ) break;
84 0 : FD_SPIN_PAUSE();
85 0 : }
86 : # else
87 : pending_slots->lock = 1UL;
88 : # endif
89 0 : FD_COMPILER_MFENCE();
90 0 : }
91 :
92 : static void
93 0 : fd_pending_slots_unlock( fd_pending_slots_t * pending_slots ) {
94 0 : FD_COMPILER_MFENCE();
95 0 : FD_VOLATILE( pending_slots->lock ) = 0UL;
96 0 : }
97 :
98 :
99 : ulong
100 0 : fd_pending_slots_iter_init( fd_pending_slots_t * pending_slots ) {
101 0 : return pending_slots->start;
102 0 : }
103 :
104 : ulong
105 : fd_pending_slots_iter_next( fd_pending_slots_t * pending_slots,
106 : long now,
107 0 : ulong i ) {
108 0 : fd_pending_slots_lock( pending_slots );
109 0 : ulong end = pending_slots->end;
110 0 : for( i = fd_ulong_max(i, pending_slots->start); 1; ++i ) {
111 0 : if( i >= end ) {
112 : /* End sentinel */
113 0 : i = ULONG_MAX;
114 0 : break;
115 0 : }
116 0 : long * ele = &pending_slots->pending[ i & FD_PENDING_MASK ];
117 0 : if( i <= pending_slots->lo_wmark || *ele == 0 ) {
118 : /* Empty or useless slot */
119 0 : if( pending_slots->start == i )
120 0 : pending_slots->start = i+1U; /* Pop it */
121 0 : } else if( *ele <= now ) {
122 : /* Do this slot */
123 0 : long when = *ele;
124 0 : *ele = 0;
125 0 : if( pending_slots->start == i )
126 0 : pending_slots->start = i+1U; /* Pop it */
127 0 : FD_LOG_DEBUG(( "preparing slot %lu when=%ld now=%ld latency=%ld",
128 0 : i, when, now, now - when ));
129 0 : break;
130 0 : }
131 0 : }
132 0 : fd_pending_slots_unlock( pending_slots );
133 0 : return i;
134 0 : }
135 :
136 : int
137 : fd_pending_slots_check( fd_pending_slots_t const * pending_slots,
138 0 : ulong slot ) {
139 0 : if( slot < pending_slots->lo_wmark ) {
140 0 : return 0;
141 0 : } else if( pending_slots->start == pending_slots->end ) {
142 0 : return 1;
143 0 : } else if( slot < pending_slots->start && (long)(pending_slots->end - slot) > ((long)FD_PENDING_MAX / 4L) ) {
144 0 : return 0;
145 0 : } else if( slot >= pending_slots->end && (long)(slot - pending_slots->start) > ((long)FD_PENDING_MAX / 4L) ) {
146 0 : return 0;
147 0 : }
148 :
149 0 : return 1;
150 0 : }
151 :
152 : void
153 : fd_pending_slots_add( fd_pending_slots_t * pending_slots,
154 : ulong slot,
155 0 : long when ) {
156 0 : fd_pending_slots_lock( pending_slots );
157 :
158 0 : long * pending = pending_slots->pending;
159 0 : if( slot < pending_slots->lo_wmark ) {
160 0 : FD_LOG_ERR(( "pending queue overrun: lo_wmark=%lu, start=%lu, end=%lu, new slot=%lu", pending_slots->lo_wmark, pending_slots->start, pending_slots->end, slot ));
161 0 : } else if( pending_slots->start == pending_slots->end ) {
162 : /* Queue is empty */
163 0 : pending_slots->start = slot;
164 0 : pending_slots->end = slot+1U;
165 0 : pending[slot & FD_PENDING_MASK] = when;
166 0 : FD_LOG_DEBUG(("PENDING QUEUE: EMPTY, START SLOT: %lu, END SLOT: %lu", pending_slots->start, pending_slots->end));
167 :
168 0 : } else if ( slot < pending_slots->start ) {
169 : /* Grow down */
170 0 : if( (long)(pending_slots->end - slot) > (long)FD_PENDING_MAX )
171 0 : FD_LOG_ERR(( "pending queue overrun: start=%lu, end=%lu, new slot=%lu", pending_slots->start, pending_slots->end, slot ));
172 0 : pending[slot & FD_PENDING_MASK] = when;
173 0 : for( ulong i = slot+1; i < pending_slots->start; i++ ) {
174 : /* Zero fill */
175 0 : pending[i & FD_PENDING_MASK] = 0;
176 0 : }
177 0 : pending_slots->start = slot;
178 0 : FD_LOG_DEBUG(("PENDING QUEUE: GROW DOWN, START SLOT: %lu", pending_slots->start));
179 :
180 0 : } else if ( slot >= pending_slots->end ) {
181 : /* Grow up */
182 0 : if( (long)(slot - pending_slots->start) > (long)FD_PENDING_MAX )
183 0 : FD_LOG_ERR(( "pending queue overrun: start=%lu, end=%lu, new slot=%lu", pending_slots->start, pending_slots->end, slot ));
184 0 : pending[slot & FD_PENDING_MASK] = when;
185 0 : for( ulong i = pending_slots->end; i < slot; i++ ) {
186 : /* Zero fill */
187 0 : pending[i & FD_PENDING_MASK] = 0;
188 0 : }
189 0 : pending_slots->end = slot+1U;
190 0 : FD_LOG_DEBUG(("PENDING QUEUE: GROW UP, END SLOT: %lu", pending_slots->end));
191 :
192 0 : } else {
193 : /* Update in place */
194 0 : long * p = &pending[slot & FD_PENDING_MASK];
195 0 : if( 0 == *p || *p > when )
196 0 : *p = when;
197 0 : }
198 :
199 0 : fd_pending_slots_unlock( pending_slots );
200 0 : }
201 :
202 : long
203 : fd_pending_slots_get( fd_pending_slots_t * pending_slots,
204 0 : ulong slot ) {
205 0 : if( pending_slots->start == pending_slots->end ) {
206 0 : return LONG_MAX;
207 0 : } else if( slot < pending_slots->start ) {
208 0 : return LONG_MAX;
209 0 : } else if( slot >= pending_slots->end ) {
210 0 : return LONG_MAX;
211 0 : } else {
212 0 : return pending_slots->pending[ slot & FD_PENDING_MASK ];
213 0 : }
214 0 : }
215 :
216 : void
217 : fd_pending_slots_set_lo_wmark( fd_pending_slots_t * pending_slots,
218 0 : ulong slot ) {
219 0 : pending_slots->lo_wmark = slot;
220 0 : }
|