Line data Source code
1 0 : case FD_VINYL_REQ_TYPE_MOVE: {
2 :
3 0 : fd_vinyl_key_t const * req_key_src = MAP_REQ_GADDR( req->key_gaddr, fd_vinyl_key_t, batch_cnt );
4 0 : fd_vinyl_key_t const * req_key_dst = MAP_REQ_GADDR( req->val_gaddr_gaddr, fd_vinyl_key_t, batch_cnt );
5 0 : schar * req_err = MAP_REQ_GADDR( req->err_gaddr, schar, batch_cnt );
6 :
7 0 : if( FD_UNLIKELY( (!!batch_cnt) & ((!req_key_src) | (!req_key_dst) | (!req_err)) ) ) {
8 0 : comp_err = FD_VINYL_ERR_INVAL;
9 0 : break;
10 0 : }
11 :
12 0 : for( ulong batch_idx=0UL; batch_idx<batch_cnt; batch_idx++ ) {
13 :
14 0 : # define DONE(err) do { \
15 0 : int _err = (err); \
16 0 : FD_COMPILER_MFENCE(); \
17 0 : req_err[ batch_idx ] = (schar)_err; \
18 0 : FD_COMPILER_MFENCE(); \
19 0 : fail_cnt += (ulong)!!_err; \
20 0 : goto next_move; /* sigh ... can't use continue */ \
21 0 : } while(0)
22 :
23 : /* If the input and output keys are the same, this is a no-op. */
24 :
25 0 : fd_vinyl_key_t const * key_src = req_key_src + batch_idx;
26 0 : fd_vinyl_key_t const * key_dst = req_key_dst + batch_idx;
27 :
28 0 : if( FD_UNLIKELY( fd_vinyl_key_eq( key_src, key_dst ) ) ) DONE( FD_VINYL_SUCCESS );
29 :
30 : /* At this point, key_src and key_dst are distinct. Query meta
31 : for pair key_dst. If key_dst is acquired, fail with AGAIN. */
32 :
33 0 : ulong memo_dst = fd_vinyl_key_memo( meta_seed, key_dst );
34 :
35 0 : ulong _ele_idx_dst; /* Avoid pointer escape */
36 0 : int err_dst = fd_vinyl_meta_query_fast( ele0, ele_max, key_dst, memo_dst, &_ele_idx_dst );
37 0 : ulong ele_idx_dst = _ele_idx_dst; /* In [0,ele_max) */
38 :
39 0 : ulong line_idx_dst = ULONG_MAX; /* Fix spurious compiler warning */
40 :
41 0 : if( FD_UNLIKELY( !err_dst ) ) { /* dst exists at bstream seq_present or is being created */
42 :
43 0 : line_idx_dst = ele0[ ele_idx_dst ].line_idx;
44 :
45 0 : if( FD_UNLIKELY( line_idx_dst<line_cnt ) ) { /* dst is in cache */
46 :
47 0 : FD_CRIT( line[ line_idx_dst ].ele_idx==ele_idx_dst, "corruption detected" );
48 :
49 0 : ulong line_ctl_dst = line[ line_idx_dst ].ctl;
50 :
51 0 : long ref_dst = fd_vinyl_line_ctl_ref( line_ctl_dst );
52 :
53 0 : if( FD_UNLIKELY( ref_dst ) ) DONE( FD_VINYL_ERR_AGAIN ); /* dst is acquired */
54 :
55 0 : } else {
56 :
57 0 : FD_CRIT( line_idx_dst==ULONG_MAX, "corruption detected" );
58 :
59 0 : }
60 :
61 0 : }
62 :
63 : /* At this point, pair key dst might exist but is not acquired.
64 : Query meta for key_src. If it doesn't exist (KEY) or is
65 : acquired (AGAIN), fail. Otherwise, if it is not cached, cache
66 : it in the LRU position.
67 :
68 : (Note: if we want to overlap this I/O maximally, we would do the
69 : caching of these lines async but then we'd need to be able to
70 : guarantee that at least batch_cnt lines are evictable for the
71 : move ... this gets quite tricky. So we just do blocking I/O here
72 : as we know at least 1 line is evictable.) */
73 :
74 0 : ulong memo_src = fd_vinyl_key_memo( meta_seed, key_src );
75 :
76 0 : ulong _ele_idx_src; /* Avoid pointer escape */
77 0 : int err_src = fd_vinyl_meta_query_fast( ele0, ele_max, key_src, memo_src, &_ele_idx_src );
78 0 : ulong ele_idx_src = _ele_idx_src; /* In [0,ele_max) */
79 :
80 0 : if( FD_UNLIKELY( err_src ) ) DONE( FD_VINYL_ERR_KEY );
81 :
82 0 : ulong val_sz = (ulong)ele0[ ele_idx_src ].phdr.info.val_sz;
83 :
84 0 : FD_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
85 :
86 0 : ulong seq_src = ele0[ ele_idx_src ].seq;
87 0 : ulong line_idx_src = ele0[ ele_idx_src ].line_idx;
88 :
89 0 : fd_vinyl_data_obj_t * obj_src;
90 0 : fd_vinyl_bstream_phdr_t * phdr_src;
91 :
92 0 : if( FD_LIKELY( line_idx_src<line_cnt ) ) {
93 :
94 0 : ulong line_ctl_src = line[ line_idx_src ].ctl;
95 :
96 0 : long ref_src = fd_vinyl_line_ctl_ref( line_ctl_src );
97 :
98 0 : if( FD_UNLIKELY( ref_src ) ) DONE( FD_VINYL_ERR_AGAIN );
99 :
100 0 : ulong ver_src = fd_vinyl_line_ctl_ver( line_ctl_src );
101 :
102 0 : FD_CRIT( line[ line_idx_src ].ele_idx==ele_idx_src, "corruption detected" );
103 :
104 0 : obj_src = line[ line_idx_src ].obj;
105 :
106 0 : FD_ALERT( fd_vinyl_data_is_valid_obj( obj_src, vol, vol_cnt ), "corruption detected" );
107 0 : FD_CRIT ( obj_src->line_idx==line_idx_src, "corruption detected" );
108 :
109 0 : phdr_src = fd_vinyl_data_obj_phdr( obj_src );
110 :
111 0 : line[ line_idx_src ].ctl = fd_vinyl_line_ctl( ver_src+1UL, 0L );
112 :
113 0 : } else {
114 :
115 0 : FD_CRIT( line_idx_src==ULONG_MAX, "corruption detected" );
116 :
117 : /* Read the encoded pair from the bstream */
118 :
119 0 : ulong ctl = ele0[ ele_idx_src ].phdr.ctl;
120 :
121 0 : int type = fd_vinyl_bstream_ctl_type ( ctl );
122 0 : int style = fd_vinyl_bstream_ctl_style( ctl );
123 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
124 :
125 0 : FD_CRIT( type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR, "corruption detected" );
126 0 : FD_CRIT( (style==FD_VINYL_BSTREAM_CTL_STYLE_RAW) | (style==FD_VINYL_BSTREAM_CTL_STYLE_LZ4), "corruption detected" );
127 0 : FD_CRIT( val_esz<=FD_VINYL_VAL_MAX, "corruption detected" );
128 :
129 0 : fd_vinyl_data_obj_t * cobj = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_esz ) );
130 0 : if( FD_UNLIKELY( !cobj ) ) FD_LOG_CRIT(( "increase data cache size" ));
131 :
132 0 : fd_vinyl_bstream_phdr_t * cphdr = fd_vinyl_data_obj_phdr( cobj );
133 0 : ulong cpair_sz = fd_vinyl_bstream_pair_sz( val_esz );
134 :
135 0 : fd_vinyl_io_read_imm( io, seq_src, cphdr, cpair_sz );
136 : /* not an async read (so no read_cnt increment) */
137 :
138 : /* Verify data integrity */
139 :
140 0 : FD_ALERT( !fd_vinyl_bstream_pair_test( io_seed, seq_src, (fd_vinyl_bstream_block_t *)cphdr, cpair_sz ),
141 0 : "corruption detected" );
142 :
143 : /* Decode the pair */
144 :
145 0 : if( FD_LIKELY( style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) {
146 :
147 0 : FD_CRIT( val_esz==val_sz, "corruption detected" );
148 :
149 0 : obj_src = cobj;
150 0 : phdr_src = cphdr;
151 :
152 0 : } else {
153 :
154 0 : obj_src = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_sz ) );
155 0 : if( FD_UNLIKELY( !obj_src ) ) FD_LOG_CRIT(( "increase data cache size" ));
156 :
157 0 : char const * cval = (char const *)fd_vinyl_data_obj_val( cobj );
158 0 : char * val = (char *) fd_vinyl_data_obj_val( obj_src );
159 0 : if( FD_UNLIKELY( (ulong)LZ4_decompress_safe( cval, val, (int)val_esz, (int)val_sz )!=val_sz ) )
160 0 : FD_LOG_CRIT(( "LZ4_decompress_safe failed" ));
161 :
162 0 : phdr_src = fd_vinyl_data_obj_phdr( obj_src );
163 :
164 0 : phdr_src->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
165 0 : phdr_src->key = cphdr->key;
166 0 : phdr_src->info = cphdr->info;
167 :
168 0 : fd_vinyl_data_free( data, cobj );
169 :
170 0 : }
171 :
172 0 : line_idx_src = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
173 :
174 0 : ulong line_ctl_src = line[ line_idx_src ].ctl;
175 :
176 0 : ulong ver_src = fd_vinyl_line_ctl_ver( line_ctl_src );
177 :
178 0 : line[ line_idx_src ].obj = obj_src; obj_src->line_idx = line_idx_src; obj_src->rd_active = (short)0;
179 0 : line[ line_idx_src ].ele_idx = ele_idx_src; ele0[ ele_idx_src ].line_idx = line_idx_src;
180 0 : line[ line_idx_src ].ctl = fd_vinyl_line_ctl( ver_src+1UL, 0L );
181 :
182 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx_src, FD_VINYL_LINE_EVICT_PRIO_LRU );
183 :
184 0 : if( line_idx_src==line_idx_dst ) line_idx_dst = ULONG_MAX; /* Handle evict_lru evicting the dst */
185 :
186 0 : }
187 :
188 : /* At this point, pair key_src is cached but not acquired and pair
189 : key_dst is not acquired. We are clear to move. If pair
190 : key_dst exists, we are replacing pair key_dst with pair
191 : key_src. In this case, we remove pair key_dst from cache and
192 : remove pair key_dst from the meta. This remove might move the
193 : location of pair key_src's meta element. So we reload if
194 : necessary. */
195 :
196 0 : FD_CRIT( fd_vinyl_bstream_ctl_type( phdr_src->ctl )==fd_vinyl_bstream_ctl_type( ele0[ ele_idx_src ].phdr.ctl ),
197 0 : "corruption detected" );
198 0 : FD_CRIT( fd_vinyl_key_eq( &phdr_src->key, &ele0[ ele_idx_src ].phdr.key ), "corruption detected" );
199 0 : FD_CRIT( !memcmp( &phdr_src->info, &ele0[ ele_idx_src ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
200 :
201 0 : accum_garbage_cnt += 2UL; /* old src and new move block */
202 0 : accum_garbage_sz += fd_vinyl_bstream_pair_sz( fd_vinyl_bstream_ctl_sz( ele0[ ele_idx_src ].phdr.ctl ) ) +
203 0 : FD_VINYL_BSTREAM_BLOCK_SZ;
204 :
205 0 : if( FD_UNLIKELY( !err_dst ) ) {
206 :
207 0 : accum_garbage_cnt++; /* old dst */
208 0 : accum_garbage_sz += fd_vinyl_bstream_pair_sz( fd_vinyl_bstream_ctl_sz( ele0[ ele_idx_dst ].phdr.ctl ) );
209 :
210 0 : if( FD_UNLIKELY( line_idx_dst < line_cnt ) ) {
211 :
212 0 : FD_CRIT( line[ line_idx_dst ].ele_idx==ele_idx_dst, "corruption detected" );
213 :
214 0 : fd_vinyl_data_obj_t * obj_dst = line[ line_idx_dst ].obj;
215 :
216 0 : FD_ALERT( fd_vinyl_data_is_valid_obj( obj_dst, vol, vol_cnt ), "corruption detected" );
217 0 : FD_CRIT ( obj_dst->line_idx==line_idx_dst, "corruption detected" );
218 :
219 0 : ulong line_ctl_dst = line[ line_idx_dst ].ctl;
220 :
221 0 : ulong ver_dst = fd_vinyl_line_ctl_ver( line_ctl_dst );
222 :
223 0 : fd_vinyl_data_free( data, obj_dst );
224 :
225 0 : line[ line_idx_dst ].obj = NULL;
226 0 : line[ line_idx_dst ].ele_idx = ULONG_MAX; // ele0[ ele_idx_dst ].line_idx = ULONG_MAX; /* Technically not necessary given below */
227 0 : line[ line_idx_dst ].ctl = fd_vinyl_line_ctl( ver_dst+1UL, 0L );
228 :
229 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx_dst, FD_VINYL_LINE_EVICT_PRIO_LRU );
230 0 : }
231 :
232 0 : fd_vinyl_meta_remove_fast( ele0, ele_max, lock, lock_shift, line, line_cnt, ele_idx_dst ); /* See note below about atomicity for concurrent meta readers */
233 :
234 0 : ulong pair_cnt = vinyl->pair_cnt;
235 0 : FD_CRIT( pair_cnt, "corruption detected" );
236 0 : vinyl->pair_cnt = pair_cnt - 1UL;
237 :
238 0 : err_src = fd_vinyl_meta_query_fast( ele0, ele_max, key_src, memo_src, &_ele_idx_src );
239 0 : ele_idx_src = _ele_idx_src; /* In [0,ele_max) */
240 0 : FD_CRIT( !err_src, "corruption detected" );
241 : /* Note: could test other fields post move too */
242 :
243 0 : }
244 :
245 : /* At this point, pair key_src is cached but not acquired and pair
246 : key_dst is not cached and not in the meta (the move block that
247 : will officially erase it, if it already exists, will be written
248 : below). Update the cached phdr to reflect the move. Remove
249 : the meta entry for pair key_src and insert a meta entry for
250 : pair key_dst.
251 :
252 : Note: this means from the point of view of concurrent meta
253 : queries, there will be a brief time interval when pair key_src
254 : and pair key_dst are both reported as not existing.
255 :
256 : As an alternative with more overhead we could instead insert
257 : the meta element for key_dst, remove the meta element for
258 : key_src and requery meta for key_dst (as the remove could move
259 : it). In this case, there will be a gap where key_src and
260 : key_dst are both reported as available (and they will point to
261 : the same cache entry during this interval).
262 :
263 : With even more complexity and overhead, we could eliminate the
264 : gap and overhead and make this atomic from the point of view of
265 : concurrent meta readers. (Would have to compute a lock set that
266 : covers the target key_dst insert location and the key_src probe
267 : sequence assuming key_dst has been inserted, lock the locks, do
268 : the insert, do the remove without any locking behavior, free
269 : the lock set and then requery where key_dst ended up.) Also
270 : note that, if we are replacing pair key_dst, at this point,
271 : pair key_dst is already reported to concurrent meta readers as
272 : not existing. Would need to extend this to the above.
273 :
274 : But it isn't clear that concurrent meta readers care at all.
275 : So we go with the fast simple method below (it still is atomic
276 : from the point of view of clients and the bstream). */
277 :
278 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_sz );
279 0 : ulong seq_move = fd_vinyl_io_hint( io, FD_VINYL_BSTREAM_BLOCK_SZ + pair_sz );
280 0 : ulong seq_dst = seq_move + FD_VINYL_BSTREAM_BLOCK_SZ;
281 :
282 : //phdr_src->ctl = ... already init
283 0 : phdr_src->key = *key_dst;
284 : //phdr_src->info = ... already init
285 :
286 0 : fd_vinyl_meta_remove_fast( ele0, ele_max, lock, lock_shift, line, line_cnt, ele_idx_src );
287 :
288 0 : err_dst = fd_vinyl_meta_query_fast( ele0, ele_max, key_dst, memo_dst, &_ele_idx_dst );
289 0 : ele_idx_dst = _ele_idx_dst; /* In [0,ele_max) */
290 :
291 0 : FD_CRIT( err_dst==FD_VINYL_ERR_KEY, "corruption detected" );
292 :
293 0 : ele0[ ele_idx_dst ].memo = memo_dst;
294 : //ele0[ ele_idx_dst ].phdr.ctl = ... init below for concurrent safe insert
295 0 : ele0[ ele_idx_dst ].phdr.key = phdr_src->key;
296 0 : ele0[ ele_idx_dst ].phdr.info = phdr_src->info;
297 0 : ele0[ ele_idx_dst ].line_idx = line_idx_src;
298 0 : ele0[ ele_idx_dst ].seq = seq_dst;
299 :
300 0 : FD_COMPILER_MFENCE();
301 0 : ele0[ ele_idx_dst ].phdr.ctl = phdr_src->ctl;
302 0 : FD_COMPILER_MFENCE();
303 :
304 0 : line[ line_idx_src ].ele_idx = ele_idx_dst;
305 :
306 0 : fd_vinyl_io_append_move( io, phdr_src, key_dst, NULL, 0UL );
307 0 : append_cnt++;
308 0 : accum_move_cnt++;
309 :
310 0 : fd_vinyl_bstream_pair_hash( io_seed, (fd_vinyl_bstream_block_t *)phdr_src );
311 :
312 0 : ulong seq = fd_vinyl_io_append( io, phdr_src, pair_sz );
313 0 : append_cnt++;
314 0 : FD_CRIT( fd_vinyl_seq_eq( seq, seq_dst ), "unexpected append location" );
315 :
316 0 : DONE( FD_VINYL_SUCCESS );
317 :
318 0 : next_move: /* silly language restriction */;
319 :
320 0 : # undef DONE
321 :
322 0 : }
323 :
324 0 : comp_err = FD_VINYL_SUCCESS;
325 0 : break;
326 0 : }
|