Line data Source code
1 0 : case FD_VINYL_REQ_TYPE_ACQUIRE: { /* Acquire batch_cnt pairs, for read or (if the modify flag is set) for modify */
2 0 : ulong req_flags = (ulong)req->flags;
3 0 : ulong req_val_max = (ulong)req->val_max;
4 0 : fd_vinyl_key_t const * req_key = MAP_REQ_GADDR( req->key_gaddr, fd_vinyl_key_t, batch_cnt );
5 0 : ulong * req_val_gaddr = MAP_REQ_GADDR( req->val_gaddr_gaddr, ulong, batch_cnt );
6 0 : schar * req_err = MAP_REQ_GADDR( req->err_gaddr, schar, batch_cnt );
7 : /* Decode the request flags (they apply to every item in the batch) */
8 0 : int req_flag_modify = fd_vinyl_req_flag_modify( req_flags );
9 0 : int req_flag_ignore = fd_vinyl_req_flag_ignore( req_flags );
10 0 : int req_flag_create = fd_vinyl_req_flag_create( req_flags );
11 0 : int req_flag_excl = fd_vinyl_req_flag_excl ( req_flags );
12 0 : int req_evict_prio = fd_vinyl_req_evict_prio ( req_flags );
13 : /* Validate the batch as a whole before touching any item (bad addresses / val_max -> INVAL, insufficient quota -> FULL) */
14 0 : int bad_gaddr = (!!batch_cnt) & ((!req_key) | (!req_val_gaddr) | (!req_err));
15 0 : int bad_val_max = req_flag_modify & (req_val_max>FD_VINYL_VAL_MAX);
16 0 : int bad_quota = quota_rem<batch_cnt; /* every item might succeed and each success (or started read) consumes one unit of quota */
17 :
18 0 : if( FD_UNLIKELY( bad_gaddr | bad_val_max | bad_quota ) ) {
19 0 : comp_err = (bad_gaddr | bad_val_max) ? FD_VINYL_ERR_INVAL : FD_VINYL_ERR_FULL;
20 0 : break;
21 0 : }
22 : /* Process the items in order. Items completed here publish their status to req_err[ batch_idx ] via DONE; items that start a read instead hand req_err + batch_idx to the read (cobj->rd_err). */
23 0 : for( ulong batch_idx=0UL; batch_idx<batch_cnt; batch_idx++ ) {
24 : /* DONE(err): publish err for this item between compiler fences, charge one unit of quota on success or count a failure otherwise, then skip to the next item. */
25 0 : # define DONE(err) do { \
26 0 : int _err = (err); \
27 0 : FD_COMPILER_MFENCE(); \
28 0 : req_err[ batch_idx ] = (schar)_err; \
29 0 : FD_COMPILER_MFENCE(); \
30 0 : quota_rem -= (ulong) !_err; \
31 0 : fail_cnt += (ulong)!!_err; \
32 0 : goto next_acquire; /* sigh ... can't use continue */ \
33 0 : } while(0)
34 :
35 : /* Query vinyl meta for key. On a hit (!err), ele_idx is the element holding key. On a miss, the create path at the bottom of this loop uses ele_idx as the element to insert key at. */
36 :
37 0 : fd_vinyl_key_t const * key = req_key + batch_idx;
38 :
39 0 : ulong memo = fd_vinyl_key_memo( meta_seed, key );
40 :
41 0 : ulong _ele_idx; /* avoid pointer escape */
42 0 : int err = fd_vinyl_meta_query_fast( ele0, ele_max, key, memo, &_ele_idx );
43 0 : ulong ele_idx = _ele_idx; /* In [0,ele_max) */
44 :
45 0 : if( FD_LIKELY( !err ) ) { /* pair key meta cached */
46 :
47 : /* At this point, pair key either exists at bstream seq_present
48 : or is in the process of being created. If pair key is being
49 : created, fail with AGAIN (it must be acquired for modify). */
50 :
51 0 : ulong pair_ctl = ele0[ ele_idx ].phdr.ctl;
52 :
53 0 : FD_CRIT( (fd_vinyl_bstream_ctl_type( pair_ctl )==FD_VINYL_BSTREAM_CTL_TYPE_PAIR) | (pair_ctl==ULONG_MAX),
54 0 : "corruption detected" );
55 :
56 0 : if( FD_UNLIKELY( pair_ctl==ULONG_MAX ) ) DONE( FD_VINYL_ERR_AGAIN );
57 :
58 : /* At this point, pair key exists at bstream seq_present. */
59 :
60 0 : ulong val_sz = (ulong)ele0[ ele_idx ].phdr.info.val_sz;
61 0 : ulong line_idx = ele0[ ele_idx ].line_idx;
62 :
63 0 : FD_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
64 0 : FD_CRIT( (line_idx<line_cnt) | (line_idx==ULONG_MAX), "corruption detected" );
65 : /* line_idx==ULONG_MAX means the pair exists but is not in the data cache (handled after this if) */
66 0 : if( FD_LIKELY( line_idx<line_cnt ) ) {
67 :
68 : /* At this point, pair key is cached. Get the cache info for
69 : line line_idx. */
70 :
71 0 : FD_CRIT( line[ line_idx ].ele_idx==ele_idx, "corruption detected" );
72 :
73 0 : fd_vinyl_data_obj_t * obj = line[ line_idx ].obj;
74 :
75 0 : FD_ALERT( fd_vinyl_data_is_valid_obj( obj, vol, vol_cnt ), "corruption detected" );
76 0 : FD_CRIT ( obj->line_idx==line_idx, "corruption detected" );
77 :
78 0 : ulong line_ctl = line[ line_idx ].ctl;
79 : /* ref<0: acquired for modify; ref>0: number of acquires for read; ref==0: not acquired */
80 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
81 0 : long ref = fd_vinyl_line_ctl_ref( line_ctl );
82 :
83 0 : if( FD_LIKELY( !req_flag_modify ) ) {
84 :
85 : /* At this point, we are acquiring a cached pair for read.
86 : If the line is acquired for modify, fail with AGAIN. If
87 : there are too many acquires for read on this pair, CRIT
88 : (could consider AGAIN here). Otherwise, we update the
89 : ref count (don't change the ver), point the client at the
90 : line caching pair key to finish the acquire. Note that
91 : we don't validate the pair header if we detect that an
92 : earlier acquire in this batch started fetching the pair
93 : because the read might still be in progress (see note
94 : below for more details). */
95 :
96 0 : if( FD_UNLIKELY( ref<0L ) ) DONE( FD_VINYL_ERR_AGAIN );
97 0 : if( FD_UNLIKELY( ref>=FD_VINYL_LINE_REF_MAX ) ) FD_LOG_CRIT(( "too many acquires for read on this pair" ));
98 :
99 0 : if( FD_LIKELY( !obj->rd_active ) ) {
100 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
101 :
102 0 : FD_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
103 0 : FD_CRIT( phdr->ctl==fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR,
104 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz ), "corruption detected" );
105 0 : FD_CRIT( fd_vinyl_key_eq( &phdr->key, key ), "corruption detected" );
106 0 : FD_CRIT( !memcmp( &phdr->info, &ele0[ ele_idx ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
107 0 : }
108 :
109 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver, ref+1L ); /* don't bump ver */
110 :
111 0 : req_val_gaddr[ batch_idx ] = (ulong)fd_vinyl_data_obj_val( obj ) - data_laddr0;
112 :
113 0 : DONE( FD_VINYL_SUCCESS );
114 :
115 0 : }
116 :
117 : /* At this point, we are acquiring a cached pair for modify.
118 : If we are not allowed to acquire an existing pair for
119 : modify (INVAL) or if the line line_idx is already acquired
120 : for anything (AGAIN), fail. */
121 :
122 0 : if( FD_UNLIKELY( ref ) ) DONE( FD_VINYL_ERR_AGAIN );
123 0 : if( FD_UNLIKELY( req_flag_excl ) ) DONE( FD_VINYL_ERR_INVAL );
124 :
125 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
126 :
127 0 : FD_CRIT( !obj->rd_active, "corruption detected" );
128 0 : FD_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
129 0 : FD_CRIT( phdr->ctl==fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR,
130 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz ), "corruption detected" );
131 0 : FD_CRIT( fd_vinyl_key_eq( &phdr->key, key ), "corruption detected" );
132 0 : FD_CRIT( !memcmp( &phdr->info, &ele0[ ele_idx ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
133 :
134 : /* If the ignore flag is set, set the cached value size to 0. */
135 :
136 0 : if( req_flag_ignore ) {
137 0 : phdr->info.val_sz = 0U;
138 0 : val_sz = 0UL;
139 0 : }
140 :
141 : /* If the current location for the pair key's data isn't
142 : sufficient to hold the worst case val_sz that the client
143 : might modify the pair's value into, adjust the space
144 : available for the pair to the user's val_max. Because we
145 : might be ignoring the existing value, this could be smaller
146 : than the current object. (We could choose to not trim in
147 : this case because it will get trimmed again on release.
148 : But doing so makes a more consistent guarantee to the
149 : client and makes testing easier.) */
150 :
151 0 : ulong csz = sizeof(fd_vinyl_bstream_phdr_t) + val_sz; /* bytes in use: pair header + current value */
152 :
153 0 : ulong szc_new = fd_vinyl_data_szc( fd_ulong_max( val_sz, req_val_max ) );
154 0 : ulong szc_old = (ulong)obj->szc;
155 : /* On a size class change, move header + value to a freshly allocated object and rewire line line_idx to it */
156 0 : if( FD_UNLIKELY( szc_new != szc_old ) ) {
157 :
158 0 : fd_vinyl_data_obj_t * obj_new = fd_vinyl_data_alloc( data, szc_new );
159 0 : if( FD_UNLIKELY( !obj_new ) ) FD_LOG_CRIT(( "increase data cache size" ));
160 :
161 0 : fd_vinyl_bstream_phdr_t * phdr_new = fd_vinyl_data_obj_phdr( obj_new );
162 :
163 0 : memcpy( phdr_new, phdr, csz );
164 :
165 0 : fd_vinyl_data_free( data, obj );
166 :
167 0 : phdr = phdr_new;
168 0 : obj = obj_new;
169 :
170 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx; obj->rd_active = (short)0;
171 0 : }
172 :
173 : /* Zero out any remaining space in the pair (from the end of the current value up to the pair size for the maximum value size of size class szc_new). */
174 :
175 0 : ulong zsz = fd_vinyl_bstream_pair_sz( fd_vinyl_data_szc_val_max( szc_new ) ) - csz;
176 0 : memset( ((uchar *)phdr) + csz, 0, zsz );
177 :
178 : /* Finish up acquiring for modify */
179 :
180 : //line[ line_idx ].obj = ... already init;
181 : //line[ line_idx ].ele_idx = ... already init;
182 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, -1L ); /* bump ver */
183 :
184 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
185 :
186 : //phdr->ctl = ... already init
187 : //phdr->key = ... already init
188 : //phdr->info = ... already init
189 :
190 : //ele0[ ele_idx ] = ... already init
191 :
192 0 : req_val_gaddr[ batch_idx ] = (ulong)fd_vinyl_data_obj_val( obj ) - data_laddr0;
193 :
194 0 : DONE( FD_VINYL_SUCCESS );
195 :
196 0 : } /* pair key data cached */
197 :
198 : /* At this point, pair key is not cached. If we are not allowed
199 : to acquire this pair, fail. Otherwise, evict the least
200 : recently used evictable line (this should always be possible
201 : if quotas are configured correctly) to make room to cache this
202 : pair. Connect this line to meta element ele_idx, set the
203 : line's reference count appropriately, bump the line's version
204 : and move the line to the desired location in the eviction
205 : sequence. We don't modify any shared fields in meta element
206 : ele_idx so we can do the modification fast.
207 :
208 : We do this upfront to free data cache for the alloc if the
209 : LRU line is in use and to handle the same pair appearing
210 : multiple times in an acquire.
211 :
212 : That is, if req_key appears multiple times in an acquire to
213 : modify, the trailing redundant acquires will see the object
214 : as cached with ref==-1 and fail with AGAIN. If the key
215 : appears multiple times in an acquire for read, the trailing
216 : redundant acquires will see the object as cached with ref>0
217 : and rd_active==1, conclude that the first redundant acquire
218 : is in the process of reading the pair into cache, skip any
219 : racy metadata checks, increase the ref count and succeed.
220 :
221 : IMPORTANT SAFETY TIP! Note that this implies that client
222 : doing an acquire-for-read with redundant keys and with
223 : speculative processing will see req_err transition to success
224 : for the trailing redundant items for a key before the leading
225 : item of that key transitions to success (and thus before the
226 : object is fully read / verified and/or decoded). It is up to
227 : the client doing speculative cut through processing to avoid
228 : redundant keys or react accordingly. */
229 :
230 0 : if( FD_UNLIKELY( req_flag_modify & req_flag_excl ) ) DONE( FD_VINYL_ERR_INVAL );
231 :
232 0 : line_idx = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
233 :
234 0 : ulong line_ctl = line[ line_idx ].ctl;
235 :
236 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
237 :
238 0 : line[ line_idx ].ele_idx = ele_idx; ele0[ ele_idx ].line_idx = line_idx;
239 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, req_flag_modify ? -1L : 1L );
240 :
241 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
242 :
243 : /* Allocate an appropriately sized object to hold this pair,
244 : connect it to this line and report the location to the client. */
245 : /* Read: exactly val_sz. Modify: max( val_sz, req_val_max ), or just req_val_max when the existing value is ignored. */
246 0 : ulong val_max = fd_ulong_if( !req_flag_modify, val_sz,
247 0 : fd_ulong_if( !req_flag_ignore, fd_ulong_max( val_sz, req_val_max ),
248 0 : req_val_max ) );
249 :
250 0 : ulong szc = fd_vinyl_data_szc( val_max );
251 :
252 0 : fd_vinyl_data_obj_t * obj = fd_vinyl_data_alloc( data, szc );
253 0 : if( FD_UNLIKELY( !obj ) ) FD_LOG_CRIT(( "increase data cache size" ));
254 :
255 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx;
256 :
257 0 : void * val = fd_vinyl_data_obj_val( obj );
258 :
259 0 : req_val_gaddr[ batch_idx ] = (ulong)val - data_laddr0;
260 :
261 : /* If we need to do I/O, start reading encoded pair data and
262 : defer the data integrity and decoding to later (and then in
263 : whatever order the I/O layer sees fit). */
264 :
265 0 : if( FD_LIKELY( !(req_flag_modify & req_flag_ignore) ) ) {
266 0 : obj->rd_active = (short)1;
267 : /* pair_ctl (from the meta element) gives the pair's bstream encoding style and encoded value size */
268 0 : int style = fd_vinyl_bstream_ctl_style( pair_ctl );
269 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( pair_ctl );
270 :
271 0 : FD_CRIT( val_esz<=FD_VINYL_VAL_MAX, "corruption detected" );
272 0 : FD_CRIT( (style!=FD_VINYL_BSTREAM_CTL_STYLE_RAW) | (val_sz==val_esz), "corruption detected" );
273 :
274 0 : fd_vinyl_data_obj_t * cobj;
275 : /* cobj receives the bstream bytes: RAW pairs are read directly into obj, other styles into a separate object sized for val_esz (obj is recorded in rd->ctx) */
276 0 : if( FD_LIKELY( style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) cobj = obj;
277 0 : else {
278 0 : cobj = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_esz ) );
279 0 : if( FD_UNLIKELY( !cobj ) ) FD_LOG_CRIT(( "increase data cache size" ));
280 0 : }
281 :
282 0 : cobj->rd->ctx = (ulong)obj;
283 0 : cobj->rd->seq = ele0[ ele_idx ].seq;
284 0 : cobj->rd->dst = fd_vinyl_data_obj_phdr( cobj );
285 0 : cobj->rd->sz = fd_vinyl_bstream_pair_sz( val_esz );
286 :
287 0 : cobj->rd_err = req_err + batch_idx;
288 :
289 0 : fd_vinyl_io_read( io, cobj->rd );
290 0 : read_cnt++;
291 :
292 0 : quota_rem--; /* read started: charge quota now but skip DONE (req_err[ batch_idx ] is not written here; presumably set later via cobj->rd_err) */
293 0 : goto next_acquire;
294 0 : }
295 :
296 : /* At this point, we are acquiring to modify but we don't need
297 : the existing value. We populate the cached pair header
298 : appropriately for the modify and zero the rest to complete
299 : this request immediately. */
300 :
301 0 : obj->rd_active = (short)0;
302 :
303 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
304 :
305 0 : phdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
306 0 : phdr->key = *key;
307 0 : phdr->info = ele0[ ele_idx ].phdr.info;
308 :
309 0 : phdr->info.val_sz = 0U;
310 :
311 0 : memset( val, 0, fd_vinyl_data_szc_obj_footprint( szc ) - sizeof(fd_vinyl_data_obj_t) - sizeof(fd_vinyl_bstream_phdr_t) );
312 :
313 0 : DONE( FD_VINYL_SUCCESS );
314 :
315 0 : } /* pair key meta cached */
316 :
317 : /* At this point, pair key does not exist at bstream seq_present
318 : and is not in the process of being created. If we aren't
319 : allowed to create pair key, fail. Otherwise, evict the least
320 : recently used evictable line (this should always be possible if
321 : quotas are configured correctly) to make room to cache this
322 : pair, set the line's reference count appropriately, bump the
323 : version and move the line to the desired location in the
324 : eviction sequence. We do this upfront to free data cache for
325 : the alloc if the LRU line is in use. */
326 :
327 0 : if( FD_UNLIKELY( !(req_flag_modify & req_flag_create) ) ) DONE( FD_VINYL_ERR_KEY );
328 :
329 0 : ulong line_idx = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
330 :
331 0 : ulong line_ctl = line[ line_idx ].ctl;
332 :
333 0 : ulong ver = fd_vinyl_line_ctl_ver( line_ctl );
334 :
335 0 : line[ line_idx ].ctl = fd_vinyl_line_ctl( ver+1UL, -1L );
336 :
337 0 : fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx, req_evict_prio );
338 :
339 : /* Allocate an appropriately sized object to hold this pair and
340 : connect it to this line. */
341 :
342 0 : ulong szc = fd_vinyl_data_szc( req_val_max );
343 :
344 0 : fd_vinyl_data_obj_t * obj = fd_vinyl_data_alloc( data, szc );
345 0 : if( FD_UNLIKELY( !obj ) ) FD_LOG_CRIT(( "increase data cache size" ));
346 :
347 0 : line[ line_idx ].obj = obj; obj->line_idx = line_idx; obj->rd_active = (short)0;
348 :
349 : /* Allocate a meta element to hold metadata for this pair and
350 : connect it to this line. Since we are inserting at meta
351 : element ele_idx, we don't need to lock anything so long as we
352 : mark the element as in use very last. */
353 :
354 0 : ulong pair_cnt = vinyl->pair_cnt;
355 0 : if( FD_UNLIKELY( pair_cnt>=pair_max ) ) FD_LOG_CRIT(( "increase meta cache size" ));
356 0 : vinyl->pair_cnt = pair_cnt + 1UL;
357 :
358 0 : ele0[ ele_idx ].memo = memo;
359 : //ele0[ ele_idx ].phdr.ctl init below
360 0 : ele0[ ele_idx ].phdr.key = *key;
361 0 : memset( &ele0[ ele_idx ].phdr.info, 0, sizeof(fd_vinyl_info_t) ); /* sets val_sz to 0 */
362 0 : ele0[ ele_idx ].line_idx = line_idx;
363 0 : ele0[ ele_idx ].seq = 0UL; /* Will be init on release */
364 :
365 0 : FD_COMPILER_MFENCE();
366 0 : ele0[ ele_idx ].phdr.ctl = ULONG_MAX; /* Mark as being created */
367 0 : FD_COMPILER_MFENCE();
368 :
369 0 : line[ line_idx ].ele_idx = ele_idx;
370 :
371 : /* Initialize the data region for a new pair */
372 :
373 0 : *fd_vinyl_data_obj_phdr( obj ) = ele0[ ele_idx ].phdr;
374 :
375 0 : uchar * val = (uchar *)fd_vinyl_data_obj_val( obj );
376 :
377 0 : memset( val, 0, fd_vinyl_data_szc_obj_footprint( szc ) - sizeof(fd_vinyl_data_obj_t) - sizeof(fd_vinyl_bstream_phdr_t) );
378 :
379 0 : req_val_gaddr[ batch_idx ] = (ulong)val - data_laddr0;
380 :
381 0 : DONE( FD_VINYL_SUCCESS );
382 :
383 0 : next_acquire: /* silly language restriction */;
384 :
385 0 : # undef DONE
386 :
387 0 : } /* for batch_idx */
388 : /* Reads are only started when !(req_flag_modify & req_flag_ignore), so a modify+ignore batch must not have started any (NOTE(review): assumes read_cnt counts only reads started by this request -- confirm in the enclosing scope) */
389 0 : FD_CRIT( (!read_cnt) | (!(req_flag_modify & req_flag_ignore)), "corruption detected" );
390 :
391 0 : comp_err = FD_VINYL_SUCCESS;
392 0 : break;
393 0 : }
|