Line data Source code
1 : #include "fd_vinyl.h"
2 : #include "../util/pod/fd_pod.h"
3 : #include <errno.h>
4 : #include <unistd.h>
5 : #include <fcntl.h>
6 : #include <lz4.h>
7 :
8 : struct fd_vinyl_client {
9 : fd_vinyl_rq_t * rq; /* Channel for requests from this client (could be shared by multiple vinyl instances) */
10 : fd_vinyl_cq_t * cq; /* Channel for completions from this client to this vinyl instance
11 : (could be shared by multiple receivers of completions from this vinyl instance). */
12 : ulong burst_max; /* Max requests receive from this client at a time */
13 : ulong seq; /* Sequence number of the next request to receive in the rq */
14 : ulong link_id; /* Identifies requests from this client to this vinyl instance in the rq */
15 : ulong laddr0; /* A valid non-zero gaddr from this client maps to the vinyl instance's laddr laddr0 + gaddr ... */
16 : ulong laddr1; /* ... and thus is in (laddr0,laddr1). A zero gaddr maps to laddr NULL. */
17 : ulong quota_rem; /* Num of remaining acquisitions this client is allowed on this vinyl instance */
18 : ulong quota_max; /* Max quota */
19 : };
20 :
21 : typedef struct fd_vinyl_client fd_vinyl_client_t;
22 :
/* MAP_REQ_GADDR translates the request global address gaddr, which
   should refer to an array of n T's, into a T * in the local address
   space.  The result is NULL if gaddr is 0, if the translated address
   is not aligned for T, or if the array does not lie entirely within
   the region shared with the client.  The macro reads client_laddr0
   and client_laddr1 from the enclosing scope.  Assumes sizeof(T)*(n)
   does not overflow (true as n is at most batch_cnt, which is at most
   2^32, and sizeof(T) is at most 40). */

#define MAP_REQ_GADDR( gaddr, T, n )        \
  ((T *)fd_vinyl_laddr( (gaddr),            \
                        alignof(T),         \
                        sizeof(T)*(n),      \
                        client_laddr0,      \
                        client_laddr1 ))
32 :
33 : FD_FN_CONST static inline void *
34 : fd_vinyl_laddr( ulong req_gaddr,
35 : ulong align,
36 : ulong footprint,
37 : ulong client_laddr0,
38 0 : ulong client_laddr1 ) {
39 0 : ulong req_laddr0 = client_laddr0 + req_gaddr;
40 0 : ulong req_laddr1 = req_laddr0 + footprint;
41 0 : return (void *)fd_ulong_if( (!!req_gaddr) & fd_ulong_is_aligned( req_laddr0, align ) &
42 0 : (client_laddr0<=req_laddr0) & (req_laddr0<=req_laddr1) & (req_laddr1<=client_laddr1),
43 0 : req_laddr0, 0UL );
44 0 : }
45 :
/* Capacity limits of the vinyl run loop.  FD_VINYL_CLIENT_MAX sizes the
   table of connected clients and FD_VINYL_REQ_MAX sizes the ring of
   received requests.  FD_VINYL_REQ_MAX must stay a power of two because
   ring slots are computed by masking with FD_VINYL_REQ_MAX-1.

   FIXME: STASH THESE IN THE VINYL TOO? */

