Line data Source code
1 : #include "fd_vinyl_io_wd.h"
2 : #include "fd_ssctrl.h"
3 :
/* fd_vinyl_io_wd manages a pool of DMA-friendly buffers.

   The system maintains the following invariant:
   - There is a fixed number of buffers in total.  At any time, each
     buffer is in exactly one of 3 states, and buffers transition
     atomically between them:
     - IDLE:   queue of multiple buffers waiting to move to APPEND
     - APPEND: zero or one buffer currently being filled
     - IOWAIT: queue of multiple filled buffers sitting in a write queue
               waiting for snapwr to report them as complete

***********************************************************************/
15 :
16 : /* wd_block_used returns the number of bytes already committed in an
17 : APPEND buffer. */
18 :
19 : static ulong
20 0 : wd_block_used( fd_vinyl_io_wd_t * wd ) {
21 0 : if( FD_UNLIKELY( !wd->buf_append ) ) return 0UL;
22 0 : wd_buf_t * buf = wd->buf_append;
23 0 : if( buf->state!=WD_BUF_APPEND ) FD_LOG_NOTICE(( "append buf state %u", buf->state ));
24 0 : FD_CRIT( buf->state==WD_BUF_APPEND, "append buf in invalid state" );
25 :
26 0 : FD_CRIT( fd_vinyl_seq_ge( wd->base->seq_future, wd->base->seq_present ), "corrupt bstream io state" );
27 0 : ulong block_sz = wd->base->seq_future - buf->bstream_seq;
28 0 : FD_CRIT( block_sz<=wd->wr_mtu, "corrupt bstream io state" );
29 0 : return block_sz;
30 0 : }
31 :
32 : /* wd_dispatch enqueues the current APPEND buffer for writing, moving it
33 : to IOWAIT. */
34 :
35 : static void
36 0 : wd_dispatch( fd_vinyl_io_wd_t * wd ) {
37 :
38 : /* Map the current buffer to a bstream sequence range */
39 :
40 0 : FD_CRIT( wd->buf_append, "no append buf to dispatch" );
41 0 : wd_buf_t * buf = wd->buf_append;
42 0 : FD_CRIT( buf->state==WD_BUF_APPEND, "append buf in invalid state" );
43 :
44 0 : FD_CRIT( fd_vinyl_seq_ge( wd->base->seq_future, wd->base->seq_present ), "corrupt bstream io state" );
45 0 : ulong block_sz = wd_block_used( wd );
46 0 : FD_CRIT( fd_ulong_is_aligned( block_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned block_sz (bad appends?)" );
47 0 : FD_CRIT( block_sz>0UL, "attempted to dispatch empty buf" );
48 :
49 : /* Align up block_sz to multiple of 4096 bytes.
50 : This step is critical for good O_DIRECT performance. */
51 :
52 0 : ulong aligned_sz = fd_ulong_align_up( block_sz, 4096UL );
53 0 : ulong pad_sz = aligned_sz - block_sz;
54 0 : FD_CRIT( aligned_sz<=wd->wr_mtu, "corrupt bstream io state" );
55 0 : if( FD_UNLIKELY( pad_sz ) ) {
56 0 : FD_CRIT( fd_ulong_is_aligned( pad_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned zero padding computed" );
57 0 : for( ulong pad_rem=pad_sz; pad_rem>0UL; pad_rem-=FD_VINYL_BSTREAM_BLOCK_SZ ) {
58 0 : memset( buf->buf + block_sz, 0, FD_VINYL_BSTREAM_BLOCK_SZ );
59 0 : block_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
60 0 : wd->base->seq_future += FD_VINYL_BSTREAM_BLOCK_SZ;
61 0 : }
62 0 : }
63 :
64 : /* Map the bstream sequence range to the device */
65 :
66 0 : FD_CRIT( buf->bstream_seq+block_sz==wd->base->seq_future, "corrupt bstream io state" );
67 0 : if( FD_UNLIKELY( wd->base->seq_future > wd->dev_sz ) ) {
68 : /* FIXME log vinyl instance name */
69 0 : FD_LOG_ERR(( "vinyl database is out of space (dev_sz=%lu)", wd->dev_sz ));
70 0 : }
71 0 : ulong dev_off = wd->dev_base + buf->bstream_seq;
72 :
73 0 : ulong seq = wd->wr_seq;
74 0 : ulong sig = dev_off;
75 0 : ulong ctl = FD_SNAPSHOT_MSG_DATA;
76 0 : ulong chunk = fd_laddr_to_chunk( wd->wr_base, buf->buf );
77 0 : ulong sz = block_sz>>FD_VINYL_BSTREAM_BLOCK_LG_SZ;
78 0 : FD_CRIT( sz<=USHORT_MAX, "block_sz too large" );
79 0 : fd_mcache_publish( wd->wr_mcache, wd->wr_depth, seq, sig, chunk, sz, ctl, 0UL, 0UL );
80 0 : wd->wr_seq = fd_seq_inc( seq, 1UL );
81 :
82 0 : buf->next = NULL;
83 0 : buf->state = WD_BUF_IOWAIT;
84 0 : buf->io_seq = seq;
85 0 : buf->bstream_seq = wd->base->seq_future;
86 0 : wd->buf_append = NULL;
87 0 : if( wd->buf_iowait_tail ) wd->buf_iowait_tail->next = buf;
88 0 : if( !wd->buf_iowait_head ) wd->buf_iowait_head = buf;
89 0 : wd->buf_iowait_tail = buf;
90 0 : }
91 :
92 : static void
93 : fd_vinyl_io_wd_read_imm( fd_vinyl_io_t * io,
94 : ulong seq0,
95 : void * _dst,
96 0 : ulong sz ) {
97 0 : (void)io; (void)seq0; (void)_dst; (void)sz;
98 0 : FD_LOG_CRIT(( "vinyl_io_wd does not support read_imm" ));
99 0 : }
100 :
101 : static void
102 : fd_vinyl_io_wd_read( fd_vinyl_io_t * io,
103 0 : fd_vinyl_io_rd_t * _rd ) {
104 0 : (void)io; (void)_rd;
105 0 : FD_LOG_CRIT(( "vinyl_io_wd does not support read" ));
106 0 : }
107 :
108 : static int
109 : fd_vinyl_io_wd_poll( fd_vinyl_io_t * io,
110 : fd_vinyl_io_rd_t ** _rd,
111 0 : int flags ) {
112 0 : (void)io; (void)_rd; (void)flags;
113 0 : FD_LOG_CRIT(( "vinyl_io_wd does not support poll" ));
114 0 : }
115 :
116 : /* fd_vinyl_io_wd_append starts an asynchronous append operation. _src
117 : is assumed to be a pointer returned by the most recent alloc.
118 : Updates io->base->seq_future. */
119 :
120 : static ulong
121 : fd_vinyl_io_wd_append( fd_vinyl_io_t * io,
122 : void const * _src,
123 0 : ulong sz ) {
124 0 : fd_vinyl_io_wd_t * wd = (fd_vinyl_io_wd_t *)io; /* Note: io must be non-NULL to have even been called */
125 0 : uchar const * src = (uchar const *)_src;
126 :
127 : /* Validate the input args. */
128 :
129 0 : ulong seq_future = wd->base->seq_future;
130 0 : if( FD_UNLIKELY( !sz ) ) return seq_future;
131 :
132 0 : wd_buf_t * buf = wd->buf_append;
133 0 : FD_CRIT( buf, "no append buf" );
134 0 : FD_CRIT( buf->state==WD_BUF_APPEND, "corrupt wd_buf" );
135 0 : FD_CRIT( src>=buf->buf && (src+sz)<=buf->buf+wd->wr_mtu, "alloc invalidated" );
136 :
137 0 : int bad_src = !src || src<wd->wr_chunk0 || (src+sz)>wd->wr_chunk1;
138 0 : int bad_align = !fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ );
139 0 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
140 :
141 0 : if( FD_UNLIKELY( bad_src | bad_align | bad_sz ) ) {
142 0 : if( bad_src ) FD_LOG_CRIT(( "src not a valid alloc ptr ([%p,%p) not in [%p,%p))",
143 0 : (void *)src, (void *)(src+sz),
144 0 : (void *)wd->wr_chunk0, (void *)wd->wr_chunk1 ));
145 0 : if( bad_align ) FD_LOG_CRIT(( "misaligned src %p", (void *)src ));
146 0 : if( bad_sz ) FD_LOG_CRIT(( "misaligned sz %lu", sz ));
147 0 : }
148 :
149 : /* At this point, we appear to have a valid append request. Map it to
150 : the bstream (updating seq_future). */
151 :
152 0 : ulong seq = seq_future;
153 0 : wd->base->seq_future = seq + sz;
154 :
155 0 : return seq;
156 0 : }
157 :
158 : /* wd_poll_write checks for write completions (fseq). Returns
159 : FD_VINYL_SUCCESS if all writes completed, FD_VINYL_ERR_AGAIN
160 : otherwise. Updates io->base->seq_present. */
161 :
162 : static int
163 0 : wd_poll_write( fd_vinyl_io_t * io ) {
164 0 : fd_vinyl_io_wd_t * wd = (fd_vinyl_io_wd_t *)io; /* Note: io must be non-NULL to have even been called */
165 :
166 0 : while( wd->buf_iowait_head ) {
167 0 : wd_buf_t * buf = wd->buf_iowait_head;
168 0 : FD_CRIT( buf->state==WD_BUF_IOWAIT, "corrupt wd_buf" );
169 :
170 0 : ulong comp_seq = buf->io_seq;
171 0 : ulong found_seq = fd_fseq_query( wd->wr_fseq );
172 : /* each seq in [0,found_seq) is consumed (non-inclusive) */
173 0 : if( fd_seq_ge( comp_seq, found_seq ) ) break;
174 0 : FD_CRIT( fd_seq_le( found_seq, wd->wr_seq ), "got completion for a sequence number not yet submitted" );
175 0 : FD_CRIT( fd_vinyl_seq_eq( comp_seq, wd->wr_seqack ), "out-of-order ACK polling" );
176 :
177 : /* Buffer completed, remove from IOWAIT list */
178 0 : wd->buf_iowait_head = buf->next;
179 0 : if( !wd->buf_iowait_head ) wd->buf_iowait_tail = NULL;
180 :
181 : /* Update seq_present (tracks last write) */
182 0 : wd->base->seq_present = fd_ulong_max( wd->base->seq_present, buf->bstream_seq );
183 :
184 : /* Return buffer to IDLE list */
185 0 : buf->state = WD_BUF_IDLE;
186 0 : buf->next = wd->buf_idle;
187 0 : buf->io_seq = 0UL;
188 0 : buf->bstream_seq = 0UL;
189 0 : wd->buf_idle = buf;
190 0 : wd->wr_seqack = fd_seq_inc( comp_seq, 1UL );
191 0 : }
192 :
193 0 : return wd->buf_iowait_head ? FD_VINYL_ERR_AGAIN : FD_VINYL_SUCCESS;
194 0 : }
195 :
196 : /* fd_vinyl_io_wd_commit completes the dispatches the current APPEND
197 : block and polls for completions. */
198 :
199 : static int
200 : fd_vinyl_io_wd_commit( fd_vinyl_io_t * io,
201 0 : int flags ) {
202 0 : fd_vinyl_io_wd_t * wd = (fd_vinyl_io_wd_t *)io; /* Note: io must be non-NULL to have even been called */
203 :
204 0 : if( FD_UNLIKELY( fd_vinyl_seq_ne( wd->base->seq_present, wd->base->seq_future ) ) ) {
205 0 : FD_CRIT( wd->buf_append, "no append buf to commit, but seq_future indicates inflight write" );
206 :
207 0 : wd_dispatch( wd );
208 :
209 0 : wd->base->seq_present = wd->base->seq_future;
210 0 : wd->base->spad_used = 0UL;
211 0 : }
212 :
213 0 : int poll_err;
214 0 : do {
215 0 : poll_err = wd_poll_write( io );
216 0 : if( FD_LIKELY( poll_err!=FD_VINYL_ERR_AGAIN ) ) break;
217 0 : FD_SPIN_PAUSE();
218 0 : } while( flags & FD_VINYL_IO_FLAG_BLOCKING );
219 0 : return poll_err;
220 0 : }
221 :
222 : static ulong
223 : fd_vinyl_io_wd_hint( fd_vinyl_io_t * io,
224 0 : ulong sz ) {
225 0 : (void)io; (void)sz;
226 0 : FD_LOG_CRIT(( "vinyl_io_wd does not support hint" ));
227 0 : }
228 :
229 : /* fd_vinyl_io_wd_alloc reserves sz bytes from the current APPEND block
230 : (starting a new one or dispatching the previous one if needed).
231 :
232 : Note that the caller could do two allocs back-to-back, in which case
233 : the first alloc should be discarded and replaced by the first. Also
234 : note that trim could reduce the size of an alloc. */
235 :
236 : static void *
237 : fd_vinyl_io_wd_alloc1( fd_vinyl_io_t * io,
238 0 : ulong sz ) {
239 0 : fd_vinyl_io_wd_t * wd = (fd_vinyl_io_wd_t *)io; /* Note: io must be non-NULL to have even been called */
240 :
241 0 : if( FD_UNLIKELY( sz > wd->wr_mtu ) ) {
242 0 : FD_LOG_CRIT(( "requested write sz (%lu) exceeds MTU (%lu)", sz, wd->wr_mtu ));
243 0 : }
244 :
245 : /* Acquire an append buffer */
246 :
247 0 : wd_buf_t * buf = NULL;
248 0 : ulong buf_used = 0UL;
249 0 : if( wd->buf_append ) {
250 0 : /* */ buf_used = wd_block_used( wd );
251 0 : ulong buf_free = wd->wr_mtu - buf_used;
252 0 : FD_CRIT( buf_used <= wd->wr_mtu, "corrupt wd_buf" );
253 0 : if( FD_LIKELY( sz <= buf_free ) ) {
254 0 : buf = wd->buf_append;
255 0 : } else {
256 0 : wd_dispatch( wd ); /* transition buf from APPEND to IOWAIT */
257 0 : buf = NULL;
258 0 : buf_used = 0UL;
259 0 : }
260 0 : }
261 0 : if( FD_UNLIKELY( !buf ) ) {
262 0 : if( FD_LIKELY( wd->buf_idle ) ) {
263 : /* Start a new buffer */
264 0 : FD_CRIT( wd->buf_idle->state==WD_BUF_IDLE, "corrupt wd_buf" );
265 0 : wd->buf_append = wd->buf_idle;
266 0 : wd->buf_idle = wd->buf_idle->next;
267 0 : buf = wd->buf_append;
268 :
269 0 : wd->buf_append->next = NULL;
270 0 : wd->buf_append->state = WD_BUF_APPEND;
271 0 : wd->buf_append->bstream_seq = wd->base->seq_future;
272 0 : } else {
273 : /* All buffers are IOWAIT, cannot make progress */
274 0 : return NULL;
275 0 : }
276 0 : }
277 :
278 : /* At this point, we have an APPEND state buffer that is large enough
279 : to fit the alloc. */
280 :
281 0 : FD_CRIT( buf->state==WD_BUF_APPEND, "corrupt wd_buf" );
282 0 : return buf->buf + buf_used;
283 0 : }
284 :
285 : void *
286 : fd_vinyl_io_wd_alloc( fd_vinyl_io_t * io,
287 : ulong sz,
288 0 : int flags ) {
289 0 : if( FD_UNLIKELY( !sz ) ) return NULL;
290 0 : for(;;) {
291 0 : void * p = fd_vinyl_io_wd_alloc1( io, sz );
292 0 : if( FD_LIKELY( p || !(flags & FD_VINYL_IO_FLAG_BLOCKING) ) ) {
293 0 : return p;
294 0 : }
295 0 : wd_poll_write( io );
296 0 : FD_SPIN_PAUSE();
297 0 : }
298 0 : }
299 :
300 : static ulong
301 : fd_vinyl_io_wd_copy( fd_vinyl_io_t * io,
302 : ulong seq_src0,
303 0 : ulong sz ) {
304 0 : (void)io; (void)seq_src0; (void)sz;
305 0 : FD_LOG_CRIT(( "vinyl_io_wd does not support copy" ));
306 0 : }
307 :
308 : static void
309 : fd_vinyl_io_wd_forget( fd_vinyl_io_t * io,
310 0 : ulong seq ) {
311 0 : (void)io; (void)seq;
312 0 : FD_LOG_CRIT(( "vinyl_io_wd does not support forget" ));
313 0 : }
314 :
315 : static void
316 : fd_vinyl_io_wd_rewind( fd_vinyl_io_t * io,
317 0 : ulong seq ) {
318 0 : (void)io; (void)seq;
319 0 : FD_LOG_CRIT(( "vinyl_io_wd does not support rewind" ));
320 0 : }
321 :
322 : static int
323 : fd_vinyl_io_wd_sync( fd_vinyl_io_t * io,
324 0 : int flags ) {
325 0 : (void)io; (void)flags;
326 0 : FD_LOG_CRIT(( "vinyl_io_wd does not support sync" ));
327 0 : }
328 :
329 : static void *
330 0 : fd_vinyl_io_wd_fini( fd_vinyl_io_t * io ) {
331 0 : fd_vinyl_io_wd_t * wd = (fd_vinyl_io_wd_t *)io; /* Note: io must be non-NULL to have even been called */
332 :
333 0 : ulong seq_present = wd->base->seq_present;
334 0 : ulong seq_future = wd->base->seq_future;
335 :
336 0 : if( FD_UNLIKELY( fd_vinyl_seq_ne( seq_present, seq_future ) ) ) FD_LOG_WARNING(( "fini discarding uncommited blocks" ));
337 :
338 0 : return io;
339 0 : }
340 :
341 : fd_vinyl_io_impl_t fd_vinyl_io_wd_impl = {
342 : fd_vinyl_io_wd_read_imm,
343 : fd_vinyl_io_wd_read,
344 : fd_vinyl_io_wd_poll,
345 : fd_vinyl_io_wd_append,
346 : fd_vinyl_io_wd_commit,
347 : fd_vinyl_io_wd_hint,
348 : fd_vinyl_io_wd_alloc,
349 : fd_vinyl_io_wd_copy,
350 : fd_vinyl_io_wd_forget,
351 : fd_vinyl_io_wd_rewind,
352 : fd_vinyl_io_wd_sync,
353 : fd_vinyl_io_wd_fini
354 : };
355 :
356 : ulong
357 0 : fd_vinyl_io_wd_align( void ) {
358 0 : return alignof(fd_vinyl_io_wd_t);
359 0 : }
360 :
361 : ulong
362 0 : fd_vinyl_io_wd_footprint( ulong queue_depth ) {
363 0 : if( FD_UNLIKELY( !queue_depth || !fd_ulong_is_pow2( queue_depth ) || queue_depth>UINT_MAX ) )
364 0 : return 0UL;
365 0 : return sizeof(fd_vinyl_io_wd_t) + (queue_depth*sizeof(wd_buf_t));
366 0 : }
367 :
368 : fd_vinyl_io_t *
369 : fd_vinyl_io_wd_init( void * lmem,
370 : ulong dev_sz,
371 : ulong io_seed,
372 : fd_frag_meta_t * block_mcache,
373 : uchar * block_dcache,
374 : ulong const * block_fseq,
375 0 : ulong block_mtu ) {
376 : /* Mostly copied from fd_vinyl_io_bd.c */
377 :
378 0 : fd_vinyl_io_wd_t * wd = (fd_vinyl_io_wd_t *)lmem;
379 :
380 0 : if( FD_UNLIKELY( !wd ) ) {
381 0 : FD_LOG_WARNING(( "NULL mem" ));
382 0 : return NULL;
383 0 : }
384 :
385 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)wd, fd_vinyl_io_wd_align() ) ) ) {
386 0 : FD_LOG_WARNING(( "misaligned mem" ));
387 0 : return NULL;
388 0 : }
389 :
390 0 : if( FD_UNLIKELY( !block_mcache ) ) {
391 0 : FD_LOG_WARNING(( "NULL block_mcache" ));
392 0 : return NULL;
393 0 : }
394 :
395 0 : if( FD_UNLIKELY( !block_dcache ) ) {
396 0 : FD_LOG_WARNING(( "NULL block_dcache" ));
397 0 : return NULL;
398 0 : }
399 :
400 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)block_dcache, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
401 0 : FD_LOG_WARNING(( "misaligned block_dcache (addr=%p)", (void *)block_dcache ));
402 0 : return NULL;
403 0 : }
404 :
405 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( block_mtu, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
406 0 : FD_LOG_WARNING(( "misaligned block_mtu (%lu)", block_mtu ));
407 0 : return NULL;
408 0 : }
409 :
410 0 : if( FD_UNLIKELY( block_mtu > (USHORT_MAX<<FD_VINYL_BSTREAM_BLOCK_LG_SZ) ) ) {
411 0 : FD_LOG_WARNING(( "oversz block_mtu (%lu)", block_mtu ));
412 0 : return NULL;
413 0 : }
414 :
415 0 : ulong block_depth = fd_mcache_depth( block_mcache );
416 0 : if( FD_UNLIKELY( !block_depth || !fd_ulong_is_pow2( block_depth ) || block_depth>UINT_MAX ) ) {
417 0 : FD_LOG_WARNING(( "block_mcache depth (%lu) invalid", block_depth ));
418 0 : return NULL;
419 0 : }
420 :
421 0 : ulong dcache_data_sz = fd_dcache_data_sz( block_dcache );
422 0 : if( FD_UNLIKELY( dcache_data_sz<(block_depth*block_mtu) ) ) {
423 0 : FD_LOG_WARNING(( "block_dcache size (%lu) too small for block_depth (%lu) and block_mtu (%lu), required at least %lu bytes",
424 0 : dcache_data_sz, block_depth, block_mtu, block_depth*block_mtu ));
425 0 : return NULL;
426 0 : }
427 :
428 0 : ulong footprint = fd_vinyl_io_wd_footprint( block_depth );
429 0 : if( FD_UNLIKELY( !footprint ) ) {
430 0 : FD_LOG_WARNING(( "bad spad_max" ));
431 0 : return NULL;
432 0 : }
433 :
434 0 : memset( wd, 0, footprint );
435 :
436 0 : wd->base->type = FD_VINYL_IO_TYPE_BD;
437 :
438 : /* Base class members. Note that vinyl_io does not have an actual
439 : scratch pad (emplaces writes directly into dcache). Instead,
440 : spad_{used_max} track the space available in the current APPEND
441 : block. */
442 :
443 0 : wd->base->spad_max = block_mtu;
444 0 : wd->base->spad_used = 0UL;
445 0 : wd->base->impl = &fd_vinyl_io_wd_impl;
446 :
447 0 : wd->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
448 0 : wd->dev_sz = dev_sz - FD_VINYL_BSTREAM_BLOCK_SZ;
449 :
450 0 : wd->buf_idle = NULL;
451 0 : wd->buf_append = NULL;
452 0 : wd->buf_iowait_head = NULL;
453 0 : wd->buf_iowait_tail = NULL;
454 :
455 0 : wd->wr_mcache = block_mcache;
456 0 : wd->wr_seq = fd_mcache_seq0( block_mcache );
457 0 : wd->wr_seqack = wd->wr_seq;
458 0 : wd->wr_depth = block_depth;
459 0 : wd->wr_base = block_dcache;
460 0 : wd->wr_chunk0 = wd->wr_base;
461 0 : wd->wr_chunk1 = wd->wr_base + (block_depth*block_mtu);
462 0 : wd->wr_fseq = block_fseq;
463 0 : wd->wr_mtu = block_mtu;
464 :
465 : /* Set vinyl io_seed */
466 :
467 0 : FD_CRIT( fd_dcache_app_sz( block_dcache )>=sizeof(ulong), "block_dcache app region too small to hold io_seed" );
468 0 : FD_COMPILER_MFENCE();
469 0 : FD_STORE( ulong, fd_dcache_app_laddr( block_dcache ), io_seed );
470 0 : FD_COMPILER_MFENCE();
471 :
472 : /* Set up initial buffer list state */
473 :
474 0 : wd_buf_t * buf_pool = (wd_buf_t *)(wd+1);
475 0 : for( long i=(long)block_depth-1L; i>=0L; i-- ) {
476 0 : uchar * block = wd->wr_base + (ulong)i*block_mtu;
477 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)block, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned wd_buf" );
478 0 : wd_buf_t * buf = buf_pool+i;
479 0 : *buf = (wd_buf_t) {
480 0 : .buf = block,
481 0 : .next = wd->buf_idle,
482 0 : .state = WD_BUF_IDLE
483 0 : };
484 0 : wd->buf_idle = buf;
485 0 : }
486 :
487 : /* We are starting a new bstream. */
488 :
489 0 : wd->base->seed = io_seed;
490 0 : wd->base->seq_ancient = 0UL;
491 0 : wd->base->seq_past = 0UL;
492 0 : wd->base->seq_present = 0UL;
493 0 : wd->base->seq_future = 0UL;
494 :
495 0 : FD_LOG_INFO(( "IO config"
496 0 : "\n\ttype wd"
497 0 : "\n\tblock_depth %lu"
498 0 : "\n\tblock_mtu %lu bytes"
499 0 : "\n\treset 1"
500 0 : "\n\tio_seed 0x%016lx",
501 0 : block_depth, block_mtu,
502 0 : io_seed ));
503 :
504 0 : return wd->base;
505 0 : }
506 :
507 : int
508 0 : fd_vinyl_io_wd_busy( fd_vinyl_io_t * io ) {
509 0 : fd_vinyl_io_wd_t * wd = (fd_vinyl_io_wd_t *)io;
510 0 : return !!wd->buf_append || !!wd->buf_iowait_head;
511 0 : }
512 :
513 : void
514 : fd_vinyl_io_wd_ctrl( fd_vinyl_io_t * io,
515 : ulong ctl,
516 0 : ulong sig ) {
517 0 : fd_vinyl_io_wd_t * wd = (fd_vinyl_io_wd_t *)io;
518 :
519 0 : if( FD_UNLIKELY( fd_vinyl_io_wd_busy( io ) ) ) {
520 0 : FD_LOG_CRIT(( "Attempted to send control message via io_wd while still busy" ));
521 0 : }
522 :
523 0 : fd_mcache_publish( wd->wr_mcache, wd->wr_depth, wd->wr_seq, sig, 0UL, 0UL, ctl, 0UL, 0UL );
524 0 : }
|