Line data Source code
1 0 : case FD_VINYL_REQ_TYPE_ACQUIRE: {
2 0 : ulong req_flags = (ulong)req->flags;
3 0 : fd_vinyl_key_t const * req_key = MAP_REQ_GADDR( req->key_gaddr, fd_vinyl_key_t, batch_cnt );
4 0 : ulong * req_val_gaddr = MAP_REQ_GADDR( req->val_gaddr_gaddr, ulong, batch_cnt );
5 0 : schar * req_err = MAP_REQ_GADDR( req->err_gaddr, schar, batch_cnt );
6 :
7 0 : int req_flag_modify = fd_vinyl_req_flag_modify( req_flags );
8 0 : int req_flag_ignore = fd_vinyl_req_flag_ignore( req_flags );
9 0 : int req_flag_create = fd_vinyl_req_flag_create( req_flags );
10 0 : int req_flag_excl = fd_vinyl_req_flag_excl ( req_flags );
11 0 : int req_evict_prio = fd_vinyl_req_evict_prio ( req_flags );
12 :
13 0 : int bad_gaddr = (!!batch_cnt) & ((!req_key) | (!req_val_gaddr) | (!req_err));
14 0 : int bad_quota = quota_rem<batch_cnt;
15 :
16 0 : if( FD_UNLIKELY( bad_gaddr | bad_quota ) ) {
17 0 : comp_err = bad_gaddr ? FD_VINYL_ERR_INVAL : FD_VINYL_ERR_FULL;
18 0 : break;
19 0 : }
20 :
21 0 : for( ulong batch_idx=0UL; batch_idx<batch_cnt; batch_idx++ ) {
22 :
23 0 : # define DONE(err) do { \
24 0 : int _err = (err); \
25 0 : FD_COMPILER_MFENCE(); \
26 0 : req_err[ batch_idx ] = (schar)_err; \
27 0 : FD_COMPILER_MFENCE(); \
28 0 : quota_rem -= (ulong) !_err; \
29 0 : fail_cnt += (ulong)!!_err; \
30 0 : goto next_acquire; /* sigh ... can't use continue */ \
31 0 : } while(0)
32 :
33 0 : ulong req_val_max = 0UL;
34 0 : if( req_flag_modify ) {
35 0 : req_val_max = req_val_gaddr[ batch_idx ];
36 0 : if( FD_UNLIKELY( req_val_max>FD_VINYL_VAL_MAX ) ) DONE( FD_VINYL_ERR_INVAL );
37 0 : }
38 :
39 : /* Query vinyl meta for key */
40 :
41 0 : fd_vinyl_key_t const * key = req_key + batch_idx;
42 :
43 0 : ulong memo = fd_vinyl_key_memo( meta_seed, key );
44 :
45 0 : ulong _ele_idx; /* avoid pointer escape */
46 0 : int err = fd_vinyl_meta_query_fast( ele0, ele_max, key, memo, &_ele_idx );
47 0 : ulong ele_idx = _ele_idx; /* In [0,ele_max) */
48 :
49 0 : if( FD_LIKELY( !err ) ) { /* pair key meta cached */
50 :
51 : /* At this point, pair key either exists at bstream seq_present
52 : or is in the process of being created. If pair key is being
53 : created, fail with AGAIN (it must be acquired for modify). */
54 :
55 0 : ulong pair_ctl = ele0[ ele_idx ].phdr.ctl;
56 :
57 0 : FD_DCHECK_CRIT( (fd_vinyl_bstream_ctl_type( pair_ctl )==FD_VINYL_BSTREAM_CTL_TYPE_PAIR) | (pair_ctl==ULONG_MAX),
58 0 : "corruption detected" );
59 :
60 0 : if( FD_UNLIKELY( pair_ctl==ULONG_MAX ) ) DONE( FD_VINYL_ERR_AGAIN );
61 :
62 : /* At this point, pair key exists at bstream seq_present. */
63 :
64 0 : ulong val_sz = (ulong)ele0[ ele_idx ].phdr.info.val_sz;
65 0 : ulong line_idx = ele0[ ele_idx ].line_idx;
66 :
67 0 : FD_DCHECK_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
68 0 : FD_DCHECK_CRIT( (line_idx<line_cnt) | (line_idx==ULONG_MAX), "corruption detected" );
69 :
70 0 : if( FD_LIKELY( line_idx<line_cnt ) ) {
71 :
72 : /* At this point, pair key is cached. Get the cache info for
73 : line line_idx. */
74 :
75 0 : FD_DCHECK_CRIT( line[ line_idx ].ele_idx==ele_idx, "corruption detected" );
76 :
77 0 : fd_vinyl_data_obj_t * obj = line[ line_idx ].obj;
78 :
79 0 : FD_DCHECK_ALERT( fd_vinyl_data_is_valid_obj( obj, vol, vol_cnt ), "corruption detected" );
80 0 : FD_DCHECK_CRIT ( obj->line_idx==line_idx, "corruption detected" );
81 :
82 0 : ulong line_ctl = line[ line_idx ].ctl;
83 :
84 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
85 0 : long ref = fd_vinyl_line_ctl_ref( line_ctl );
86 :
87 0 : if( FD_LIKELY( !req_flag_modify ) ) {
88 :
89 : /* At this point, we are acquiring a cached pair for read.
90 : If the line is acquired for modify, fail with AGAIN. If
91 : there are too many acquires for read on this pair, CRIT
92 : (could consider AGAIN here). Otherwise, we update the
93 : ref count (don't change the ver), point the client at the
94 : line caching pair key to finish the acquire. Note that
95 : we don't validate the pair header if we detect that an
96 : earlier acquire in this batch started fetching the pair
97 : because the read might still be in progress (see note
98 : below for more details). */
99 :
100 0 : if( FD_UNLIKELY( ref<0L ) ) DONE( FD_VINYL_ERR_AGAIN );
101 0 : if( FD_UNLIKELY( ref>=FD_VINYL_LINE_REF_MAX ) ) FD_LOG_CRIT(( "too many acquires for read on this pair" ));
102 :
103 0 : if( FD_LIKELY( !obj->rd_active ) ) {
104 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
105 :
106 0 : FD_DCHECK_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
107 0 : FD_DCHECK_CRIT( phdr->ctl==fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR,
108 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz ), "corruption detected" );
109 0 : FD_DCHECK_CRIT( fd_vinyl_key_eq( &phdr->key, key ), "corruption detected" );
110 0 : FD_DCHECK_CRIT( !memcmp( &phdr->info, &ele0[ ele_idx ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
111 0 : }
112 :
113 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver, ref+1L ); /* don't bump ver */
114 :
115 0 : req_val_gaddr[ batch_idx ] = (ulong)fd_vinyl_data_obj_val( obj ) - data_laddr0;
116 :
117 0 : DONE( FD_VINYL_SUCCESS );
118 :
119 0 : }
120 :
121 : /* At this point, we are acquiring a cached pair for modify.
122 : If we are not allowed to acquire an existing pair for
123 : modify (INVAL) or if the line line_idx is already acquired
124 : for anything (AGAIN), fail. */
125 :
126 0 : if( FD_UNLIKELY( ref ) ) DONE( FD_VINYL_ERR_AGAIN );
127 0 : if( FD_UNLIKELY( req_flag_excl ) ) DONE( FD_VINYL_ERR_INVAL );
128 :
129 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
130 :
131 0 : FD_DCHECK_CRIT( !obj->rd_active, "corruption detected" );
132 0 : FD_DCHECK_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
133 0 : FD_DCHECK_CRIT( phdr->ctl==fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR,
134 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz ), "corruption detected" );
135 0 : FD_DCHECK_CRIT( fd_vinyl_key_eq( &phdr->key, key ), "corruption detected" );
136 0 : FD_DCHECK_CRIT( !memcmp( &phdr->info, &ele0[ ele_idx ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
137 :
138 : /* If the ignore flag is set, set the cached value size to 0. */
139 :
140 0 : if( req_flag_ignore ) {
141 0 : phdr->info.val_sz = 0U;
142 0 : val_sz = 0UL;
143 0 : }
144 :
145 : /* If the current location for the pair key's data isn't
146 : sufficient to hold the worst case val_sz that the client
147 : might modify the pair's value into, adjust the space
148 : available for the pair to the user's val_max. Because we
149 : might be ignoring the existing value, this could be smaller
150 : than the current object. (We could choose to not trim in
151 : this case because it will get trimmed again on release.
152 : But doing so makes a more consistent guarantee to the
153 : client and makes testing easier.) */
154 :
155 0 : ulong csz = sizeof(fd_vinyl_bstream_phdr_t) + val_sz;
156 :
157 0 : ulong szc_new = fd_vinyl_data_szc( fd_ulong_max( val_sz, req_val_max ) );
158 0 : ulong szc_old = (ulong)obj->szc;
159 :
160 0 : if( FD_UNLIKELY( szc_new != szc_old ) ) {
161 :
162 0 : fd_vinyl_data_obj_t * obj_new = fd_vinyl_data_alloc( data, szc_new );
163 0 : if( FD_UNLIKELY( !obj_new ) ) FD_LOG_CRIT(( "increase data cache size" ));
164 :
165 0 : fd_vinyl_bstream_phdr_t * phdr_new = fd_vinyl_data_obj_phdr( obj_new );
166 :
167 0 : memcpy( phdr_new, phdr, csz );
168 :
169 0 : fd_vinyl_data_free( data, obj );
170 :
171 0 : phdr = phdr_new;
172 0 : obj = obj_new;
173 :
174 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx; obj->rd_active = (short)0;
175 0 : }
176 :
177 : /* Zero out any remaining space in the pair. */
178 :
179 0 : ulong zsz = fd_vinyl_bstream_pair_sz( fd_vinyl_data_szc_val_max( szc_new ) ) - csz;
180 0 : memset( ((uchar *)phdr) + csz, 0, zsz );
181 :
182 : /* Finish up acquiring for modify */
183 :
184 : //line[ line_idx ].obj = ... already init;
185 : //line[ line_idx ].ele_idx = ... already init;
186 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, -1L ); /* bump ver */
187 :
188 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
189 :
190 : //phdr->ctl = ... already init
191 : //phdr->key = ... already init
192 : //phdr->info = ... already init
193 :
194 : //ele0[ ele_idx ] = ... already init
195 :
196 0 : req_val_gaddr[ batch_idx ] = (ulong)fd_vinyl_data_obj_val( obj ) - data_laddr0;
197 :
198 0 : DONE( FD_VINYL_SUCCESS );
199 :
200 0 : } /* pair key data cached */
201 :
202 : /* At this point, pair key is not cached. If we are not allowed
203 : to acquire this pair, fail. Otherwise, evict the least
204 : recently used evictable line (this should always be possible
205 : if quotas are configured correctly) to make room to cache this
206 : pair. Connect this line to meta element ele_idx, set the
207 : line's reference count appropriately, bump the line's version
208 : and move the line to the desired location in the eviction
209 : sequence. We don't modify any shared fields in meta element
210 : ele_idx so we can do the modification fast.
211 :
212 : We do this upfront to free data cache for the alloc if the
213 : LRU line is in use and to handle the same pair appearing
214 : multiple times in an acquire.
215 :
216 : That is, if req_key appears multiple times in an acquire to
217 : modify, the trailing redundant acquires will see the object
218 : as cached with ref==-1 and fail with AGAIN. If the key
219 : appears multiple times in an acquire for read, the trailing
220 : redundant acquires will see the object as cached with ref>0
221 : and rd_active==1, conclude that the first redundant acquire
222 : is in the process of reading the pair into cache, skip any
223 : racy metadata checks, increase the ref count and succeed.
224 :
225 : IMPORTANT SAFETY TIP! Note that this implies that client
226 : doing an acquire-for-read with redundant keys and with
227 : speculative processing will see req_err transition to success
228 : for the trailing redundant items for a key before the leading
229 : item of that key transitions to success (and thus before the
230 : object is fully read / verified and/or decoded). It is up to
231 : the client doing speculative cut through processing to avoid
232 : redundant keys or react accordingly. */
233 :
234 0 : if( FD_UNLIKELY( req_flag_modify & req_flag_excl ) ) DONE( FD_VINYL_ERR_INVAL );
235 :
236 0 : line_idx = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
237 :
238 0 : ulong line_ctl = line[ line_idx ].ctl;
239 :
240 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
241 :
242 0 : line[ line_idx ].ele_idx = ele_idx; ele0[ ele_idx ].line_idx = line_idx;
243 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, req_flag_modify ? -1L : 1L );
244 :
245 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
246 :
247 : /* Allocate an appropriately sized object to hold this pair,
248 : connect it to this line and report the location to the client. */
249 :
250 0 : ulong val_max = fd_ulong_if( !req_flag_modify, val_sz,
251 0 : fd_ulong_if( !req_flag_ignore, fd_ulong_max( val_sz, req_val_max ),
252 0 : req_val_max ) );
253 :
254 0 : ulong szc = fd_vinyl_data_szc( val_max );
255 :
256 0 : fd_vinyl_data_obj_t * obj = fd_vinyl_data_alloc( data, szc );
257 0 : if( FD_UNLIKELY( !obj ) ) FD_LOG_CRIT(( "increase data cache size" ));
258 :
259 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx;
260 :
261 0 : void * val = fd_vinyl_data_obj_val( obj );
262 :
263 0 : req_val_gaddr[ batch_idx ] = (ulong)val - data_laddr0;
264 :
265 : /* If we need to do I/O, start reading encoded pair data and
266 : defer the data integrity and decoding to later (and then in
267 : whatever order the I/O layer sees fit). */
268 :
269 0 : if( FD_LIKELY( !(req_flag_modify & req_flag_ignore) ) ) {
270 0 : obj->rd_active = (short)1;
271 :
272 0 : int style = fd_vinyl_bstream_ctl_style( pair_ctl );
273 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( pair_ctl );
274 :
275 0 : FD_DCHECK_CRIT( val_esz<=FD_VINYL_VAL_MAX, "corruption detected" );
276 0 : FD_DCHECK_CRIT( (style!=FD_VINYL_BSTREAM_CTL_STYLE_RAW) | (val_sz==val_esz), "corruption detected" );
277 :
278 0 : fd_vinyl_data_obj_t * cobj;
279 :
280 0 : if( FD_LIKELY( style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) cobj = obj;
281 0 : else {
282 0 : cobj = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_esz ) );
283 0 : if( FD_UNLIKELY( !cobj ) ) FD_LOG_CRIT(( "increase data cache size" ));
284 0 : }
285 :
286 0 : cobj->rd->ctx = (ulong)obj;
287 0 : cobj->rd->seq = ele0[ ele_idx ].seq;
288 0 : cobj->rd->dst = fd_vinyl_data_obj_phdr( cobj );
289 0 : cobj->rd->sz = fd_vinyl_bstream_pair_sz( val_esz );
290 :
291 0 : cobj->rd_err = req_err + batch_idx;
292 :
293 0 : fd_vinyl_io_read( io, cobj->rd );
294 0 : read_cnt++;
295 :
296 0 : quota_rem--;
297 0 : goto next_acquire;
298 0 : }
299 :
300 : /* At this point, we are acquiring to modify but we don't need
301 : the existing value. We populate the cached pair header
302 : appropriately for the modify and zero the rest to complete
303 : this request immediately. */
304 :
305 0 : obj->rd_active = (short)0;
306 :
307 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
308 :
309 0 : phdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
310 0 : phdr->key = *key;
311 0 : phdr->info = ele0[ ele_idx ].phdr.info;
312 :
313 0 : phdr->info.val_sz = 0U;
314 :
315 0 : memset( val, 0, fd_vinyl_data_szc_obj_footprint( szc ) - sizeof(fd_vinyl_data_obj_t) - sizeof(fd_vinyl_bstream_phdr_t) );
316 :
317 0 : DONE( FD_VINYL_SUCCESS );
318 :
319 0 : } /* pair key meta cached */
320 :
321 : /* At this point, pair key does not exist at bstream seq_present
322 : and is not in the process of being created. If we aren't
323 : allowed to create pair key, fail. Otherwise, evict the least
324 : recently used evictable line (this should always be possible if
325 : quotas are configured correctly) to make room to cache this
326 : pair, set the line's reference count appropriately, bump the
327 : version and move the line to the desired location in the
328 : eviction sequence. We do this upfront to free data cache for
329 : the alloc if the LRU line is in use. */
330 :
331 0 : if( FD_UNLIKELY( !(req_flag_modify & req_flag_create) ) ) DONE( FD_VINYL_ERR_KEY );
332 :
333 0 : ulong line_idx = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
334 :
335 0 : ulong line_ctl = line[ line_idx ].ctl;
336 :
337 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
338 :
339 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, -1L );
340 :
341 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
342 :
343 : /* Allocate an appropriately sized object to hold this pair and
344 : connect it to this line. */
345 :
346 0 : ulong szc = fd_vinyl_data_szc( req_val_max );
347 :
348 0 : fd_vinyl_data_obj_t * obj = fd_vinyl_data_alloc( data, szc );
349 0 : if( FD_UNLIKELY( !obj ) ) FD_LOG_CRIT(( "increase data cache size" ));
350 :
351 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx; obj->rd_active = (short)0;
352 :
353 : /* Allocate a meta element to hold metadata for this pair and
354 : connect it to this line. Since we are inserting at meta
355 : element ele_idx, we don't need to lock anything so long as we
356 : mark the element as in use very last. */
357 :
358 0 : ulong pair_cnt = vinyl->pair_cnt;
359 0 : if( FD_UNLIKELY( pair_cnt>=pair_max ) ) FD_LOG_CRIT(( "increase meta cache size" ));
360 0 : vinyl->pair_cnt = pair_cnt + 1UL;
361 :
362 0 : ele0[ ele_idx ].memo = memo;
363 : //ele0[ ele_idx ].phdr.ctl init below
364 0 : ele0[ ele_idx ].phdr.key = *key;
365 0 : memset( &ele0[ ele_idx ].phdr.info, 0, sizeof(fd_vinyl_info_t) ); /* sets val_sz to 0 */
366 0 : ele0[ ele_idx ].line_idx = line_idx;
367 0 : ele0[ ele_idx ].seq = 0UL; /* Will be init on release */
368 :
369 0 : FD_COMPILER_MFENCE();
370 0 : ele0[ ele_idx ].phdr.ctl = ULONG_MAX; /* Mark as being created */
371 0 : FD_COMPILER_MFENCE();
372 :
373 0 : line[ line_idx ].ele_idx = ele_idx;
374 :
375 : /* Initialize the data region for a new pair */
376 :
377 0 : *fd_vinyl_data_obj_phdr( obj ) = ele0[ ele_idx ].phdr;
378 :
379 0 : uchar * val = (uchar *)fd_vinyl_data_obj_val( obj );
380 :
381 0 : memset( val, 0, fd_vinyl_data_szc_obj_footprint( szc ) - sizeof(fd_vinyl_data_obj_t) - sizeof(fd_vinyl_bstream_phdr_t) );
382 :
383 0 : req_val_gaddr[ batch_idx ] = (ulong)val - data_laddr0;
384 :
385 0 : DONE( FD_VINYL_SUCCESS );
386 :
387 0 : next_acquire: /* silly language restriction */;
388 :
389 0 : # undef DONE
390 :
391 0 : } /* for batch_idx */
392 :
393 0 : FD_DCHECK_CRIT( (!read_cnt) | (!(req_flag_modify & req_flag_ignore)), "corruption detected" );
394 :
395 0 : comp_err = FD_VINYL_SUCCESS;
396 0 : break;
397 0 : }
|