Line data Source code
1 : #ifndef HEADER_fd_src_vinyl_io_fd_vinyl_io_ur_wb_ring_h
2 : #define HEADER_fd_src_vinyl_io_fd_vinyl_io_ur_wb_ring_h
3 :
4 : /* wb_ring.h provides a ring buffer for append-only writes. It is used
5 : to implement the write-back cache in vinyl_io_ur. */
6 :
7 : #include "../../bstream/fd_vinyl_bstream.h"
8 :
9 : /* wb_ring maps bstream seqs (logical) to byte offsets in a buffer
10 : (physical). Logical addressing is abbreviated as 'seq', and physical
11 : as 'off'. Operation is as follows:
12 :
13 : - Users allocate chunks out of a wb_ring. Each allocation is
14 : contiguous in logical and physical space. There are no gaps in
15 : logical space, but the wb_ring wraps around in physical space, to
16 : ensure chunks are contiguous. Allocations implicitly free the
17 : oldest data.
18 :
19 : - Users may trim the most recent allocations. Trimming may go out of
20 : bounds (prior to the oldest available byte), in which case the
21 : wb_ring gracefully becomes empty.
22 :
23 : - wb_ring allows fast translation of logical addresses to buffer
24 : offsets. */
25 :
26 : struct wb_ring {
27 : ulong max;
28 :
29 : /* [seq0,seq1+sz1) is the logical range
30 : [off0,off0+sz0) is the physical range for the logical low range
31 : [off1,off1+sz1) is the physical range for the logical high range
32 :
33 : invariants:
34 : - sz0+sz1 <= max
35 : - seq1-seq0 == sz0
36 : - off1+sz1 == off0
37 : - off0+sz0 <= max
38 : - off1+sz1 <= max
39 : - [off0,off0+sz0) and [off1,off1+sz1) non-overlapping
40 : - if sz1==0, then sz0==0 */
41 :
42 : ulong seq0;
43 : ulong seq1;
44 : ulong off0;
45 : ulong off1;
46 : ulong sz0;
47 : ulong sz1;
48 : };
49 :
50 : typedef struct wb_ring wb_ring_t;
51 :
52 : /* wb_ring_init initializes *ring. seq is the logical address of the
53 : next append byte. max is the capacity of the ring. */
54 :
55 : static inline wb_ring_t *
56 : wb_ring_init( wb_ring_t * ring,
57 : ulong seq,
58 0 : ulong max ) {
59 0 : if( FD_UNLIKELY( !max ) ) {
60 0 : FD_LOG_WARNING(( "zero max" ));
61 0 : return NULL;
62 0 : }
63 0 : *ring = (wb_ring_t) {
64 0 : .max = max,
65 0 : .seq0 = seq,
66 0 : .seq1 = seq,
67 0 : .off0 = 0UL,
68 0 : .off1 = 0UL,
69 0 : .sz0 = 0UL,
70 0 : .sz1 = 0UL
71 0 : };
72 0 : return ring;
73 0 : }
74 :
75 : /* wb_ring_alloc appends sz bytes to wb_ring (evicting low bytes as
76 : necessary). */
77 :
78 : static inline void
79 : wb_ring_alloc( wb_ring_t * wb,
80 0 : ulong const r_sz ) {
81 0 : ulong off0 = wb->off0;
82 0 : ulong off1 = wb->off1;
83 0 : ulong seq0 = wb->seq0;
84 0 : ulong seq1 = wb->seq1;
85 0 : ulong sz0 = wb->sz0;
86 0 : ulong sz1 = wb->sz1;
87 0 : ulong const max = wb->max;
88 0 : FD_CRIT( r_sz<=max, "request too large" );
89 :
90 : /* append to part1? */
91 0 : ulong r_off0 = wb->off1 + wb->sz1;
92 0 : ulong r_off1 = r_off0 + r_sz;
93 0 : if( FD_LIKELY( r_off1<=max ) ) {
94 : /* can extend part1, handle overlap with part0 */
95 0 : FD_CRIT( r_off1>=off0, "gap in middle" );
96 0 : ulong overlap = fd_ulong_min( r_off1 - off0, sz0 );
97 0 : wb->off0 = fd_ulong_max( off0+overlap, off1+sz1+r_sz );
98 0 : wb->seq0 = seq0 + overlap;
99 0 : wb->sz0 = sz0 - overlap;
100 0 : wb->sz1 = sz1 + r_sz;
101 0 : return;
102 0 : }
103 :
104 : /* need to wrap around. move part1 to part0. remove part1 */
105 0 : seq0 = seq1;
106 0 : seq1 += sz1;
107 0 : sz0 = sz1;
108 0 : off0 = off1;
109 0 : sz1 = 0UL;
110 0 : off1 = 0UL;
111 :
112 : /* left-extend part0 to offset 0 */
113 0 : seq0 -= off0;
114 0 : sz0 += off0;
115 0 : off0 = 0UL;
116 :
117 : /* allocate seq1 */
118 0 : if( r_sz>sz0 ) {
119 : /* destroy part0 if it would cause a gap in the middle, or
120 : completely overlap it */
121 0 : off0 = sz1;
122 0 : seq0 = seq1;
123 0 : sz0 = 0UL;
124 0 : } else {
125 : /* handle partial overlap of our new part1 with part0 */
126 0 : seq0 += r_sz;
127 0 : sz0 -= r_sz;
128 0 : off0 += r_sz;
129 0 : sz1 += r_sz;
130 0 : }
131 :
132 0 : wb->seq0 = seq0;
133 0 : wb->seq1 = seq1;
134 0 : wb->off0 = off0;
135 0 : wb->off1 = off1;
136 0 : wb->sz0 = sz0;
137 0 : wb->sz1 = sz1;
138 : /* off0 and off1 don't overlap check */
139 0 : FD_CRIT( ( off0>=off1+sz1 ) | ( off1>=off0+sz0 ), "wb_ring internal error" );
140 0 : }
141 :
142 : /* wb_ring_alloc_seq0 simulates an allocation of sz bytes. Returns the
143 : logical address of the oldest available byte after the allocation is
144 : done. (This method is useful to determine how much old data an
145 : allocation would evict.) */
146 :
147 : FD_FN_PURE static inline ulong
148 : wb_ring_alloc_seq0( wb_ring_t const * wb,
149 0 : ulong const sz ) {
150 0 : wb_ring_t shadow = *wb;
151 0 : wb_ring_alloc( &shadow, sz );
152 0 : return shadow.seq0;
153 0 : }
154 :
155 : /* wb_ring_trim removes most recently written bytes up to seq_hi. This
156 : is useful for callers that over-allocate because they don't know the
157 : exact required chunk size at the time of allocation, e.g. when
158 : serializing. But it is also useful to revert multiple allocations. */
159 :
160 : static inline void
161 : wb_ring_trim( wb_ring_t * wb,
162 0 : ulong seq_hi ) {
163 0 : ulong seq0 = wb->seq0;
164 0 : ulong seq1 = wb->seq1;
165 0 : ulong off0 = wb->off0;
166 0 : ulong sz0 = wb->sz0;
167 0 : ulong sz1 = wb->sz1;
168 :
169 0 : if( FD_UNLIKELY( fd_vinyl_seq_le( seq_hi, seq0 ) ) ) {
170 : /* destroy both regions */
171 0 : wb->seq0 = seq_hi;
172 0 : wb->seq1 = seq_hi;
173 0 : wb->off0 = 0UL;
174 0 : wb->off1 = 0UL;
175 0 : wb->sz0 = 0UL;
176 0 : wb->sz1 = 0UL;
177 0 : return;
178 0 : }
179 :
180 0 : if( FD_UNLIKELY( fd_vinyl_seq_le( seq_hi, seq1 ) ) ) {
181 : /* destroy part1, extend part0, and swap */
182 0 : seq0 -= off0;
183 0 : sz0 += off0;
184 0 : off0 = 0UL;
185 0 : sz0 = seq_hi - seq0;
186 0 : wb->seq0 = seq0;
187 0 : wb->seq1 = seq0;
188 0 : wb->off0 = sz0;
189 0 : wb->off1 = off0;
190 0 : wb->sz0 = 0UL;
191 0 : wb->sz1 = sz0;
192 0 : return;
193 0 : }
194 :
195 : /* trim part1, extend part0 backwards (to avoid middle gap) */
196 0 : ulong trim = (seq1+sz1) - seq_hi;
197 0 : sz1 -= trim;
198 0 : off0 -= trim;
199 0 : seq0 -= trim;
200 0 : sz0 += trim;
201 0 : wb->seq0 = seq0;
202 0 : wb->sz0 = sz0;
203 0 : wb->sz1 = sz1;
204 0 : wb->off0 = off0;
205 : /* if part0 is empty, keep it at seq1 */
206 0 : if( sz0==trim ) {
207 0 : wb->seq0 = wb->seq1;
208 0 : wb->off0 = wb->off1 + wb->sz1;
209 0 : wb->sz0 = 0UL;
210 0 : }
211 0 : }
212 :
213 : /* wb_ring_seq_to_off returns the byte offset for the given seq. May
214 : return out-of-bounds offsets for seq that are not in bounds. */
215 :
216 : static inline ulong
217 : wb_ring_seq_to_off( wb_ring_t const * wb,
218 0 : ulong seq ) {
219 0 : ulong const seq0 = wb->seq0;
220 0 : ulong const seq1 = wb->seq1;
221 0 : ulong const off0 = wb->off0;
222 0 : ulong const off1 = wb->off1;
223 0 : ulong const sz1 = wb->sz1;
224 :
225 0 : FD_CRIT( seq>=seq0, "seq out of bounds" );
226 0 : FD_CRIT( seq< seq1+sz1, "seq out of bounds" );
227 :
228 0 : if( fd_vinyl_seq_lt( seq, seq1 ) ) {
229 0 : return off0 + ( seq-seq0 );
230 0 : } else {
231 0 : return off1 + ( seq-seq1 );
232 0 : }
233 0 : }
234 :
235 : /* [wb_ring_seq0,wb_ring_seq1) is the logical range tracked by a
236 : wb_ring. */
237 :
238 : static inline ulong
239 0 : wb_ring_seq0( wb_ring_t const * wb ) {
240 0 : return wb->seq0;
241 0 : }
242 :
243 : static inline ulong
244 0 : wb_ring_seq1( wb_ring_t const * wb ) {
245 0 : return wb->seq1 + wb->sz1;
246 0 : }
247 :
248 : /* wb_ring_translate translates a seq range to a pair of physical
249 : ranges. */
250 :
251 : struct wb_ring_span {
252 : ulong off0;
253 : ulong sz0;
254 : ulong off1;
255 : ulong sz1;
256 : };
257 :
258 : typedef struct wb_ring_span wb_ring_span_t;
259 :
260 : static inline wb_ring_span_t
261 : wb_ring_translate( wb_ring_t const * wb,
262 : ulong seq0,
263 0 : ulong sz ) {
264 0 : ulong const src_off0 = wb_ring_seq_to_off( wb, seq0 );
265 0 : if( FD_LIKELY( fd_vinyl_seq_ge( seq0, wb->seq1 ) ) ) {
266 : /* entire read is served by part1 of wb cache */
267 0 : FD_CRIT( src_off0+sz<=wb->max, "invariant violation" );
268 0 : return (wb_ring_span_t) { .off0=src_off0, .sz0=sz };
269 0 : } else {
270 : /* slow path */
271 0 : ulong lsz = fd_ulong_min( wb->seq1 - seq0, sz );
272 0 : FD_CRIT( lsz<=sz, "invariant violation" );
273 0 : FD_CRIT( lsz<=wb->sz0, "invariant violation" );
274 0 : wb_ring_span_t span = { .off0 = src_off0, .sz0=lsz };
275 0 : if( lsz<sz ) {
276 0 : ulong src_off1 = wb_ring_seq_to_off( wb, seq0+lsz );
277 0 : FD_CRIT( (sz-lsz)<=wb->sz1, "invariant violation" );
278 0 : span.off1 = src_off1;
279 0 : span.sz1 = sz-lsz;
280 0 : }
281 0 : return span;
282 0 : }
283 0 : }
284 :
285 : #endif /* HEADER_fd_src_vinyl_io_fd_vinyl_io_ur_wb_ring_h */
|