Line data Source code
1 0 : case FD_VINYL_REQ_TYPE_ACQUIRE: {
2 0 : ulong req_flags = (ulong)req->flags;
3 0 : fd_vinyl_key_t const * req_key = MAP_REQ_GADDR( req->key_gaddr, fd_vinyl_key_t, batch_cnt );
4 0 : ulong * req_val_gaddr = MAP_REQ_GADDR( req->val_gaddr_gaddr, ulong, batch_cnt );
5 0 : schar * req_err = MAP_REQ_GADDR( req->err_gaddr, schar, batch_cnt );
6 :
7 0 : int req_flag_modify = fd_vinyl_req_flag_modify( req_flags );
8 0 : int req_flag_ignore = fd_vinyl_req_flag_ignore( req_flags );
9 0 : int req_flag_create = fd_vinyl_req_flag_create( req_flags );
10 0 : int req_flag_excl = fd_vinyl_req_flag_excl ( req_flags );
11 0 : int req_evict_prio = fd_vinyl_req_evict_prio ( req_flags );
12 :
13 0 : int bad_gaddr = (!!batch_cnt) & ((!req_key) | (!req_val_gaddr) | (!req_err));
14 0 : int bad_quota = quota_rem<batch_cnt;
15 :
16 0 : if( FD_UNLIKELY( bad_gaddr | bad_quota ) ) {
17 0 : comp_err = bad_gaddr ? FD_VINYL_ERR_INVAL : FD_VINYL_ERR_FULL;
18 0 : break;
19 0 : }
20 :
21 0 : for( ulong batch_idx=0UL; batch_idx<batch_cnt; batch_idx++ ) {
22 :
23 0 : # define DONE(err) do { \
24 0 : int _err = (err); \
25 0 : FD_COMPILER_MFENCE(); \
26 0 : req_err[ batch_idx ] = (schar)_err; \
27 0 : FD_COMPILER_MFENCE(); \
28 0 : quota_rem -= (ulong) !_err; \
29 0 : fail_cnt += (ulong)!!_err; \
30 0 : goto next_acquire; /* sigh ... can't use continue */ \
31 0 : } while(0)
32 :
33 0 : ulong req_val_max = 0UL;
34 0 : if( req_flag_modify ) {
35 0 : req_val_max = req_val_gaddr[ batch_idx ];
36 0 : if( FD_UNLIKELY( req_val_max>FD_VINYL_VAL_MAX ) ) DONE( FD_VINYL_ERR_INVAL );
37 0 : }
38 :
39 : /* Query vinyl meta for key */
40 :
41 0 : fd_vinyl_key_t const * key = req_key + batch_idx;
42 :
43 0 : ulong memo = fd_vinyl_key_memo( meta_seed, key );
44 :
45 0 : ulong _ele_idx; /* avoid pointer escape */
46 0 : int err = fd_vinyl_meta_query_fast( ele0, ele_max, key, memo, &_ele_idx );
47 0 : ulong ele_idx = _ele_idx; /* In [0,ele_max) */
48 :
49 0 : if( FD_LIKELY( !err ) ) { /* pair key meta cached */
50 :
51 : /* At this point, pair key either exists at bstream seq_present
52 : or is in the process of being created. If pair key is being
53 : created, fail with AGAIN (it must be acquired for modify). */
54 :
55 0 : ulong pair_ctl = ele0[ ele_idx ].phdr.ctl;
56 :
57 0 : FD_CRIT( (fd_vinyl_bstream_ctl_type( pair_ctl )==FD_VINYL_BSTREAM_CTL_TYPE_PAIR) | (pair_ctl==ULONG_MAX),
58 0 : "corruption detected" );
59 :
60 0 : if( FD_UNLIKELY( pair_ctl==ULONG_MAX ) ) DONE( FD_VINYL_ERR_AGAIN );
61 :
62 : /* At this point, pair key exists at bstream seq_present. */
63 :
64 0 : ulong val_sz = (ulong)ele0[ ele_idx ].phdr.info.val_sz;
65 0 : ulong line_idx = ele0[ ele_idx ].line_idx;
66 :
67 0 : FD_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
68 0 : FD_CRIT( (line_idx<line_cnt) | (line_idx==ULONG_MAX), "corruption detected" );
69 :
70 0 : if( FD_LIKELY( line_idx<line_cnt ) ) {
71 :
72 : /* At this point, pair key is cached. Get the cache info for
73 : line line_idx. */
74 :
75 0 : accum_cache_hit++;
76 :
77 0 : FD_CRIT( line[ line_idx ].ele_idx==ele_idx, "corruption detected" );
78 :
79 0 : fd_vinyl_data_obj_t * obj = line[ line_idx ].obj;
80 :
81 0 : FD_ALERT( fd_vinyl_data_is_valid_obj( obj, vol, vol_cnt ), "corruption detected" );
82 0 : FD_CRIT ( obj->line_idx==line_idx, "corruption detected" );
83 :
84 0 : ulong line_ctl = line[ line_idx ].ctl;
85 :
86 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
87 0 : long ref = fd_vinyl_line_ctl_ref( line_ctl );
88 :
89 0 : if( FD_LIKELY( !req_flag_modify ) ) {
90 :
91 : /* At this point, we are acquiring a cached pair for read.
92 : If the line is acquired for modify, fail with AGAIN. If
93 : there are too many acquires for read on this pair, CRIT
94 : (could consider AGAIN here). Otherwise, we update the
95 : ref count (don't change the ver), point the client at the
96 : line caching pair key to finish the acquire. Note that
97 : we don't validate the pair header if we detect that an
98 : earlier acquire in this batch started fetching the pair
99 : because the read might still be in progress (see note
100 : below for more details). */
101 :
102 0 : if( FD_UNLIKELY( ref<0L ) ) DONE( FD_VINYL_ERR_AGAIN );
103 0 : if( FD_UNLIKELY( ref>=FD_VINYL_LINE_REF_MAX ) ) FD_LOG_CRIT(( "too many acquires for read on this pair" ));
104 :
105 0 : if( FD_LIKELY( !obj->rd_active ) ) {
106 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
107 :
108 0 : FD_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
109 0 : FD_CRIT( phdr->ctl==fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR,
110 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz ), "corruption detected" );
111 0 : FD_CRIT( fd_vinyl_key_eq( &phdr->key, key ), "corruption detected" );
112 0 : FD_CRIT( !memcmp( &phdr->info, &ele0[ ele_idx ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
113 0 : }
114 :
115 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver, ref+1L ); /* don't bump ver */
116 :
117 0 : req_val_gaddr[ batch_idx ] = (ulong)fd_vinyl_data_obj_val( obj ) - data_laddr0;
118 :
119 0 : DONE( FD_VINYL_SUCCESS );
120 :
121 0 : }
122 :
123 : /* At this point, we are acquiring a cached pair for modify.
124 : If we are not allowed to acquire an existing pair for
125 : modify (INVAL) or if the line line_idx is already acquired
126 : for anything (AGAIN), fail. */
127 :
128 0 : if( FD_UNLIKELY( ref ) ) DONE( FD_VINYL_ERR_AGAIN );
129 0 : if( FD_UNLIKELY( req_flag_excl ) ) DONE( FD_VINYL_ERR_INVAL );
130 :
131 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
132 :
133 0 : FD_CRIT( !obj->rd_active, "corruption detected" );
134 0 : FD_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
135 0 : FD_CRIT( phdr->ctl==fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR,
136 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz ), "corruption detected" );
137 0 : FD_CRIT( fd_vinyl_key_eq( &phdr->key, key ), "corruption detected" );
138 0 : FD_CRIT( !memcmp( &phdr->info, &ele0[ ele_idx ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
139 :
140 : /* If the ignore flag is set, set the cached value size to 0. */
141 :
142 0 : if( req_flag_ignore ) {
143 0 : phdr->info.val_sz = 0U;
144 0 : val_sz = 0UL;
145 0 : }
146 :
147 : /* If the current location for the pair key's data isn't
148 : sufficient to hold the worst case val_sz that the client
149 : might modify the pair's value into, adjust the space
150 : available for the pair to the user's val_max. Because we
151 : might be ignoring the existing value, this could be smaller
152 : than the current object. (We could choose to not trim in
153 : this case because it will get trimmed again on release.
154 : But doing so makes a more consistent guarantee to the
155 : client and makes testing easier.) */
156 :
157 0 : ulong csz = sizeof(fd_vinyl_bstream_phdr_t) + val_sz;
158 :
159 0 : ulong szc_new = fd_vinyl_data_szc( fd_ulong_max( val_sz, req_val_max ) );
160 0 : ulong szc_old = (ulong)obj->szc;
161 :
162 0 : if( FD_UNLIKELY( szc_new != szc_old ) ) {
163 :
164 0 : fd_vinyl_data_obj_t * obj_new = fd_vinyl_data_alloc( data, szc_new );
165 0 : if( FD_UNLIKELY( !obj_new ) ) FD_LOG_CRIT(( "increase data cache size" ));
166 :
167 0 : fd_vinyl_bstream_phdr_t * phdr_new = fd_vinyl_data_obj_phdr( obj_new );
168 :
169 0 : memcpy( phdr_new, phdr, csz );
170 :
171 0 : fd_vinyl_data_free( data, obj );
172 :
173 0 : phdr = phdr_new;
174 0 : obj = obj_new;
175 :
176 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx; obj->rd_active = (short)0;
177 0 : }
178 :
179 : /* Zero out any remaining space in the pair. */
180 :
181 0 : ulong zsz = fd_vinyl_bstream_pair_sz( fd_vinyl_data_szc_val_max( szc_new ) ) - csz;
182 0 : memset( ((uchar *)phdr) + csz, 0, zsz );
183 :
184 : /* Finish up acquiring for modify */
185 :
186 : //line[ line_idx ].obj = ... already init;
187 : //line[ line_idx ].ele_idx = ... already init;
188 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, -1L ); /* bump ver */
189 :
190 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
191 :
192 : //phdr->ctl = ... already init
193 : //phdr->key = ... already init
194 : //phdr->info = ... already init
195 :
196 : //ele0[ ele_idx ] = ... already init
197 :
198 0 : req_val_gaddr[ batch_idx ] = (ulong)fd_vinyl_data_obj_val( obj ) - data_laddr0;
199 :
200 0 : DONE( FD_VINYL_SUCCESS );
201 :
202 0 : } /* pair key data cached */
203 :
204 : /* At this point, pair key is not cached. If we are not allowed
205 : to acquire this pair, fail. Otherwise, evict the least
206 : recently used evictable line (this should always be possible
207 : if quotas are configured correctly) to make room to cache this
208 : pair. Connect this line to meta element ele_idx, set the
209 : line's reference count appropriately, bump the line's version
210 : and move the line to the desired location in the eviction
211 : sequence. We don't modify any shared fields in meta element
212 : ele_idx so we can do the modification fast.
213 :
214 : We do this upfront to free data cache for the alloc if the
215 : LRU line is in use and to handle the same pair appearing
216 : multiple times in an acquire.
217 :
218 : That is, if req_key appears multiple times in an acquire to
219 : modify, the trailing redundant acquires will see the object
220 : as cached with ref==-1 and fail with AGAIN. If the key
221 : appears multiple times in an acquire for read, the trailing
222 : redundant acquires will see the object as cached with ref>0
223 : and rd_active==1, conclude that the first redundant acquire
224 : is in the process of reading the pair into cache, skip any
225 : racy metadata checks, increase the ref count and succeed.
226 :
227 : IMPORTANT SAFETY TIP! Note that this implies that a client
228 : doing an acquire-for-read with redundant keys and with
229 : speculative processing will see req_err transition to success
230 : for the trailing redundant items for a key before the leading
231 : item of that key transitions to success (and thus before the
232 : object is fully read / verified and/or decoded). It is up to
233 : the client doing speculative cut through processing to avoid
234 : redundant keys or react accordingly. */
235 :
236 0 : if( FD_UNLIKELY( req_flag_modify & req_flag_excl ) ) DONE( FD_VINYL_ERR_INVAL );
237 :
238 0 : line_idx = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
239 :
240 0 : ulong line_ctl = line[ line_idx ].ctl;
241 :
242 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
243 :
244 0 : line[ line_idx ].ele_idx = ele_idx; ele0[ ele_idx ].line_idx = line_idx;
245 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, req_flag_modify ? -1L : 1L );
246 :
247 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
248 :
249 : /* Allocate an appropriately sized object to hold this pair,
250 : connect it to this line and report the location to the client. */
251 :
252 0 : ulong val_max = fd_ulong_if( !req_flag_modify, val_sz,
253 0 : fd_ulong_if( !req_flag_ignore, fd_ulong_max( val_sz, req_val_max ),
254 0 : req_val_max ) );
255 :
256 0 : ulong szc = fd_vinyl_data_szc( val_max );
257 :
258 0 : fd_vinyl_data_obj_t * obj = fd_vinyl_data_alloc( data, szc );
259 0 : if( FD_UNLIKELY( !obj ) ) FD_LOG_CRIT(( "increase data cache size" ));
260 :
261 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx;
262 :
263 0 : void * val = fd_vinyl_data_obj_val( obj );
264 :
265 0 : req_val_gaddr[ batch_idx ] = (ulong)val - data_laddr0;
266 :
267 : /* If we need to do I/O, start reading encoded pair data and
268 : defer the data integrity and decoding to later (and then in
269 : whatever order the I/O layer sees fit). */
270 :
271 0 : if( FD_LIKELY( !(req_flag_modify & req_flag_ignore) ) ) {
272 0 : obj->rd_active = (short)1;
273 :
274 0 : int style = fd_vinyl_bstream_ctl_style( pair_ctl );
275 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( pair_ctl );
276 :
277 0 : FD_CRIT( val_esz<=FD_VINYL_VAL_MAX, "corruption detected" );
278 0 : FD_CRIT( (style!=FD_VINYL_BSTREAM_CTL_STYLE_RAW) | (val_sz==val_esz), "corruption detected" );
279 :
280 0 : fd_vinyl_data_obj_t * cobj;
281 :
282 0 : if( FD_LIKELY( style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) cobj = obj;
283 0 : else {
284 0 : cobj = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_esz ) );
285 0 : if( FD_UNLIKELY( !cobj ) ) FD_LOG_CRIT(( "increase data cache size" ));
286 0 : }
287 :
288 0 : cobj->rd->ctx = (ulong)obj;
289 0 : cobj->rd->seq = ele0[ ele_idx ].seq;
290 0 : cobj->rd->dst = fd_vinyl_data_obj_phdr( cobj );
291 0 : cobj->rd->sz = fd_vinyl_bstream_pair_sz( val_esz );
292 :
293 0 : cobj->rd_err = req_err + batch_idx;
294 :
295 0 : fd_vinyl_io_read( io, cobj->rd );
296 0 : read_cnt++;
297 :
298 0 : quota_rem--;
299 0 : goto next_acquire;
300 0 : }
301 :
302 : /* At this point, we are acquiring to modify but we don't need
303 : the existing value. We populate the cached pair header
304 : appropriately for the modify and zero the rest to complete
305 : this request immediately. */
306 :
307 0 : obj->rd_active = (short)0;
308 :
309 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
310 :
311 0 : phdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
312 0 : phdr->key = *key;
313 0 : phdr->info = ele0[ ele_idx ].phdr.info;
314 :
315 0 : phdr->info.val_sz = 0U;
316 :
317 0 : memset( val, 0, fd_vinyl_data_szc_obj_footprint( szc ) - sizeof(fd_vinyl_data_obj_t) - sizeof(fd_vinyl_bstream_phdr_t) );
318 :
319 0 : DONE( FD_VINYL_SUCCESS );
320 :
321 0 : } /* pair key meta cached */
322 :
323 : /* At this point, pair key does not exist at bstream seq_present
324 : and is not in the process of being created. If we aren't
325 : allowed to create pair key, fail. Otherwise, evict the least
326 : recently used evictable line (this should always be possible if
327 : quotas are configured correctly) to make room to cache this
328 : pair, set the line's reference count appropriately, bump the
329 : version and move the line to the desired location in the
330 : eviction sequence. We do this upfront to free data cache for
331 : the alloc if the LRU line is in use. */
332 :
333 0 : if( FD_UNLIKELY( !(req_flag_modify & req_flag_create) ) ) DONE( FD_VINYL_ERR_KEY );
334 :
335 0 : ulong line_idx = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
336 :
337 0 : ulong line_ctl = line[ line_idx ].ctl;
338 :
339 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
340 :
341 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, -1L );
342 :
343 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
344 :
345 : /* Allocate an appropriately sized object to hold this pair and
346 : connect it to this line. */
347 :
348 0 : ulong szc = fd_vinyl_data_szc( req_val_max );
349 :
350 0 : fd_vinyl_data_obj_t * obj = fd_vinyl_data_alloc( data, szc );
351 0 : if( FD_UNLIKELY( !obj ) ) FD_LOG_CRIT(( "increase data cache size" ));
352 :
353 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx; obj->rd_active = (short)0;
354 :
355 : /* Allocate a meta element to hold metadata for this pair and
356 : connect it to this line. Since we are inserting at meta
357 : element ele_idx, we don't need to lock anything so long as we
358 : mark the element as in use very last. */
359 :
360 0 : ulong pair_cnt = vinyl->pair_cnt;
361 0 : if( FD_UNLIKELY( pair_cnt>=pair_max ) ) FD_LOG_CRIT(( "increase meta cache size" ));
362 0 : vinyl->pair_cnt = pair_cnt + 1UL;
363 :
364 0 : ele0[ ele_idx ].memo = memo;
365 : //ele0[ ele_idx ].phdr.ctl init below
366 0 : ele0[ ele_idx ].phdr.key = *key;
367 0 : memset( &ele0[ ele_idx ].phdr.info, 0, sizeof(fd_vinyl_info_t) ); /* sets val_sz to 0 */
368 0 : ele0[ ele_idx ].line_idx = line_idx;
369 0 : ele0[ ele_idx ].seq = 0UL; /* Will be init on release */
370 :
371 0 : FD_COMPILER_MFENCE();
372 0 : ele0[ ele_idx ].phdr.ctl = ULONG_MAX; /* Mark as being created */
373 0 : FD_COMPILER_MFENCE();
374 :
375 0 : line[ line_idx ].ele_idx = ele_idx;
376 :
377 : /* Initialize the data region for a new pair */
378 :
379 0 : *fd_vinyl_data_obj_phdr( obj ) = ele0[ ele_idx ].phdr;
380 :
381 0 : uchar * val = (uchar *)fd_vinyl_data_obj_val( obj );
382 :
383 0 : memset( val, 0, fd_vinyl_data_szc_obj_footprint( szc ) - sizeof(fd_vinyl_data_obj_t) - sizeof(fd_vinyl_bstream_phdr_t) );
384 :
385 0 : req_val_gaddr[ batch_idx ] = (ulong)val - data_laddr0;
386 :
387 0 : DONE( FD_VINYL_SUCCESS );
388 :
389 0 : next_acquire: /* silly language restriction */;
390 :
391 0 : # undef DONE
392 :
393 0 : } /* for batch_idx */
394 :
395 0 : FD_CRIT( (!read_cnt) | (!(req_flag_modify & req_flag_ignore)), "corruption detected" );
396 :
397 0 : comp_err = FD_VINYL_SUCCESS;
398 0 : break;
399 0 : }
|