#include "fd_vinyl.h"
#include "fd_vinyl_recover_serial.c"

#define PEEK( seq ) ((fd_vinyl_bstream_block_t *)(mmio + ((seq) % mmio_sz)))
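
/* Note: PEEK assumes locals mmio and mmio_sz are in scope at the
   expansion site.  Since the window is addressed modulo mmio_sz, a
   sequence number and that sequence number plus any multiple of
   mmio_sz alias the same block image.  A minimal sketch (illustrative,
   not part of the build):

     uchar * mmio    = (uchar *)fd_vinyl_mmio   ( io );
     ulong   mmio_sz =          fd_vinyl_mmio_sz( io );
     FD_TEST( PEEK( seq )==PEEK( seq + mmio_sz ) );  // same block image
*/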

/* fd_vinyl_recover_test tests if parallel recovery is possible. */

static int
fd_vinyl_recover_test( fd_vinyl_io_t * io ) {

  uchar * mmio = (uchar *)fd_vinyl_mmio( io );

  if( FD_UNLIKELY( !mmio ) ) {
    FD_LOG_NOTICE(( "bstream io interface type does not support parallel memory mapped io"
                    "\n\tfalling back to serial recovery" ));
    return FD_VINYL_ERR_INVAL;
  }

  ulong mmio_sz = fd_vinyl_mmio_sz( io );

  ulong seq_past    = fd_vinyl_io_seq_past   ( io );
  ulong seq_present = fd_vinyl_io_seq_present( io );
  ulong io_seed     = fd_vinyl_io_seed       ( io );

  //ulong tstone_req = 0UL;

  ulong seq1 = seq_present;
  while( fd_vinyl_seq_gt( seq1, seq_past ) ) {

    /* At this point, we've tested [seq1,seq_present) is suitable for
       parallel recovery.  Peek at the block just before seq1.  If it
       is not a valid partition block, we can't do parallel recovery. */

    ulong part_seq = seq1 - FD_VINYL_BSTREAM_BLOCK_SZ;

    fd_vinyl_bstream_block_t block[1];

    block[0] = *PEEK( part_seq );

    char const * _err = fd_vinyl_bstream_part_test( io_seed, part_seq, block ); /* testing changes the block */
    if( FD_UNLIKELY( _err ) ) {
      FD_LOG_WARNING(( "bstream past does not have a valid partitioning"
                       "\n\tseq %016lx: %s"
                       "\n\tprevious bstream writers probably did not terminate cleanly"
                       "\n\tfalling back to serial recovery", part_seq, _err ));
      return FD_VINYL_ERR_CORRUPT;
    }

    /* We got a valid partition block.  Determine the start of this
       partition. */

    ulong seq0 = block->part.seq0;
    seq0 = fd_vinyl_seq_gt( seq0, seq_past ) ? seq0 : seq_past;

# if 0
    /* Compute the maximum number of deads that the portion of this
       partition in the bstream's past could produce as the lesser of
       the number of deads reported in the partition and the number of
       blocks in the partition.  Similarly for moves (note that each
       move makes two tombstones but also requires at least two blocks
       ... so moves also make, at most, 1 tombstone per block on
       average). */

    ulong part_sz = seq1 - seq0 - FD_VINYL_BSTREAM_BLOCK_SZ; /* exclude trailing part block for below */

    ulong dead_max = fd_ulong_min( block->part.dead_cnt, part_sz );
    ulong move_max = fd_ulong_min( block->part.move_cnt, part_sz );

    tstone_req += fd_ulong_min( dead_max + 2UL*move_max, part_sz );
# endif

    /* Move to the previous partition */

    seq1 = seq0;
  }

  /* We seem to have a valid partitioning for parallel recovery */

# if 0
  if( FD_UNLIKELY( tstone_req > tstone_max ) ) {
    FD_LOG_WARNING(( "insufficient scratch space for parallel recovery"
                     "\n\tincrease data cache size"
                     "\n\tfalling back to serial recovery" ));
    return FD_VINYL_ERR_FULL;
  }
# endif

  return FD_VINYL_SUCCESS;
}
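
/* A minimal caller sketch (illustrative, mirroring the fallback logic
   in fd_vinyl_recover below): probe for parallel recoverability and
   fall back to the serial path on any error.

     if( fd_vinyl_recover_test( vinyl->io ) ) return fd_vinyl_recover_serial( vinyl );
*/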

/* fd_vinyl_recover_line_task flushes all vinyl lines in parallel and
   resets the eviction priority sequence. */

static FD_FOR_ALL_BEGIN( fd_vinyl_recover_line_task, 1L ) {
  fd_vinyl_t * vinyl = (fd_vinyl_t *)arg[0];

  fd_vinyl_line_t * line     = vinyl->line;
  ulong             line_cnt = vinyl->line_cnt;

  ulong line0 = (ulong)block_i0;
  ulong line1 = (ulong)block_i1;

  for( ulong line_idx=line0; line_idx<line1; line_idx++ ) {
    line[ line_idx ].obj            = NULL;
    line[ line_idx ].ele_idx        = ULONG_MAX;
    line[ line_idx ].ctl            = fd_vinyl_line_ctl( 0UL, 0L );
    line[ line_idx ].line_idx_older = (uint)fd_ulong_if( line_idx!=0UL,          line_idx-1UL, line_cnt-1UL );
    line[ line_idx ].line_idx_newer = (uint)fd_ulong_if( line_idx!=line_cnt-1UL, line_idx+1UL, 0UL          );
  }

} FD_FOR_ALL_END
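
/* For example (illustrative), with line_cnt 4, the loop above links
   the lines into a circular eviction ring:

     line_idx  line_idx_older  line_idx_newer
            0               3               1
            1               0               2
            2               1               3
            3               2               0 */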

/* fd_vinyl_recover_reclaim_task locks all the meta locks in parallel,
   reclaiming any that were locked by presumably dead writers that
   terminated uncleanly.  Returns the number of locks reclaimed. */

static FD_MAP_REDUCE_BEGIN( fd_vinyl_recover_reclaim_task, 1L, alignof(ulong), sizeof(ulong), 1UL ) {
  ulong *      _reclaim_cnt = (ulong *)     arg[0];
  fd_vinyl_t * vinyl        = (fd_vinyl_t *)arg[1];

  ulong * lock = vinyl->meta->lock;

  ulong reclaim_cnt = 0UL;

  for( long lock_idx=block_i0; lock_idx<block_i1; lock_idx++ ) {
# if FD_HAS_ATOMIC
    ulong l = FD_ATOMIC_FETCH_AND_OR( &lock[ lock_idx ], 1UL );
# else
    ulong l = lock[ lock_idx ];
    lock[ lock_idx ] |= 1UL;
# endif
    reclaim_cnt += l & 1UL;
  }

  *_reclaim_cnt = reclaim_cnt;

} FD_MAP_END {

  *(ulong *)arg[0] += *(ulong const *)_r1;

} FD_REDUCE_END
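
/* The above assumes the usual even/odd lock encoding: an even value is
   unlocked and an odd value is locked, with the higher bits acting as
   a version counter.  FETCH_AND_OR returns the previous value, so an
   already odd lock (presumably held by a dead writer) is counted as
   reclaimed while being (re)acquired.  After this task every lock is
   odd; fd_vinyl_recover_unlock_task's increment then makes each lock
   even again at a new version, which the seqlock style reads in
   fd_vinyl_recover_meta_cleanup_task below rely on. */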

/* fd_vinyl_recover_meta_flush_task clears the meta element storage in
   parallel.  Assumes the meta is fully locked. */

static FD_FOR_ALL_BEGIN( fd_vinyl_recover_meta_flush_task, 1L ) {
  fd_vinyl_t * vinyl = (fd_vinyl_t *)arg[0];

  fd_vinyl_meta_ele_t * ele0 = vinyl->meta->ele;

  fd_vinyl_meta_ele_t init_ele[1];
  memset( init_ele, 0, sizeof(fd_vinyl_meta_ele_t) );
  init_ele->line_idx = ULONG_MAX;

  for( long ele_idx=block_i0; ele_idx<block_i1; ele_idx++ ) ele0[ ele_idx ] = init_ele[0];

} FD_FOR_ALL_END

/* fd_vinyl_recover_unlock_task unlocks all the meta locks in parallel.
   Assumes the meta is fully locked. */

static FD_FOR_ALL_BEGIN( fd_vinyl_recover_unlock_task, 1L ) {
  fd_vinyl_t * vinyl = (fd_vinyl_t *)arg[0];

  ulong * lock = vinyl->meta->lock;

  for( long lock_idx=block_i0; lock_idx<block_i1; lock_idx++ ) lock[ lock_idx ]++;

} FD_FOR_ALL_END

/* fd_vinyl_recover_tstone inserts a tstone for key at seq in the meta
   if there isn't anything beyond seq for key already.  Returns
   FD_VINYL_SUCCESS on success and an FD_VINYL_ERR code on failure.
   This will update the pair_cnt, garbage_sz and tstone_cnt counters
   appropriately. */

static int
fd_vinyl_recover_tstone( fd_vinyl_meta_t *      meta,
                         fd_vinyl_key_t const * key,
                         ulong                  seq,
                         ulong *                _pair_cnt,
                         ulong *                _garbage_sz,
                         ulong *                _tstone_cnt ) {

  /* Query meta for key */

  fd_vinyl_meta_query_t query[1];

  fd_vinyl_meta_prepare( meta, key, NULL, query, FD_MAP_FLAG_BLOCKING );

  fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_query_ele( query );

  if( FD_UNLIKELY( !ele ) ) {
    FD_LOG_NOTICE(( "%016lx: meta cache too small for parallel recovery (or corruption)", seq ));
    return FD_VINYL_ERR_FULL;
  }

  if( FD_LIKELY( !ele->phdr.ctl ) ) {

    /* There is no version or tstone for pair key in the meta currently.
       Insert a tstone at seq for key so any versions or tstones for
       pair key encountered later in parallel recovery can tell if they
       are before or after this tstone.  Because we don't know if there
       will be a version of key after this, we need to append key to
       the tstone array. */

    //pair_cnt unchanged
    //garbage_sz unchanged
    (*_tstone_cnt)++;

    ele->memo     = fd_vinyl_meta_query_memo( query );
    ele->phdr.ctl = 1UL;
    ele->phdr.key = *key;
    //ele->phdr.info = d/c
    ele->line_idx = ULONG_MAX - 1UL; // tstone
    ele->seq      = seq;

    fd_vinyl_meta_publish( query );

  } else if( FD_LIKELY( fd_vinyl_seq_lt( ele->seq, seq ) ) ) {

    /* The version (or tstone) for pair key in the meta is older than
       seq.  We append key to the tstone array if we haven't already. */

    int old_ele_is_pair = (ele->line_idx==ULONG_MAX);

    (*_pair_cnt)   -= (ulong)old_ele_is_pair;
    (*_garbage_sz) += old_ele_is_pair ? fd_vinyl_bstream_pair_sz( fd_vinyl_bstream_ctl_sz( ele->phdr.ctl ) ) : 0UL;
    (*_tstone_cnt) += (ulong)old_ele_is_pair;

    //ele->memo     = already init
    //ele->phdr.ctl = already init
    //ele->phdr.key = already init
    //ele->phdr.info = d/c
    ele->line_idx = ULONG_MAX - 1UL; // tstone
    ele->seq      = seq;

    fd_vinyl_meta_publish( query );

  } else {

    /* The meta entry (pair or tstone) for pair key in the meta is at
       least as new as seq.  We can skip this tstone (an entry exactly
       at seq indicates corruption). */

    //pair_cnt unchanged
    //garbage_sz unchanged
    //tstone_cnt unchanged

    int corrupt = fd_vinyl_seq_eq( ele->seq, seq );

    fd_vinyl_meta_cancel( query );

    if( FD_UNLIKELY( corrupt ) ) {
      FD_LOG_WARNING(( "%016lx: probable corruption detected", seq ));
      return FD_VINYL_ERR_CORRUPT;
    }

  }

  return FD_VINYL_SUCCESS;
}
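
/* A worked example (illustrative): suppose thread A processes a dead
   record for key K at seq 300 before thread B processes a pair for K
   at seq 200.  A's call here installs a tstone at 300 (tstone_cnt++);
   B's pair then loses to the newer tstone and is counted as garbage by
   B.  Had B's pair been at seq 400 instead, it would simply overwrite
   the tstone.  A second tstone for K at exactly seq 300 would be
   flagged as probable corruption. */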

/* fd_vinyl_recover_part_task dynamically assigns the partitions of the
   bstream's past to threads for recovery and then recovers them in
   parallel.  The bstream past partition iteration is nearly identical
   to the bstream past iteration in serial recovery.  See
   fd_vinyl_recover_serial.c for more details. */

/* FIXME: ADD MORE EXTENSIVE DATA INTEGRITY CHECKING LIKE SERIAL IMPL */

static FD_FN_UNUSED FD_MAP_REDUCE_BEGIN( fd_vinyl_recover_part_task, 1UL, alignof(ulong), sizeof(ulong), 4UL ) {
  ulong *      _rlocal = (ulong *)     arg[0];
  fd_vinyl_t * vinyl   = (fd_vinyl_t *)arg[1];
  ulong *      _lock   = (ulong *)     arg[2];

  fd_vinyl_io_t *   io   = vinyl->io;
  fd_vinyl_meta_t * meta = vinyl->meta;

  ulong   io_seed  =          fd_vinyl_io_seed    ( io );
  ulong   seq_past =          fd_vinyl_io_seq_past( io );
  uchar * mmio     = (uchar *)fd_vinyl_mmio       ( io );
  ulong   mmio_sz  =          fd_vinyl_mmio_sz    ( io );

  ulong fail       = 1UL;
  ulong pair_cnt   = 0UL;
  ulong garbage_sz = 0UL;
  ulong tstone_cnt = 0UL;

  for(;;) {

    /* Determine the range of the bstream past we should process next. */

    ulong seq0;
    ulong seq1;

    /* Lock and fetch the task assignment cursor */

    FD_COMPILER_MFENCE();
# if FD_HAS_ATOMIC
    while( FD_ATOMIC_CAS( _lock, 0UL, 1UL ) ) FD_SPIN_PAUSE();
# else
    *_lock = 1UL;
# endif
    FD_COMPILER_MFENCE();

    seq1 = _lock[1];

    /* At this point, the bstream range [seq_past,seq1) has not been
       assigned.  If seq1 is at seq_past, everything has been assigned
       already.  Otherwise, the block before the cursor is a valid
       partition block (as per the test above) and we claim the range:

         [ the older of part_seq0 and seq_past, seq1 )

       to process. */

    if( FD_UNLIKELY( fd_vinyl_seq_le( seq1, seq_past ) ) ) seq0 = seq_past;
    else {
      fd_vinyl_bstream_block_t const * block = PEEK( seq1 - FD_VINYL_BSTREAM_BLOCK_SZ );
      seq0 = block->part.seq0;
      if( fd_vinyl_seq_lt( seq0, seq_past ) ) seq0 = seq_past;
    }

    /* Update and unlock the task assignment cursor */

    _lock[1] = seq0;
    FD_COMPILER_MFENCE();
    _lock[0] = 0UL;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( fd_vinyl_seq_le( seq1, seq_past ) ) ) break;

    /* At this point, we need to recover the range [seq0,seq1). */

    ulong seq = seq0;
    while( fd_vinyl_seq_lt( seq, seq1 ) ) {

      fd_vinyl_bstream_block_t block[1];

      block[0] = *PEEK( seq ); /* testing is destructive */

      ulong ctl = block->ctl;

      int type = fd_vinyl_bstream_ctl_type( ctl );

      switch( type ) {

      case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {

        ulong pair_val_esz = fd_vinyl_bstream_ctl_sz( ctl );

        ulong pair_sz = fd_vinyl_bstream_pair_sz( pair_val_esz );

        if( FD_UNLIKELY( pair_sz > (seq1-seq) ) ) { /* Wrapping safe */
          FD_LOG_WARNING(( "%016lx: truncated", seq ));
          goto done;
        }

        fd_vinyl_bstream_block_t ftr[1];

        ftr[0] = *PEEK( seq + pair_sz - FD_VINYL_BSTREAM_BLOCK_SZ );

        char const * _err = fd_vinyl_bstream_pair_test_fast( io_seed, seq, block, ftr );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        /* At this point, we appear to have a valid, completely written
           pair.  Prepare the meta to do an update for this key. */

        fd_vinyl_meta_query_t query[1];

        fd_vinyl_meta_prepare( meta, &block->phdr.key, NULL, query, FD_MAP_FLAG_BLOCKING );

        fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_query_ele( query );

        if( FD_UNLIKELY( !ele ) ) {
          FD_LOG_WARNING(( "%016lx: corruption detected or meta cache too small for parallel recovery", seq ));
          goto done;
        }

        if( FD_LIKELY( (!ele->phdr.ctl) | fd_vinyl_seq_gt( seq, ele->seq ) ) ) {

          pair_cnt++;

          /* At this point, this is the first time any thread has seen
             pair key or this version of pair key is newer than the
             version (or tstone) of pair key that has been seen. */

          ele->memo     = fd_vinyl_meta_query_memo( query );
          ele->phdr     = block->phdr;
          ele->line_idx = ULONG_MAX; // pair
          ele->seq      = seq;

          fd_vinyl_meta_publish( query );

        } else {

          /* At this point, this version of pair key is older than the
             version (or tstone) for pair key seen by all threads so
             far. */

          fd_vinyl_meta_cancel( query );

          garbage_sz += pair_sz;

        }

        seq += pair_sz;
        break;
      }

      case FD_VINYL_BSTREAM_CTL_TYPE_DEAD: {

        char const * _err = fd_vinyl_bstream_dead_test( io_seed, seq, block );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        int err = fd_vinyl_recover_tstone( meta, &block->dead.phdr.key, seq, &pair_cnt, &garbage_sz, &tstone_cnt );
        if( FD_UNLIKELY( err ) ) goto done; /* logs details */

        garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
        seq        += FD_VINYL_BSTREAM_BLOCK_SZ;
        break;
      }

      case FD_VINYL_BSTREAM_CTL_TYPE_MOVE: {

        if( FD_UNLIKELY( 2UL*FD_VINYL_BSTREAM_BLOCK_SZ > (seq1-seq) ) ) { /* Wrapping safe */
          FD_LOG_WARNING(( "%016lx: truncated", seq ));
          goto done;
        }

        fd_vinyl_bstream_block_t dst[1];

        dst[0] = *PEEK( seq + FD_VINYL_BSTREAM_BLOCK_SZ );

        char const * _err = fd_vinyl_bstream_move_test( io_seed, seq, block, dst );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        int err = fd_vinyl_recover_tstone( meta, &block->move.src.key, seq, &pair_cnt, &garbage_sz, &tstone_cnt );
        if( FD_UNLIKELY( err ) ) goto done; /* logs details */

        /**/ err = fd_vinyl_recover_tstone( meta, &block->move.dst,     seq, &pair_cnt, &garbage_sz, &tstone_cnt );
        if( FD_UNLIKELY( err ) ) goto done; /* logs details */

        garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
        seq        += FD_VINYL_BSTREAM_BLOCK_SZ;
        break;
      }

      case FD_VINYL_BSTREAM_CTL_TYPE_PART: {

        char const * _err = fd_vinyl_bstream_part_test( io_seed, seq, block );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
        seq        += FD_VINYL_BSTREAM_BLOCK_SZ;
        break;
      }

      case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {

        char const * _err = fd_vinyl_bstream_zpad_test( io_seed, seq, block );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        seq += FD_VINYL_BSTREAM_BLOCK_SZ;
        break;
      }

      default:
        FD_LOG_WARNING(( "%016lx: unknown type (%x)", seq, (uint)type ));
        goto done;

      }
    }

    if( FD_UNLIKELY( fd_vinyl_seq_ne( seq, seq1 ) ) ) {
      FD_LOG_WARNING(( "%016lx: bad partitioning", seq ));
      goto done;
    }

  }

  fail = 0UL;

done:

  /* If we failed, tell all the other threads not to continue by
     setting the task assignment cursor to seq_past. */

  if( fail ) {
    FD_COMPILER_MFENCE();
# if FD_HAS_ATOMIC
    while( FD_ATOMIC_CAS( _lock, 0UL, 1UL ) ) FD_SPIN_PAUSE();
# else
    *_lock = 1UL;
# endif
    FD_COMPILER_MFENCE();
    _lock[1] = seq_past;
    FD_COMPILER_MFENCE();
    _lock[0] = 0UL;
  }

  _rlocal[0] = fail;
  _rlocal[1] = pair_cnt;
  _rlocal[2] = garbage_sz;
  _rlocal[3] = tstone_cnt;

} FD_MAP_END {

  ulong *       _rlocal  = (ulong *)      arg[0];
  ulong const * _rremote = (ulong const *)_r1;

  _rlocal[0] |= _rremote[0];
  _rlocal[1] += _rremote[1];
  _rlocal[2] += _rremote[2];
  _rlocal[3] += _rremote[3];

} FD_REDUCE_END
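
/* A worked example of the task assignment cursor protocol above
   (illustrative): _lock[0] is a spin lock and _lock[1] is the cursor,
   initialized to seq_present by the caller.  Under the lock, a thread
   reads the cursor as its seq1, peeks the partition block just before
   seq1 to find that partition's start, and writes the start (clamped
   to seq_past) back as the new cursor.  Partitions are thus claimed
   newest to oldest until the cursor reaches seq_past; on failure, a
   thread forces the cursor to seq_past so its peers stop claiming
   work. */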

/* fd_vinyl_recover_meta_cleanup_task scans the meta element storage in
   parallel for tstones left behind by fd_vinyl_recover_part_task and
   removes them.  Returns the number of tstones removed. */

static FD_FN_UNUSED FD_MAP_REDUCE_BEGIN( fd_vinyl_recover_meta_cleanup_task, 1L, alignof(ulong), sizeof(ulong), 1UL ) {
  ulong * _rlocal = (ulong *)arg[0];

  fd_vinyl_t * vinyl = (fd_vinyl_t *)arg[1];

  fd_vinyl_meta_t * meta = vinyl->meta;

  fd_vinyl_meta_ele_t * ele0       = meta->ele;
  ulong const *         lock       = meta->lock;
  int                   lock_shift = meta->lock_shift;

  ulong remove_cnt = 0UL;

  for( long ele_idx=block_i0; ele_idx<block_i1; ele_idx++ ) {
    long lock_idx = ele_idx >> lock_shift;

    fd_vinyl_key_t key;
    int            try_remove;

    /* Do a non-blocking query by ele_idx (not by key).  We have to do
       this directly because there is no standard API for this.  This
       is highly unlikely to ever block (but theoretically could if a
       remove in a different thread has locked a probe chain that
       touches elements in this thread). */

    for(;;) {
      FD_COMPILER_MFENCE();
      ulong ver0 = lock[ lock_idx ];
      FD_COMPILER_MFENCE();
      if( FD_LIKELY( !(ver0 & 1UL) ) ) {

        try_remove = (!!ele0[ ele_idx ].phdr.ctl) & (ele0[ ele_idx ].line_idx==(ULONG_MAX-1UL));
        key        = ele0[ ele_idx ].phdr.key;

        FD_COMPILER_MFENCE();
        ulong ver1 = lock[ lock_idx ];
        FD_COMPILER_MFENCE();
        if( FD_LIKELY( ver0==ver1 ) ) break;
      }
      FD_SPIN_PAUSE();
    }

    /* If try_remove is not set, ele_idx either had no key in it or had
       a pair entry.  So we continue to the next slot. */

    if( FD_LIKELY( !try_remove ) ) continue;

    /* At this point, we observed key had a tstone in the meta above.
       So we try to remove it.  It is possible (though extremely
       unlikely for big sparse maps and the vanilla thread partitioning
       here) that a remove on another thread got key first.  So it is
       okay if this fails.  We have to use the parallel version of this
       (even if it is highly unlikely to interfere with other threads)
       for the same reason we had to use a non-blocking query above. */

    fd_vinyl_meta_query_t query[1];
    remove_cnt += (ulong)!fd_vinyl_meta_remove( meta, &key, query, FD_MAP_FLAG_BLOCKING );
  }

  *_rlocal = remove_cnt;

} FD_MAP_END {

  ulong *       _rlocal  = (ulong *)      arg[0];
  ulong const * _rremote = (ulong const *)_r1;

  *_rlocal += *_rremote;

} FD_REDUCE_END

ulong
fd_vinyl_recover( fd_tpool_t * tpool, ulong t0, ulong t1, int level,
                  fd_vinyl_t * vinyl ) {

  fd_vinyl_meta_t * meta     = vinyl->meta;
  ulong             line_cnt = vinyl->line_cnt;

  ulong ele_max  = meta->ele_max;
  ulong lock_cnt = meta->lock_cnt;

  /* Using all available threads, flush the lines and meta cache.  We
     do the meta flush locked so we don't confuse any concurrent meta
     readers.  This will claim any existing locks (e.g. the previous
     meta writer died while holding a lock and the user didn't clean it
     up before calling this). */

  ulong reclaim_cnt;

  FD_FOR_ALL   ( fd_vinyl_recover_line_task,       tpool,t0,t1, 0L,(long)line_cnt, vinyl );
  FD_MAP_REDUCE( fd_vinyl_recover_reclaim_task,    tpool,t0,t1, 0L,(long)lock_cnt, &reclaim_cnt, vinyl );
  FD_FOR_ALL   ( fd_vinyl_recover_meta_flush_task, tpool,t0,t1, 0L,(long)ele_max,  vinyl );
  FD_FOR_ALL   ( fd_vinyl_recover_unlock_task,     tpool,t0,t1, 0L,(long)lock_cnt, vinyl );

  if( FD_UNLIKELY( reclaim_cnt ) ) FD_LOG_WARNING(( "reclaimed %lu locks (dead writer?); attempting to continue", reclaim_cnt ));

  /* FIXME: should this fail if it detects in progress io? */

  /* If only 1 thread was provided, the bstream past doesn't have a
     valid partitioning, or atomics are not available, use the serial
     recovery algorithm. */

  t1 = t0 + 1UL; /* Turn off parallel recovery while it is untested */

  if( FD_UNLIKELY( (t1-t0)<=1UL                       ) ||
      FD_UNLIKELY( fd_vinyl_recover_test( vinyl->io ) ) ||
      FD_UNLIKELY( !FD_HAS_ATOMIC                     ) ) {
    fd_vinyl_data_reset( tpool,t0,t1, level, vinyl->data );
    return fd_vinyl_recover_serial( vinyl );
  }

# if FD_HAS_ATOMIC

  /* The parallel recovery of the bstream partitions may leave tstones
     in the meta elements.  To clean this up, we have two options.

     Option 1 (simplest and most robust): we scan all the meta elements
     in parallel for tstones and remove them.  We might have to do more
     than one pass because the removal of elements could mean some
     elements are not placed well.  This requires no scratch (and thus
     is more robust against arbitrary erase / move patterns in the
     recovery region).  While it isn't any less algorithmically
     efficient (because we already parallel scan all the elements to
     clear them), it is practically less efficient for application
     access patterns that don't generate many tombstones and/or have
     pair_cnt<<pair_max.

     Option 2 (fastest but trickiest): we append the keys that might
     have tstones to scratch memory at the end of partition processing
     during the parallel recovery.  The vinyl data cache region is
     huge, well aligned and not used at this point.  So it can handle
     all but the most extreme tstone generating application patterns.
     We can store in the scratch either the key directly (faster but
     less scratch efficient) or the location in the bstream, i.e. the
     bstream seq of the dead / move that generated the tstone (slower
     but more scratch efficient).  We further can use the aux
     information in the partition to tightly bound the worst case
     number of tstones required up front.  But this is tricky because
     the scratch array needs to have the partition processing tasks
     append to it in parallel.  So we either have to use atomic
     increments in the inner loop (yuck), or we have to partition the
     array up front (keeping fingers crossed that a uniform
     distribution assumption is valid) and then concatenate the
     partitions for parallel processing (yuck), or we have to have the
     parallel cleanup processing work with non-compactly stored scratch
     (yuck).

     (There is a hybrid option where this tries to do option 2 but, if
     scratch runs out on any thread, uses option 1 to clean up tstones
     in the meta.)

     We go with the simplest and most robust implementation below.

     FIXME: regardless of the above, it is theoretically possible for
     the number of used meta elements that need to be tracked
     intermediately to exceed meta pair_max, even if the final state at
     seq_present can be stored in pair_max.  We retry with a serial
     recovery if parallel recovery fails. */

  ulong seq = fd_vinyl_io_seq_present( vinyl->io );

  ulong rtmp[4];
  ulong lock[2];

  lock[0] = 0UL;
  lock[1] = seq;

  FD_MAP_REDUCE( fd_vinyl_recover_part_task, tpool,t0,t1, 0L,(long)(t1-t0), rtmp, vinyl, lock );

  ulong fail = rtmp[0];
  if( FD_UNLIKELY( fail ) ) {
    FD_LOG_WARNING(( "parallel recovery failed; attempting serial recovery" ));

    /* Reset the meta from whatever messy state the failed parallel
       recovery left it in */

    FD_MAP_REDUCE( fd_vinyl_recover_reclaim_task,    tpool,t0,t1, 0L,(long)lock_cnt, &reclaim_cnt, vinyl );
    FD_FOR_ALL   ( fd_vinyl_recover_meta_flush_task, tpool,t0,t1, 0L,(long)ele_max,  vinyl );
    FD_FOR_ALL   ( fd_vinyl_recover_unlock_task,     tpool,t0,t1, 0L,(long)lock_cnt, vinyl );

    fd_vinyl_data_reset( tpool,t0,t1, level, vinyl->data );

    return fd_vinyl_recover_serial( vinyl );
  }

  vinyl->pair_cnt   = rtmp[1];
  vinyl->garbage_sz = rtmp[2];

  ulong tstone_rem = rtmp[3];

  while( tstone_rem ) {
    FD_MAP_REDUCE( fd_vinyl_recover_meta_cleanup_task, tpool,t0,t1, 0L,(long)ele_max, rtmp, vinyl );
    tstone_rem -= rtmp[0];
  }

  /* Reset the data cache to clean up any scratch usage (currently none
     but no reason to do it earlier) */

  fd_vinyl_data_reset( tpool,t0,t1, level, vinyl->data );

  return seq;

# endif /* FD_HAS_ATOMIC */
}
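
/* A minimal usage sketch (hypothetical names; assumes a joined tpool
   spanning threads [t0,t1) and a joined vinyl, with 0 as a placeholder
   level):

     ulong seq_present = fd_vinyl_recover( tpool, t0, t1, 0, vinyl );
     FD_LOG_NOTICE(( "recovered bstream through seq %016lx", seq_present ));
*/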