#define FD_VINYL_CLIENT_MAX (1UL<<10) /* 1024 clients */
#define FD_VINYL_REQ_MAX    (1UL<<10) /* 1024 requests, power of two */
49 :
50 : void
51 0 : fd_vinyl_exec( fd_vinyl_t * vinyl ) {
52 :
53 : /* Unpack shared objects */
54 :
55 0 : fd_cnc_t * cnc = vinyl->cnc;
56 0 : fd_vinyl_io_t * io = vinyl->io;
57 0 : fd_vinyl_line_t * line = vinyl->line;
58 0 : fd_vinyl_meta_t * meta = vinyl->meta;
59 0 : fd_vinyl_data_t * data = vinyl->data;
60 :
61 : /* Unpack config */
62 :
63 0 : ulong line_cnt = vinyl->line_cnt;
64 0 : ulong pair_max = vinyl->pair_max;
65 0 : ulong async_min = vinyl->async_min;
66 0 : ulong async_max = vinyl->async_max;
67 :
68 : /* Unpack cnc */
69 :
70 0 : if( FD_UNLIKELY( fd_cnc_signal_query( cnc )!=FD_VINYL_CNC_SIGNAL_BOOT ) ) {
71 0 : FD_LOG_WARNING(( "cnc not booting (restarting after an unclean termination?); forcing to boot and attempting to continue" ));
72 0 : fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_BOOT );
73 0 : }
74 :
75 0 : fd_vinyl_cmd_t * cmd = (fd_vinyl_cmd_t *)fd_cnc_app_laddr( cnc );
76 0 : ulong * diag = (ulong *)(cmd+1);
77 :
78 : /* Unpack io */
79 :
80 0 : ulong io_seed = fd_vinyl_io_seed( io );
81 :
82 : /* Unpack meta */
83 :
84 0 : fd_vinyl_meta_ele_t * ele0 = meta->ele;
85 0 : ulong ele_max = meta->ele_max;
86 0 : ulong meta_seed = meta->seed;
87 0 : ulong * lock = meta->lock;
88 0 : int lock_shift = meta->lock_shift;
89 :
90 : /* Unpack data */
91 :
92 0 : ulong data_laddr0 = (ulong)data->laddr0;
93 0 : fd_vinyl_data_vol_t const * vol = data->vol;
94 0 : ulong vol_cnt = data->vol_cnt;
95 :
96 : /* Connected clients */
97 :
98 0 : fd_vinyl_client_t _client[ FD_VINYL_CLIENT_MAX ];
99 0 : ulong client_cnt = 0UL; /* In [0,client_max) */
100 0 : ulong client_idx = 0UL; /* If client_cnt>0, next client to poll for requests, d/c otherwise */
101 :
102 0 : ulong quota_free = line_cnt - 1UL;
103 :
104 : /* Received requests */
105 :
106 0 : fd_vinyl_req_t _req[ FD_VINYL_REQ_MAX ];
107 0 : ulong req_head = 0UL; /* Requests [0,req_head) have been processed */
108 0 : ulong req_tail = 0UL; /* Requests [req_head,req_tail) are pending */
109 : /* Requests [req_tail,ULONG_MAX) have not been received */
110 0 : ulong burst_free = FD_VINYL_REQ_MAX;
111 0 : ulong exec_max = 0UL;
112 :
113 : /* accum_dead_cnt is the number of dead blocks that have been
114 : written since the last partition block.
115 :
116 : accum_move_cnt is the number of move blocks that have been
117 : written since this last partition block.
118 :
119 : accum_garbage_cnt / sz is the number of items / bytes garbage in
120 : the bstream that have accumulated since the last time we compacted
121 : the bstream. We use this to estimate the number of rounds of
122 : compaction to do in async handling.
123 :
124 : accum_drop_link is the number of requests that were silently
125 : dropped because the request link_id did not match the client's
126 : link_id.
127 :
128 : accum_drop_comp is the number of requests that were silently
129 : dropped because an out-of-band completion was requested to be sent
130 : to an unmappable client address.
131 :
132 : accumt_req_full is the number of times we detected the pending
133 : request queue being completely full. */
134 :
135 0 : ulong accum_dead_cnt = 0UL;
136 0 : ulong accum_move_cnt = 0UL;
137 0 : ulong accum_garbage_cnt = 0UL;
138 0 : ulong accum_garbage_sz = 0UL;
139 0 : ulong accum_drop_link = 0UL;
140 0 : ulong accum_drop_comp = 0UL;
141 0 : ulong accum_cache_hit = 0UL;
142 :
143 0 : ulong seq_part = fd_vinyl_io_seq_present( io );
144 :
145 : /* Run */
146 :
147 0 : fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_RUN );
148 :
149 0 : ulong async_rem = 1UL;
150 :
151 0 : for(;;) {
152 :
153 : /* Process background tasks this iteration if necessary */
154 :
155 0 : if( FD_UNLIKELY( !(--async_rem) ) ) {
156 0 : long now = fd_log_wallclock();
157 0 : async_rem = async_min + (fd_ulong_hash( (ulong)now ) % (async_max-async_min+1UL)); /* FIXME: FASTER ALGO */
158 :
159 0 : fd_cnc_heartbeat( cnc, now );
160 :
161 : /* If we've written enough to justify appending a parallel
162 : recovery partition, append one. */
163 :
164 0 : ulong seq_future = fd_vinyl_io_seq_future( io );
165 0 : if( FD_UNLIKELY( (seq_future - seq_part) > vinyl->part_thresh ) ) {
166 :
167 0 : ulong seq = fd_vinyl_io_append_part( io, seq_part, accum_dead_cnt, accum_move_cnt, NULL, 0UL );
168 0 : FD_CRIT( fd_vinyl_seq_eq( seq, seq_future ), "corruption detected" );
169 0 : seq_part = seq + FD_VINYL_BSTREAM_BLOCK_SZ;
170 :
171 0 : accum_dead_cnt = 0UL;
172 0 : accum_move_cnt = 0UL;
173 :
174 0 : accum_garbage_cnt++;
175 0 : accum_garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
176 :
177 0 : fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
178 :
179 0 : }
180 :
181 0 : diag[ FD_VINYL_DIAG_DROP_LINK ] += accum_drop_link; accum_drop_link = 0UL;
182 0 : diag[ FD_VINYL_DIAG_DROP_COMP ] += accum_drop_comp; accum_drop_comp = 0UL;
183 0 : diag[ FD_VINYL_DIAG_CACHE_HIT ] += accum_cache_hit; accum_cache_hit = 0UL;
184 :
185 : /* Let the number of items of garbage generated since the last
186 : compaction be accum_garbage_cnt and let the steady steady
187 : average number of live / garbage items in the bstream's past be
188 : L / G (i.e. L is the average value of pair_cnt). The average
189 : number pieces of garbage collected per garbage collection round
190 : is thus G / (L + G). If we do compact_max rounds garbage
191 : collection this async handling, we expect to collect
192 :
193 : compact_max G / (L + G)
194 :
195 : items of garbage on average. To make sure we collect garbage
196 : faster than we generate it on average, we then require:
197 :
198 : accum_garbage_cnt <~ compact_max G / (L + G)
199 : -> compact_max >~ (L + G) accum_garbage_cnt / G
200 :
201 : Let the be 2^-gc_eager be the maximum fraction of items in the
202 : bstream's past we are willing tolerate as garbage on average.
203 : We then have G = 2^-gc_eager (L + G). This implies:
204 :
205 : -> compact_max >~ accum_garbage_cnt 2^gc_eager
206 :
207 : When accum_garbage_cnt is 0, we use a compact_max of 1 to do
208 : compaction rounds at a minimum rate all the time. This allows
209 : transients (e.g. a sudden change to new steady state
210 : equilibrium, temporary disabling of garbage collection at key
211 : times for highest performance, etc) and unaccounted zero
212 : padding garbage to be absorbed when nothing else is going on. */
213 :
214 0 : int gc_eager = vinyl->gc_eager;
215 0 : if( FD_LIKELY( gc_eager>=0 ) ) {
216 :
217 : /* Saturating wide left shift */
218 0 : ulong overflow = (accum_garbage_cnt >> (63-gc_eager) >> 1); /* sigh ... avoid wide shift UB */
219 0 : ulong compact_max = fd_ulong_max( fd_ulong_if( !overflow, accum_garbage_cnt << gc_eager, ULONG_MAX ), 1UL );
220 :
221 : /**/ accum_garbage_cnt = 0UL;
222 0 : vinyl->garbage_sz += accum_garbage_sz; accum_garbage_sz = 0UL;
223 :
224 0 : fd_vinyl_compact( vinyl, compact_max );
225 :
226 0 : }
227 :
228 0 : ulong signal = fd_cnc_signal_query( cnc );
229 0 : if( FD_UNLIKELY( signal!=FD_VINYL_CNC_SIGNAL_RUN ) ) {
230 0 : if( FD_UNLIKELY( signal==FD_VINYL_CNC_SIGNAL_HALT ) ) break;
231 :
232 0 : switch( signal ) {
233 :
234 0 : case FD_VINYL_CNC_SIGNAL_SYNC: {
235 0 : fd_vinyl_io_sync( io, FD_VINYL_IO_FLAG_BLOCKING );
236 0 : break;
237 0 : }
238 :
239 0 : case FD_VINYL_CNC_SIGNAL_GET: {
240 0 : ulong old;
241 0 : int err = FD_VINYL_SUCCESS;
242 0 : switch( cmd->get.opt ) {
243 0 : case FD_VINYL_OPT_PART_THRESH: old = vinyl->part_thresh; break;
244 0 : case FD_VINYL_OPT_GC_THRESH: old = vinyl->gc_thresh; break;
245 0 : case FD_VINYL_OPT_GC_EAGER: old = (ulong)(long)vinyl->gc_eager; break;
246 0 : case FD_VINYL_OPT_STYLE: old = (ulong)(uint)vinyl->style; break;
247 0 : default: old = 0UL; err = FD_VINYL_ERR_INVAL; break;
248 0 : }
249 0 : cmd->get.val = old;
250 0 : cmd->get.err = err;
251 0 : break;
252 0 : }
253 :
254 0 : case FD_VINYL_CNC_SIGNAL_SET: { /* FIXME: ADD VALIDATION TO SET VALUES FOR OPT_GC_EAGER AND OPT_STYLE */
255 0 : ulong new = cmd->set.val;
256 0 : ulong old;
257 0 : int err = FD_VINYL_SUCCESS;
258 0 : switch( cmd->set.opt ) {
259 0 : case FD_VINYL_OPT_PART_THRESH: old = vinyl->part_thresh; vinyl->part_thresh = new; break;
260 0 : case FD_VINYL_OPT_GC_THRESH: old = vinyl->gc_thresh; vinyl->gc_thresh = new; break;
261 0 : case FD_VINYL_OPT_GC_EAGER: old = (ulong)(long)vinyl->gc_eager; vinyl->gc_eager = (int)new; break;
262 0 : case FD_VINYL_OPT_STYLE: old = (ulong)(uint)vinyl->style; vinyl->style = (int)new; break;
263 0 : default: old = 0UL; err = FD_VINYL_ERR_INVAL; break;
264 0 : }
265 0 : cmd->set.val = old;
266 0 : cmd->set.err = err;
267 0 : break;
268 0 : }
269 :
270 0 : case FD_VINYL_CNC_SIGNAL_CLIENT_JOIN: {
271 0 : int err;
272 :
273 0 : ulong link_id = cmd->join.link_id;
274 0 : ulong burst_max = cmd->join.burst_max;
275 0 : ulong quota_max = cmd->join.quota_max;
276 0 : char const * _rq = cmd->join.rq;
277 0 : char const * _cq = cmd->join.cq;
278 0 : char const * _wksp = cmd->join.wksp;
279 :
280 0 : if( FD_UNLIKELY( client_cnt>=FD_VINYL_CLIENT_MAX ) ) {
281 0 : FD_LOG_WARNING(( "Too many clients (increase FD_VINYL_CLIENT_MAX)" ));
282 0 : err = FD_VINYL_ERR_FULL;
283 0 : goto join_done;
284 0 : }
285 :
286 0 : if( FD_UNLIKELY( burst_max > burst_free ) ) {
287 0 : FD_LOG_WARNING(( "Too large burst_max (increase FD_VINYL_RECV_MAX or decrease burst_max)" ));
288 0 : err = FD_VINYL_ERR_FULL;
289 0 : goto join_done;
290 0 : }
291 :
292 0 : if( FD_UNLIKELY( quota_max > fd_ulong_min( quota_free, FD_VINYL_COMP_QUOTA_MAX ) ) ) {
293 0 : FD_LOG_WARNING(( "Too large quota_max (increase line_cnt or decrease quota_max)" ));
294 0 : err = FD_VINYL_ERR_FULL;
295 0 : goto join_done;
296 0 : }
297 :
298 0 : for( ulong client_idx=0UL; client_idx<client_cnt; client_idx++ ) {
299 0 : if( FD_UNLIKELY( _client[ client_idx ].link_id==link_id ) ) {
300 0 : FD_LOG_WARNING(( "Client already joined with this link_id" ));
301 0 : err = FD_VINYL_ERR_FULL;
302 0 : goto join_done;
303 0 : }
304 0 : }
305 :
306 0 : fd_vinyl_rq_t * rq = fd_vinyl_rq_join( fd_wksp_map( _rq ) );
307 0 : if( FD_UNLIKELY( !rq ) ) {
308 0 : FD_LOG_WARNING(( "Unable to join client rq" ));
309 0 : err = FD_VINYL_ERR_INVAL;
310 0 : goto join_done;
311 0 : }
312 :
313 0 : fd_vinyl_cq_t * cq = fd_vinyl_cq_join( fd_wksp_map( _cq ) );
314 0 : if( FD_UNLIKELY( !cq ) ) {
315 0 : FD_LOG_WARNING(( "Unable to join client cq" ));
316 0 : err = FD_VINYL_ERR_INVAL;
317 0 : goto join_done;
318 0 : }
319 :
320 0 : fd_wksp_t * wksp = fd_wksp_attach( _wksp );
321 0 : if( FD_UNLIKELY( !wksp ) ) {
322 0 : FD_LOG_WARNING(( "Unable to attach to client request workspace" ));
323 0 : err = FD_VINYL_ERR_INVAL;
324 0 : goto join_done;
325 0 : }
326 :
327 0 : _client[ client_cnt ].rq = rq;
328 0 : _client[ client_cnt ].cq = cq;
329 0 : _client[ client_cnt ].burst_max = burst_max;
330 0 : _client[ client_cnt ].seq = 0UL;
331 0 : _client[ client_cnt ].link_id = link_id;
332 0 : _client[ client_cnt ].laddr0 = (ulong)wksp;
333 0 : _client[ client_cnt ].laddr1 = ULONG_MAX; //wksp->gaddr_hi; /* FIXME: HOW TO GET THIS CLEANLY */
334 0 : _client[ client_cnt ].quota_rem = quota_max;
335 0 : _client[ client_cnt ].quota_max = quota_max;
336 0 : client_cnt++;
337 :
338 0 : quota_free -= quota_max;
339 0 : burst_free -= burst_max;
340 :
341 : /* Every client_cnt run loop iterations we receive at most:
342 :
343 : sum_clients recv_max = FD_VINYL_RECV_MAX - burst_free
344 :
345 : requests. To guarantee we processe requests fast enough
346 : that we never overrun our receive queue, under maximum
347 : client load, we need to process:
348 :
349 : sum_clients recv_max / client_cnt
350 :
351 : requests per run loop iteration. We thus set exec_max
352 : to the ceil sum_clients recv_max / client_cnt. */
353 :
354 0 : exec_max = (FD_VINYL_REQ_MAX - burst_free + client_cnt - 1UL) / client_cnt;
355 :
356 0 : err = FD_VINYL_SUCCESS;
357 :
358 0 : join_done:
359 0 : cmd->join.err = err;
360 0 : break;
361 0 : }
362 :
363 0 : case FD_VINYL_CNC_SIGNAL_CLIENT_LEAVE: {
364 0 : int err;
365 :
366 0 : ulong link_id = cmd->leave.link_id;
367 :
368 0 : for( ulong client_idx=0UL; client_idx<client_cnt; client_idx++ ) {
369 0 : if( _client[ client_idx ].link_id==link_id ) {
370 :
371 0 : if( FD_UNLIKELY( _client[ client_idx ].quota_rem != _client[ client_idx ].quota_max ) ) {
372 0 : FD_LOG_WARNING(( "client still has outstanding acquires" ));
373 0 : err = FD_VINYL_ERR_INVAL;
374 0 : goto leave_done;
375 0 : }
376 :
377 : /* discard pending requests from this client */
378 :
379 0 : ulong req_tail_new = req_head;
380 :
381 0 : for( ulong req_id=req_head; req_id<req_tail; req_id++ ) {
382 0 : ulong req_idx = req_id & (FD_VINYL_REQ_MAX-1UL);
383 0 : int discard = (_req[ req_idx ].link_id == client_idx); /* Note: link_id remapped while pending */
384 0 : _req[ req_tail_new & (FD_VINYL_REQ_MAX-1UL) ] = _req[ req_idx ];
385 0 : req_tail_new += (ulong)discard;
386 0 : }
387 :
388 0 : ulong discard_cnt = req_tail - req_tail_new;
389 0 : if( discard_cnt ) FD_LOG_WARNING(( "discard %lu pending requests from leaving client", discard_cnt ));
390 :
391 0 : req_tail = req_tail_new;
392 :
393 0 : fd_wksp_unmap( fd_vinyl_rq_leave( _client[ client_idx ].rq ) );
394 0 : fd_wksp_unmap( fd_vinyl_cq_leave( _client[ client_idx ].cq ) );
395 0 : fd_wksp_detach( (fd_wksp_t *)_client[ client_idx ].laddr0 );
396 :
397 0 : quota_free += _client[ client_idx ].quota_max;
398 0 : burst_free += _client[ client_idx ].burst_max;
399 :
400 0 : _client[ client_idx ] = _client[ --client_cnt ];
401 :
402 0 : exec_max = client_cnt ? ((FD_VINYL_REQ_MAX - burst_free + client_cnt - 1UL) / client_cnt) : 0UL;
403 :
404 0 : err = FD_VINYL_SUCCESS;
405 0 : goto leave_done;
406 0 : }
407 0 : }
408 :
409 0 : FD_LOG_WARNING(( "client not joined" ));
410 0 : err = FD_VINYL_ERR_EMPTY;
411 :
412 0 : leave_done:
413 0 : cmd->leave.err = err;
414 0 : break;
415 0 : }
416 :
417 0 : default: {
418 0 : FD_LOG_WARNING(( "unknown signal received (%lu); ignoring", signal ));
419 0 : break;
420 0 : }
421 :
422 0 : }
423 :
424 0 : fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_RUN );
425 0 : }
426 0 : }
427 :
428 : /* Receive requests from clients */
429 :
430 0 : if( FD_LIKELY( client_cnt ) ) {
431 :
432 : /* Select client to poll this run loop iteration */
433 :
434 0 : client_idx = fd_ulong_if( client_idx+1UL<client_cnt, client_idx+1UL, 0UL );
435 :
436 0 : fd_vinyl_client_t * client = _client + client_idx;
437 :
438 0 : fd_vinyl_rq_t * rq = client->rq;
439 0 : ulong seq = client->seq;
440 0 : ulong burst_max = client->burst_max;
441 0 : ulong link_id = client->link_id;
442 :
443 : /* Enqueue up to burst_max requests from this client into the
444 : local request queue. Using burst_max << FD_VINYL_REQ_MAX
445 : allows applications to prevent a bursty client from starving
446 : other clients of resources while preserving the spatial and
447 : temporal locality of reasonably sized O(burst_max) bursts from
448 : an individual client in processing below. Each run loop
449 : iteration can enqueue up to burst_max requests per iterations. */
450 :
451 0 : for( ulong recv_rem=fd_ulong_min( FD_VINYL_REQ_MAX-(req_tail-req_head), burst_max ); recv_rem; recv_rem-- ) {
452 0 : fd_vinyl_req_t * req = _req + (req_tail & (FD_VINYL_REQ_MAX-1UL));
453 :
454 0 : long diff = fd_vinyl_rq_recv( rq, seq, req );
455 :
456 0 : if( FD_LIKELY( diff>0L ) ) break; /* No requests waiting in rq at this time */
457 :
458 0 : if( FD_UNLIKELY( diff ) ) FD_LOG_CRIT(( "client overran request queue" ));
459 :
460 0 : seq++;
461 :
462 : /* We got the next request. Decide if we should accept it.
463 :
464 : Specifically, we ignore requests whose link_id don't match
465 : link_id (e.g. an unknown link_id or matches a different
466 : client's link_id ... don't know if it is where or even if it
467 : is safe to the completion). Even if the request provided an
468 : out-of-band location to send the completion (comp_gaddr!=0),
469 : we have no reason to trust it given the mismatch.
470 :
471 : This also gives a mechanism for a client use a single rq to
472 : send requests to multiple vinyl instances ... the client
473 : should use a different link_id for each vinyl instance. Each
474 : vinyl instance will quickly filter out the requests not
475 : addressed to it.
476 :
477 : Since we know the client_idx at this point, given a matching
478 : link_id, we stash the client_idx in the pending req link_id
479 : to eliminate the need to maintain a link_id<>client_idx map
480 : in the execution loop below. */
481 :
482 0 : if( FD_UNLIKELY( req->link_id!=link_id ) ) {
483 0 : accum_drop_link++;
484 0 : continue;
485 0 : }
486 :
487 0 : req->link_id = client_idx;
488 :
489 0 : req_tail++;
490 0 : }
491 :
492 0 : client->seq = seq;
493 0 : }
494 :
495 : /* Execute received requests */
496 :
497 0 : for( ulong exec_rem=fd_ulong_min( req_tail-req_head, exec_max ); exec_rem; exec_rem-- ) {
498 0 : fd_vinyl_req_t * req = _req + ((req_head++) & (FD_VINYL_REQ_MAX-1UL));
499 :
500 : /* Determine the client that sent this request and unpack the
501 : completion fields. We ignore requests with non-NULL but
502 : unmappable out-of-band completion because we can't send the
503 : completion in the expected manner and, in lieu of that, the
504 : receivers aren't expecting any completion to come via the cq
505 : (if any). Note that this implies requests that don't produce a
506 : completion (e.g. FETCH and FLUSH) need to either provide NULL
507 : or a valid non-NULL location for comp_gaddr to pass this
508 : validation (this is not a burden practically). */
509 :
510 0 : ulong req_id = req->req_id;
511 0 : ulong client_idx = req->link_id; /* See note above about link_id / client_idx conversion */
512 0 : ulong batch_cnt = (ulong)req->batch_cnt;
513 0 : ulong comp_gaddr = req->comp_gaddr;
514 :
515 0 : fd_vinyl_client_t * client = _client + client_idx;
516 :
517 0 : fd_vinyl_cq_t * cq = client->cq;
518 0 : ulong link_id = client->link_id;
519 0 : ulong client_laddr0 = client->laddr0;
520 0 : ulong client_laddr1 = client->laddr1;
521 0 : ulong quota_rem = client->quota_rem;
522 :
523 0 : FD_CRIT( quota_rem<=client->quota_max, "corruption detected" );
524 :
525 0 : fd_vinyl_comp_t * comp = MAP_REQ_GADDR( comp_gaddr, fd_vinyl_comp_t, 1UL );
526 0 : if( FD_UNLIKELY( (!comp) & (!!comp_gaddr) ) ) {
527 0 : accum_drop_comp++;
528 0 : continue;
529 0 : }
530 :
531 0 : int comp_err = 1;
532 0 : ulong fail_cnt = 0UL;
533 :
534 0 : ulong read_cnt = 0UL;
535 0 : ulong append_cnt = 0UL;
536 :
537 0 : switch( req->type ) {
538 :
539 0 : # include "fd_vinyl_case_acquire.c"
540 0 : # include "fd_vinyl_case_release.c"
541 0 : # include "fd_vinyl_case_erase.c"
542 0 : # include "fd_vinyl_case_move.c"
543 0 : # include "fd_vinyl_case_fetch.c"
544 0 : # include "fd_vinyl_case_flush.c"
545 0 : # include "fd_vinyl_case_try.c"
546 0 : # include "fd_vinyl_case_test.c"
547 :
548 0 : default:
549 0 : comp_err = FD_VINYL_ERR_INVAL;
550 0 : break;
551 0 : }
552 :
553 0 : for( ; read_cnt; read_cnt-- ) {
554 0 : fd_vinyl_io_rd_t * _rd; /* avoid pointer escape */
555 0 : fd_vinyl_io_poll( io, &_rd, FD_VINYL_IO_FLAG_BLOCKING );
556 0 : fd_vinyl_io_rd_t * rd = _rd;
557 :
558 0 : fd_vinyl_data_obj_t * obj = (fd_vinyl_data_obj_t *) rd->ctx;
559 0 : ulong seq = rd->seq; (void)seq;
560 0 : fd_vinyl_bstream_phdr_t * cphdr = (fd_vinyl_bstream_phdr_t *)rd->dst;
561 0 : ulong cpair_sz = rd->sz; (void)cpair_sz;
562 :
563 0 : fd_vinyl_data_obj_t * cobj = (fd_vinyl_data_obj_t *)fd_ulong_align_dn( (ulong)rd, FD_VINYL_BSTREAM_BLOCK_SZ );
564 :
565 0 : FD_CRIT( cphdr==fd_vinyl_data_obj_phdr( cobj ), "corruption detected" );
566 :
567 0 : ulong cpair_ctl = cphdr->ctl;
568 :
569 0 : int cpair_type = fd_vinyl_bstream_ctl_type ( cpair_ctl );
570 0 : int cpair_style = fd_vinyl_bstream_ctl_style( cpair_ctl );
571 0 : ulong cpair_val_esz = fd_vinyl_bstream_ctl_sz ( cpair_ctl );
572 :
573 0 : FD_CRIT( cpair_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR, "corruption detected" );
574 0 : FD_CRIT( cpair_sz ==fd_vinyl_bstream_pair_sz( cpair_val_esz ), "corruption detected" );
575 :
576 0 : schar * rd_err = cobj->rd_err;
577 :
578 0 : FD_CRIT ( rd_err, "corruption detected" );
579 0 : FD_ALERT( fd_vinyl_data_is_valid_obj( obj, vol, vol_cnt ), "corruption detected" );
580 :
581 0 : ulong line_idx = obj->line_idx;
582 :
583 0 : FD_CRIT( line_idx<line_cnt, "corruption detected" );
584 0 : FD_CRIT( line[ line_idx ].obj==obj, "corruption detected" );
585 :
586 0 : ulong ele_idx = line[ line_idx ].ele_idx;
587 :
588 0 : FD_CRIT ( ele_idx<ele_max, "corruption detected" );
589 0 : FD_ALERT( !memcmp( &ele0[ ele_idx ].phdr, cphdr, sizeof(fd_vinyl_bstream_phdr_t) ), "corruption detected" );
590 0 : FD_CRIT ( ele0[ ele_idx ].seq ==seq, "corruption detected" );
591 0 : FD_CRIT ( ele0[ ele_idx ].line_idx==line_idx, "corruption detected" );
592 :
593 : /* Verify data integrity */
594 :
595 0 : FD_ALERT( !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)cphdr, cpair_sz ), "corruption detected" );
596 :
597 : /* Decode the pair */
598 :
599 0 : char * val = (char *)fd_vinyl_data_obj_val( obj );
600 0 : ulong val_sz = (ulong)cphdr->info.val_sz;
601 :
602 0 : FD_CRIT( val_sz <= FD_VINYL_VAL_MAX, "corruption detected" );
603 0 : FD_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
604 :
605 0 : if( FD_LIKELY( cpair_style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) {
606 :
607 0 : FD_CRIT( obj==cobj, "corruption detected" );
608 0 : FD_CRIT( cpair_val_esz==val_sz, "corruption detected" );
609 :
610 0 : } else {
611 :
612 0 : char const * cval = (char const *)fd_vinyl_data_obj_val( cobj );
613 0 : ulong cval_sz = fd_vinyl_bstream_ctl_sz( cpair_ctl );
614 :
615 0 : ulong _val_sz = (ulong)LZ4_decompress_safe( cval, val, (int)cval_sz, (int)val_sz );
616 0 : if( FD_UNLIKELY( _val_sz!=val_sz ) ) FD_LOG_CRIT(( "LZ4_decompress_safe failed" ));
617 :
618 0 : fd_vinyl_data_free( data, cobj );
619 :
620 0 : fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
621 :
622 0 : phdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
623 0 : phdr->key = cphdr->key;
624 0 : phdr->info = cphdr->info;
625 :
626 0 : }
627 :
628 0 : obj->rd_active = (short)0;
629 :
630 : /* Fill any trailing region with zeros (there is at least
631 : FD_VINYL_BSTREAM_FTR_SZ) and tell the client the item was
632 : successfully processed. */
633 :
634 0 : memset( val + val_sz, 0, fd_vinyl_data_szc_obj_footprint( (ulong)obj->szc )
635 0 : - (sizeof(fd_vinyl_data_obj_t) + sizeof(fd_vinyl_bstream_phdr_t) + val_sz) );
636 :
637 0 : FD_COMPILER_MFENCE();
638 0 : *rd_err = (schar)FD_VINYL_SUCCESS;
639 0 : FD_COMPILER_MFENCE();
640 :
641 0 : }
642 :
643 0 : if( FD_UNLIKELY( append_cnt ) ) fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
644 :
645 0 : if( FD_LIKELY( comp_err<=0 ) ) fd_vinyl_cq_send( cq, comp, req_id, link_id, comp_err, batch_cnt, fail_cnt, quota_rem );
646 :
647 0 : client->quota_rem = quota_rem;
648 :
649 0 : }
650 :
651 0 : } /* run loop */
652 :
653 0 : ulong discard_cnt = req_tail - req_head;
654 :
655 : /* Append the final partition and sync so we can resume with a fast
656 : parallel recovery */
657 :
658 0 : fd_vinyl_io_append_part( io, seq_part, accum_dead_cnt, accum_move_cnt, NULL, 0UL );
659 :
660 0 : accum_dead_cnt = 0UL;
661 0 : accum_move_cnt = 0UL;
662 :
663 0 : accum_garbage_cnt++;
664 0 : accum_garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
665 :
666 0 : fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
667 :
668 0 : fd_vinyl_io_sync( io, FD_VINYL_IO_FLAG_BLOCKING );
669 :
670 : /* Drain outstanding accumulators */
671 :
672 : /**/ accum_garbage_cnt = 0UL;
673 0 : vinyl->garbage_sz += accum_garbage_sz; accum_garbage_sz = 0UL;
674 :
675 0 : diag[ FD_VINYL_DIAG_DROP_LINK ] += accum_drop_link; accum_drop_link = 0UL;
676 0 : diag[ FD_VINYL_DIAG_DROP_COMP ] += accum_drop_comp; accum_drop_comp = 0UL;
677 0 : diag[ FD_VINYL_DIAG_CACHE_HIT ] += accum_cache_hit; accum_cache_hit = 0UL;
678 :
679 : /* Disconnect from the clients */
680 :
681 0 : ulong released_cnt = 0UL;
682 0 : for( ulong client_idx=0UL; client_idx<client_cnt; client_idx++ ) {
683 0 : released_cnt += (_client[ client_idx ].quota_max - _client[ client_idx ].quota_rem);
684 0 : fd_wksp_unmap( fd_vinyl_rq_leave( _client[ client_idx ].rq ) );
685 0 : fd_wksp_unmap( fd_vinyl_cq_leave( _client[ client_idx ].cq ) );
686 0 : fd_wksp_detach( (fd_wksp_t *)_client[ client_idx ].laddr0 );
687 0 : }
688 :
689 0 : if( FD_UNLIKELY( discard_cnt ) ) FD_LOG_WARNING(( "halt discarded %lu received requests", discard_cnt ));
690 0 : if( FD_UNLIKELY( released_cnt ) ) FD_LOG_WARNING(( "halt released %lu outstanding acquires", released_cnt ));
691 0 : if( FD_UNLIKELY( client_cnt ) ) FD_LOG_WARNING(( "halt disconneced %lu clients", client_cnt ));
692 :
693 : /* Return to boot state */
694 :
695 0 : fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_BOOT );
696 0 : }
|