Line data Source code
1 : #include <lz4.h>
2 : #include "fd_vinyl.h"
3 :
4 : void
5 : fd_vinyl_compact( fd_vinyl_t * vinyl,
6 0 : ulong compact_max ) {
7 :
8 0 : fd_vinyl_io_t * io = vinyl->io;
9 0 : ulong gc_thresh = vinyl->gc_thresh;
10 0 : int gc_eager = vinyl->gc_eager;
11 0 : int style = vinyl->style;
12 :
13 0 : ulong io_seed = fd_vinyl_io_seed ( io ); (void)io_seed;
14 0 : ulong seq_past = fd_vinyl_io_seq_past ( io );
15 0 : ulong seq_present = fd_vinyl_io_seq_present( io );
16 :
17 0 : if( FD_UNLIKELY( (!compact_max) | ((seq_present-seq_past)<=gc_thresh) | (gc_eager<0) ) ) return;
18 :
19 0 : fd_vinyl_meta_t * meta = vinyl->meta;
20 0 : fd_vinyl_line_t * line = vinyl->line;
21 0 : ulong line_cnt = vinyl->line_cnt;
22 0 : ulong garbage_sz = vinyl->garbage_sz;
23 :
24 0 : fd_vinyl_meta_ele_t * ele0 = meta->ele;
25 0 : ulong ele_max = meta->ele_max;
26 0 : ulong meta_seed = meta->seed;
27 :
28 0 : fd_vinyl_data_t * data = vinyl->data;
29 :
30 0 : fd_vinyl_data_vol_t * vol = data->vol; (void)vol;
31 0 : ulong vol_cnt = data->vol_cnt; (void)vol_cnt;
32 :
33 0 : ulong seq = seq_past;
34 :
35 0 : for( ulong rem=compact_max; rem; rem-- ) {
36 :
37 : /* At this point, we've compacted [seq_past,seq) (cyclic), with
38 : items still needed in this range at [seq_present,seq_future)
39 : (cyclic). We still have [seq,seq_present) (cyclic), containing
40 : garbage_sz bytes to compact.
41 :
42 : If the new past region is small enough or there is a relatively
43 : small amount of garbage in this region, we consider the bstream's
44 : past fully compacted. */
45 :
46 0 : ulong past_sz_new = fd_vinyl_io_seq_future( io ) - seq;
47 0 : if( FD_UNLIKELY( (past_sz_new <= gc_thresh ) |
48 0 : (garbage_sz <= (past_sz_new >> gc_eager)) |
49 0 : (fd_vinyl_seq_ge( seq, seq_present ) ) ) ) {
50 0 : FD_CRIT( fd_vinyl_seq_le( seq, seq_present ), "corruption detected" );
51 0 : if( FD_UNLIKELY( fd_vinyl_seq_eq( seq, seq_present ) ) ) FD_CRIT( !garbage_sz, "corruption detected" );
52 0 : break;
53 0 : }
54 :
55 : /* At this point, there is enough garbage to do some more
56 : compaction. Load the leading block of the object at seq and
57 : determine if this object is needed to recover the bstream's state
58 : at seq_present.
59 :
60 : That is, we determine if the object at bstream_past_new is the
61 : version of a pair that exists at bstream seq_present. If so, we
62 : append a copy to the bstream's present.
63 :
64 : When compacting is complete, we forget the region containing the
65 : copy at seq. This then effectively moves the copy from seq to
66 : seq_future without any risk of losing data while allowing
67 : compaction to be done with large amounts of async I/O overlapped
68 : with compaction processing (metadata lookups, hash validation,
69 : etc).
70 :
71 : This move will not move the pair past any conflicting operations
72 : later in the bstream's past (almost definitionally so as the pair
73 : is the most recent version). Thus set of pairs recovered at
74 : seq_future will be identical to the set of pairs recovered at
75 : seq_present. */
76 :
77 0 : fd_vinyl_bstream_block_t block[1];
78 :
79 0 : fd_vinyl_io_read_imm( io, seq, block, FD_VINYL_BSTREAM_BLOCK_SZ );
80 :
81 0 : ulong ctl = block->ctl;
82 :
83 0 : int type = fd_vinyl_bstream_ctl_type( ctl );
84 :
85 0 : switch( type ) {
86 :
87 0 : case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
88 :
89 : /* At this point, we've read a pair's leading block into block.
90 : Validate the pair was completely written. It's okay if we are
91 : in a move (move block processing the previous iteration already
92 : confirmed this pair is the proper). */
93 :
94 0 : int pair_style = fd_vinyl_bstream_ctl_style( ctl );
95 0 : ulong pair_val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
96 0 : fd_vinyl_key_t const * pair_key = &block->phdr.key;
97 0 : ulong pair_val_sz = (ulong)block->phdr.info.val_sz;
98 :
99 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( pair_val_esz );
100 :
101 0 : int truncated = (pair_sz > (seq_present - seq)); /* Wrapping safe */
102 0 : int bad_esz = (pair_val_esz > FD_VINYL_VAL_MAX);
103 0 : int bad_sz = (pair_val_sz > FD_VINYL_VAL_MAX);
104 :
105 0 : FD_CRIT( !(truncated | bad_esz | bad_sz), truncated ? "truncated pair" :
106 0 : bad_esz ? "unexpected pair value encoded size" :
107 0 : "pair value size too large" );
108 :
109 : # if FD_PARANOID
110 : fd_vinyl_bstream_block_t _ftr[1];
111 : fd_vinyl_bstream_block_t * ftr = _ftr;
112 :
113 : if( FD_UNLIKELY( pair_sz <= FD_VINYL_BSTREAM_BLOCK_SZ ) ) ftr = block;
114 : else fd_vinyl_io_read_imm( io, seq + pair_sz - FD_VINYL_BSTREAM_BLOCK_SZ, ftr, FD_VINYL_BSTREAM_BLOCK_SZ );
115 :
116 : FD_ALERT( !fd_vinyl_bstream_pair_test_fast( io_seed, seq, block, ftr ), "corruption detected" );
117 : # endif
118 :
119 : /* At this point, we appear to have a valid pair. Query the
120 : vinyl's meta to determine if this is the version of the pair at
121 : bstream seq_present. Since this implementation is doing single
122 : threaded recovery, we can use the single threaded optimized
123 : meta APIs. */
124 :
125 0 : ulong pair_memo = fd_vinyl_key_memo( meta_seed, pair_key );
126 :
127 0 : ulong _ele_idx; /* avoid pointer escape */
128 0 : int err = fd_vinyl_meta_query_fast( ele0, ele_max, pair_key, pair_memo, &_ele_idx );
129 0 : ulong ele_idx = _ele_idx;
130 :
131 0 : if( FD_LIKELY( !err ) ) {
132 :
133 : /* At this point, a version of pair key is mapped */
134 :
135 0 : if( FD_LIKELY( fd_vinyl_meta_ele_in_bstream( &ele0[ ele_idx ] ) ) ) {
136 :
137 : /* At this point, a version of pair key exists at bstream
138 : seq_present (i.e. is not in the process of being created by
139 : a client). */
140 :
141 0 : ulong pair_seq = ele0[ ele_idx ].seq;
142 :
143 0 : if( FD_LIKELY( fd_vinyl_seq_eq( pair_seq, seq ) ) ) {
144 :
145 : /* At this point, the version of pair key at seq is the
146 : version of pair key that exists at bstream seq_present.
147 : Validate the metadata. */
148 :
149 0 : FD_CRIT( !memcmp( &ele0[ ele_idx ].phdr, &block->phdr, sizeof(fd_vinyl_bstream_phdr_t) ), "corruption detected" );
150 :
151 : /* If the pair is cached and not acquired for modify, append
152 : the cached copy in the target style. Otherwise, append a
153 : (possibly recoded) copy from the bstream. */
154 :
155 0 : int pair_style_new;
156 0 : ulong pair_val_esz_new;
157 0 : ulong pair_seq_new;
158 :
159 0 : int do_copy = 1;
160 :
161 0 : ulong line_idx = ele0[ ele_idx ].line_idx;
162 :
163 0 : if( FD_LIKELY( line_idx!=ULONG_MAX ) ) { /* Pair is in cache */
164 :
165 0 : FD_CRIT( line_idx<line_cnt, "corruption detected" );
166 0 : FD_CRIT( line[ line_idx ].ele_idx==ele_idx, "corruption detected" );
167 :
168 0 : fd_vinyl_data_obj_t * obj = line[ line_idx ].obj;
169 :
170 0 : FD_ALERT( fd_vinyl_data_is_valid_obj( obj, vol, vol_cnt ), "corruption detected" );
171 0 : FD_CRIT ( obj->line_idx==line_idx, "corruption detected" );
172 0 : FD_CRIT ( !obj->rd_active, "corruption detected" );
173 :
174 0 : ulong line_ctl = line[ line_idx ].ctl;
175 :
176 0 : if( FD_LIKELY( fd_vinyl_line_ctl_ref( line_ctl )>=0L ) ) { /* Pair cached and not acquired for modify */
177 :
178 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
179 :
180 0 : FD_ALERT( !memcmp( phdr, &block->phdr, sizeof(fd_vinyl_bstream_phdr_t) ), "corruption detected" );
181 :
182 0 : pair_seq_new = fd_vinyl_io_append_pair_inplace( io, style, phdr, &pair_style_new, &pair_val_esz_new );
183 :
184 0 : do_copy = 0;
185 :
186 0 : }
187 :
188 0 : }
189 :
190 0 : if( do_copy ) { /* Pair is either in cache or acquired for modify, append from the bstream */
191 :
192 0 : if( FD_LIKELY( (pair_style!=FD_VINYL_BSTREAM_CTL_STYLE_RAW) |
193 0 : (style ==FD_VINYL_BSTREAM_CTL_STYLE_RAW) |
194 0 : (pair_sz ==FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
195 :
196 : /* At this point, the pair is already stored in an
197 : encoded format, the preferred format for storing
198 : encoded pairs is raw and/or encoding the pair will
199 : not make it any smaller in the bstream. Copy the
200 : pair as is from seq to seq_future. The reason we
201 : don't reencode the pair in the second case is that
202 : this pair has likely not been touched since it last
203 : got to the bstream's seq_past. It would be waste to
204 : compute and bstream storage to uncompress it as we
205 : copy it. */
206 :
207 0 : pair_style_new = pair_style;
208 0 : pair_val_esz_new = fd_vinyl_bstream_ctl_sz( ele0[ ele_idx ].phdr.ctl );
209 0 : pair_seq_new = fd_vinyl_io_copy( io, pair_seq, pair_sz );
210 :
211 0 : } else {
212 :
213 : /* At this point, the pair is stored in a raw encoded
214 : format, the preferred format is an encoded format and
215 : there is a possibility that encoding it will make it
216 : smaller. Encode the pair as we copy it from seq to
217 : seq_future.
218 :
219 : To do this, we allocate enough scratch from the io
220 : append spad to cover the worst case encoded pair and
221 : the raw pair (this sets the lower bound on how large
222 : the io append spad must be). Then we read the raw
223 : pair into the trailing part of the scratch and encode
224 : from that into the leading part of the scratch.
225 :
226 : We play some games with the spad_used so that the
227 : append_pair_inplace will not invalidate the read and
228 : so that we use scratch as efficiently as possible
229 : when there is lots of stuff to compress. */
230 :
231 0 : ulong cpair_max = fd_vinyl_bstream_pair_sz( (ulong)LZ4_COMPRESSBOUND( (int)pair_val_sz ) );
232 0 : ulong scratch_max = cpair_max + pair_sz;
233 :
234 0 : fd_vinyl_bstream_phdr_t * cphdr = (fd_vinyl_bstream_phdr_t *)
235 0 : fd_vinyl_io_alloc( io, scratch_max, FD_VINYL_IO_FLAG_BLOCKING );
236 :
237 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)((ulong)cphdr + cpair_max);
238 :
239 0 : fd_vinyl_io_read_imm( io, seq, phdr, pair_sz );
240 :
241 0 : fd_vinyl_io_trim( io, scratch_max );
242 :
243 0 : pair_seq_new = fd_vinyl_io_append_pair_inplace( io, style, phdr, &pair_style_new, &pair_val_esz_new );
244 :
245 : /* At this point, we either are appending the encoded
246 : pair from the leading part of the scratch and
247 : spad_used is correct or we are appending the pair
248 : from the trailing part and spad_used does not include
249 : it. Adjust the spad used for the later case. In
250 : this second case, we end up with a temporary hole in
251 : the scratch when we decided not to copy into an
252 : encoded form. This just scratch is used less
253 : efficiently in the unlikely case in order to use it
254 : more efficiently in the likely case (the correct
255 : tradeoff). */
256 :
257 0 : if( FD_UNLIKELY( pair_style_new==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) io->spad_used += scratch_max;
258 :
259 0 : }
260 0 : }
261 :
262 : /* Note: we don't need to prepare here because we aren't
263 : modifying shared fields. */
264 :
265 0 : ele0[ ele_idx ].phdr.ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, pair_style_new, pair_val_esz_new );
266 0 : ele0[ ele_idx ].seq = pair_seq_new;
267 :
268 0 : } else {
269 :
270 : /* The version of the pair at bstream seq was replaced. The
271 : most recent version of this pair is at pair_seq. */
272 :
273 0 : FD_CRIT( fd_vinyl_seq_gt( pair_seq, seq ), "corruption detected" );
274 :
275 0 : garbage_sz -= pair_sz;
276 :
277 0 : }
278 :
279 0 : } else {
280 :
281 : /* The pair at bstream seq does not exist in the bstream at
282 : bstream seq_present. It is in the vinyl meta because it is
283 : being created. We wouldn't be in the process of creating
284 : it unless this pair (or a subsequent version of it) was
285 : erased or moved before seq_present. So this pair is
286 : garbage. */
287 :
288 0 : garbage_sz -= pair_sz;
289 :
290 0 : }
291 :
292 0 : } else {
293 :
294 : /* The pair at bstream seq does not exist in the bstream at
295 : bstream seq_present. This pair (or a subsequent version of
296 : it) was erased or moved before seq_present. So this pair
297 : is garbage. */
298 :
299 0 : garbage_sz -= pair_sz;
300 :
301 0 : }
302 :
303 0 : seq += pair_sz;
304 0 : break;
305 :
306 0 : }
307 :
308 0 : case FD_VINYL_BSTREAM_CTL_TYPE_DEAD:
309 0 : case FD_VINYL_BSTREAM_CTL_TYPE_MOVE:
310 0 : case FD_VINYL_BSTREAM_CTL_TYPE_PART: {
311 :
312 : /* DEAD blocks can always be compacted out because the version of
313 : the pair they reference is not in the current view of the
314 : bstream (because that version was unmapped when the DEAD was
315 : written), that version was located at an earlier location than
316 : the DEAD (because blocks are appended sequentially) and thus
317 : that version has already been compacted out (because a previous
318 : iteration of this would have encountered it before getting this
319 : DEAD block, would have detecting that version was no longer
320 : needed and compacted it at that time instead of moving it to a
321 : higher sequence number).
322 :
323 : MOVE blocks can always be compacted out for the same reasons as
324 : the above with the twist that, compacting the move block makes
325 : the pair following look like a create from the point of view of
326 : a recovery starting at the pair. This is immaterial though
327 : because doesn't change the recovered view if recovery starts
328 : on the block after the move.
329 :
330 : PART blocks can always be compacted because they are just
331 : informational (to help partition the bstream past in parallel
332 : recovery) and this partition ends bstream blocks that have
333 : already been compacted out.
334 :
335 : We validate the block because we already have the data anyway. */
336 :
337 0 : FD_ALERT( !fd_vinyl_bstream_block_test( io_seed, block ), "corruption detected" );
338 :
339 0 : garbage_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
340 0 : seq += FD_VINYL_BSTREAM_BLOCK_SZ;
341 0 : break;
342 :
343 0 : }
344 :
345 0 : case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
346 :
347 : /* ZPAD blocks can always be compacted out because they are no-ops
348 : from the point of view of bstream processing (the underlying
349 : I/O layer can insert these so that, for example, a multi-block
350 : pair is never split across two different physical volumes).
351 : Note that zpad blocks aren't included in garbage_sz because we
352 : don't control when they get created (and thus can't easily
353 : update garbage_sz to account for them when they are created). */
354 :
355 0 : FD_ALERT( !fd_vinyl_bstream_zpad_test( io_seed, seq, block ), "corruption detected" );
356 :
357 0 : seq += FD_VINYL_BSTREAM_BLOCK_SZ;
358 0 : break;
359 :
360 0 : }
361 :
362 0 : default: FD_LOG_CRIT(( "%016lx: unknown type (%x)", seq, (uint)type ));
363 :
364 0 : }
365 :
366 0 : }
367 :
368 : /* At this point, we've made copies of all info in [seq_past,seq)
369 : (cyclic) to [seq_present,seq_future) (cyclic) needed to recover the
370 : bstream's state at seq_present. We commit the new, forget the old
371 : and update the garbage size to finish this compaction. */
372 :
373 0 : fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
374 0 : fd_vinyl_io_forget( io, seq );
375 :
376 0 : vinyl->garbage_sz = garbage_sz;
377 0 : }
|