Line data Source code
1 : #include "fd_vinyl_io.h"
2 :
3 : #include <errno.h>
4 : #include <unistd.h>
5 :
6 : struct fd_vinyl_io_bd_rd;
7 : typedef struct fd_vinyl_io_bd_rd fd_vinyl_io_bd_rd_t;
8 :
9 : struct fd_vinyl_io_bd_rd {
10 : ulong ctx; /* Must mirror fd_vinyl_io_rd_t */
11 : ulong seq; /* " */
12 : void * dst; /* " */
13 : ulong sz; /* " */
14 : fd_vinyl_io_bd_rd_t * next; /* Next element in bd rd queue */
15 : };
16 :
17 : struct fd_vinyl_io_bd {
18 : fd_vinyl_io_t base[1];
19 : int dev_fd; /* File descriptor of block device */
20 : ulong dev_sync; /* Offset to block that holds bstream sync (BLOCK_SZ multiple) */
21 : ulong dev_base; /* Offset to first block (BLOCK_SZ multiple) */
22 : ulong dev_sz; /* Block store byte size (BLOCK_SZ multiple) */
23 : fd_vinyl_io_bd_rd_t * rd_head; /* Pointer to queue head */
24 : fd_vinyl_io_bd_rd_t ** rd_tail_next; /* Pointer to queue &tail->next or &rd_head if empty. */
25 : fd_vinyl_bstream_block_t sync[1];
26 : /* spad_max bytes follow */
27 : };
28 :
29 : typedef struct fd_vinyl_io_bd fd_vinyl_io_bd_t;
30 :
31 : static inline void
32 : bd_read( fd_vinyl_io_bd_t * bd,
33 : ulong off,
34 : void * buf,
35 3379572 : ulong sz ) {
36 3379572 : int fd = bd->dev_fd;
37 3379572 : bd->base->file_read_cnt++;
38 3379572 : bd->base->file_read_tot_sz += sz;
39 3379572 : ssize_t ssz = pread( fd, buf, sz, (off_t)off );
40 3379572 : if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
41 0 : if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
42 0 : /**/ FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
43 0 : }
44 :
45 : static inline void
46 : bd_write( fd_vinyl_io_bd_t * bd,
47 : ulong off,
48 : void const * buf,
49 1088319 : ulong sz ) {
50 1088319 : int fd = bd->dev_fd;
51 1088319 : bd->base->file_write_cnt++;
52 1088319 : bd->base->file_write_tot_sz += sz;
53 1088319 : ssize_t ssz = pwrite( fd, buf, sz, (off_t)off );
54 1088319 : if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
55 0 : if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
56 0 : else FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
57 0 : }
58 :
59 : static void
60 : fd_vinyl_io_bd_read_imm( fd_vinyl_io_t * io,
61 : ulong seq0,
62 : void * _dst,
63 1499367 : ulong sz ) {
64 1499367 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
65 :
66 : /* If this is a request to read nothing, succeed immediately. If
67 : this is a request to read outside the bstream's past, fail. */
68 :
69 1499367 : if( FD_UNLIKELY( !sz ) ) return;
70 :
71 1351080 : uchar * dst = (uchar *)_dst;
72 1351080 : ulong seq1 = seq0 + sz;
73 :
74 1351080 : ulong seq_past = bd->base->seq_past;
75 1351080 : ulong seq_present = bd->base->seq_present;
76 :
77 1351080 : int bad_seq = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
78 1351080 : int bad_dst = !dst;
79 1351080 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
80 1351080 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
81 :
82 1351080 : if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
83 0 : FD_LOG_CRIT(( "bstream read_imm [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
84 1351080 : seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
85 1351080 : bad_seq ? "misaligned seq" :
86 1351080 : bad_dst ? "NULL dst" :
87 1351080 : bad_sz ? "misaligned sz" :
88 1351080 : "not in past" ));
89 :
90 : /* At this point, we have a valid read request. Map seq0 into the
91 : bstream store. Read the lesser of sz bytes or until the store end.
92 : If we hit the store end with more to go, wrap around and finish the
93 : read at the store start. */
94 :
95 1351080 : ulong dev_base = bd->dev_base;
96 1351080 : ulong dev_sz = bd->dev_sz;
97 :
98 1351080 : ulong dev_off = seq0 % dev_sz;
99 :
100 1351080 : ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
101 1351080 : bd_read( bd, dev_base + dev_off, dst, rsz );
102 1351080 : sz -= rsz;
103 :
104 1351080 : if( FD_UNLIKELY( sz ) ) bd_read( bd, dev_base, dst + rsz, sz );
105 1351080 : }
106 :
107 : static void
108 : fd_vinyl_io_bd_read( fd_vinyl_io_t * io,
109 1874070 : fd_vinyl_io_rd_t * _rd ) {
110 1874070 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *) io; /* Note: io must be non-NULL to have even been called */
111 1874070 : fd_vinyl_io_bd_rd_t * rd = (fd_vinyl_io_bd_rd_t *)_rd;
112 :
113 1874070 : rd->next = NULL;
114 1874070 : *bd->rd_tail_next = rd;
115 1874070 : bd->rd_tail_next = &rd->next;
116 :
117 1874070 : ulong seq0 = rd->seq;
118 1874070 : uchar * dst = (uchar *)rd->dst;
119 1874070 : ulong sz = rd->sz;
120 :
121 : /* If this is a request to read nothing, succeed immediately. If
122 : this is a request to read outside the bstream's past, fail. */
123 :
124 1874070 : if( FD_UNLIKELY( !sz ) ) return;
125 :
126 1689405 : ulong seq1 = seq0 + sz;
127 :
128 1689405 : ulong seq_past = bd->base->seq_past;
129 1689405 : ulong seq_present = bd->base->seq_present;
130 :
131 1689405 : int bad_seq = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
132 1689405 : int bad_dst = !dst;
133 1689405 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
134 1689405 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
135 :
136 1689405 : if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
137 0 : FD_LOG_CRIT(( "bstream read [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
138 1689405 : seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
139 1689405 : bad_seq ? "misaligned seq" :
140 1689405 : bad_dst ? "NULL dst" :
141 1689405 : bad_sz ? "misaligned sz" :
142 1689405 : "not in past" ));
143 :
144 : /* At this point, we have a valid read request. Map seq0 into the
145 : bstream store. Read the lesser of sz bytes or until the store end.
146 : If we hit the store end with more to go, wrap around and finish the
147 : read at the store start. */
148 :
149 1689405 : ulong dev_base = bd->dev_base;
150 1689405 : ulong dev_sz = bd->dev_sz;
151 :
152 1689405 : ulong dev_off = seq0 % dev_sz;
153 :
154 1689405 : ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
155 1689405 : bd_read( bd, dev_base + dev_off, dst, rsz );
156 1689405 : sz -= rsz;
157 :
158 1689405 : if( FD_UNLIKELY( sz ) ) bd_read( bd, dev_base, dst + rsz, sz );
159 1689405 : }
160 :
161 : static int
162 : fd_vinyl_io_bd_poll( fd_vinyl_io_t * io,
163 : fd_vinyl_io_rd_t ** _rd,
164 3748140 : int flags ) {
165 3748140 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t * )io; /* Note: io must be non-NULL to have even been called */
166 3748140 : (void)flags;
167 :
168 3748140 : fd_vinyl_io_bd_rd_t * rd = bd->rd_head;
169 :
170 3748140 : if( FD_UNLIKELY( !rd ) ) {
171 1874070 : *_rd = NULL;
172 1874070 : return FD_VINYL_ERR_EMPTY;
173 1874070 : }
174 :
175 1874070 : fd_vinyl_io_bd_rd_t ** rd_tail_next = bd->rd_tail_next;
176 1874070 : fd_vinyl_io_bd_rd_t * rd_next = rd->next;
177 :
178 1874070 : bd->rd_head = rd_next;
179 1874070 : bd->rd_tail_next = fd_ptr_if( !!rd_next, rd_tail_next, &bd->rd_head );
180 :
181 1874070 : *_rd = (fd_vinyl_io_rd_t *)rd;
182 1874070 : return FD_VINYL_SUCCESS;
183 3748140 : }
184 :
185 : static ulong
186 : fd_vinyl_io_bd_append( fd_vinyl_io_t * io,
187 : void const * _src,
188 375105 : ulong sz ) {
189 375105 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
190 375105 : uchar const * src = (uchar const *)_src;
191 :
192 : /* Validate the input args. */
193 :
194 375105 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
195 375105 : ulong seq_ancient = bd->base->seq_ancient;
196 375105 : ulong dev_base = bd->dev_base;
197 375105 : ulong dev_sz = bd->dev_sz;
198 :
199 375105 : int bad_src = !src;
200 375105 : int bad_align = !fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ );
201 375105 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
202 375105 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
203 :
204 375105 : if( FD_UNLIKELY( bad_src | bad_align | bad_sz | bad_capacity ) )
205 0 : FD_LOG_CRIT(( bad_src ? "NULL src" :
206 375105 : bad_align ? "misaligned src" :
207 375105 : bad_sz ? "misaligned sz" :
208 375105 : "device full" ));
209 :
210 : /* At this point, we appear to have a valid append request. Map it to
211 : the bstream (updating seq_future) and map it to the device. Then
212 : write the lesser of sz bytes or until the store end. If we hit the
213 : store end with more to go, wrap around and finish the write at the
214 : store start. */
215 :
216 375105 : ulong seq = seq_future;
217 375105 : bd->base->seq_future = seq + sz;
218 :
219 375105 : ulong dev_off = seq % dev_sz;
220 :
221 375105 : ulong wsz = fd_ulong_min( sz, dev_sz - dev_off );
222 375105 : bd_write( bd, dev_base + dev_off, src, wsz );
223 375105 : sz -= wsz;
224 375105 : if( sz ) bd_write( bd, dev_base, src + wsz, sz );
225 :
226 375105 : return seq;
227 375105 : }
228 :
229 : static int
230 : fd_vinyl_io_bd_commit( fd_vinyl_io_t * io,
231 374502 : int flags ) {
232 374502 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
233 374502 : (void)flags;
234 :
235 374502 : bd->base->seq_present = bd->base->seq_future;
236 374502 : bd->base->spad_used = 0UL;
237 :
238 374502 : return FD_VINYL_SUCCESS;
239 374502 : }
240 :
241 : static ulong
242 : fd_vinyl_io_bd_hint( fd_vinyl_io_t * io,
243 376722 : ulong sz ) {
244 376722 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
245 :
246 376722 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
247 376722 : ulong seq_ancient = bd->base->seq_ancient;
248 376722 : ulong dev_sz = bd->dev_sz;
249 :
250 376722 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
251 376722 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
252 :
253 376722 : if( FD_UNLIKELY( bad_sz | bad_capacity ) ) FD_LOG_CRIT(( bad_sz ? "misaligned sz" : "device full" ));
254 :
255 376722 : return bd->base->seq_future;
256 376722 : }
257 :
258 : static void *
259 : fd_vinyl_io_bd_alloc( fd_vinyl_io_t * io,
260 : ulong sz,
261 1482 : int flags ) {
262 1482 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
263 :
264 1482 : ulong spad_max = bd->base->spad_max;
265 1482 : ulong spad_used = bd->base->spad_used; if( FD_UNLIKELY( !sz ) ) return ((uchar *)(bd+1)) + spad_used;
266 :
267 1482 : int bad_align = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
268 1482 : int bad_sz = sz > spad_max;
269 :
270 1482 : if( FD_UNLIKELY( bad_align | bad_sz ) ) FD_LOG_CRIT(( bad_align ? "misaligned sz" : "sz too large" ));
271 :
272 1482 : if( FD_UNLIKELY( sz > (spad_max - spad_used ) ) ) {
273 0 : if( FD_UNLIKELY( fd_vinyl_io_bd_commit( io, flags ) ) ) return NULL;
274 0 : spad_used = 0UL;
275 0 : }
276 :
277 1482 : bd->base->spad_used = spad_used + sz;
278 :
279 1482 : return ((uchar *)(bd+1)) + spad_used;
280 1482 : }
281 :
282 : static ulong
283 : fd_vinyl_io_bd_copy( fd_vinyl_io_t * io,
284 : ulong seq_src0,
285 376149 : ulong sz ) {
286 376149 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
287 :
288 : /* Validate the input args */
289 :
290 376149 : ulong seq_ancient = bd->base->seq_ancient;
291 376149 : ulong seq_past = bd->base->seq_past;
292 376149 : ulong seq_present = bd->base->seq_present;
293 376149 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
294 339045 : ulong spad_max = bd->base->spad_max;
295 339045 : ulong spad_used = bd->base->spad_used;
296 339045 : ulong dev_base = bd->dev_base;
297 339045 : ulong dev_sz = bd->dev_sz;
298 :
299 339045 : ulong seq_src1 = seq_src0 + sz;
300 :
301 339045 : int bad_past = !( fd_vinyl_seq_le( seq_past, seq_src0 ) &
302 339045 : fd_vinyl_seq_lt( seq_src0, seq_src1 ) &
303 339045 : fd_vinyl_seq_le( seq_src1, seq_present ) );
304 339045 : int bad_src = !fd_ulong_is_aligned( seq_src0, FD_VINYL_BSTREAM_BLOCK_SZ );
305 339045 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
306 339045 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
307 :
308 339045 : if( FD_UNLIKELY( bad_past | bad_src | bad_sz | bad_capacity ) )
309 0 : FD_LOG_CRIT(( bad_past ? "src is not in the past" :
310 339045 : bad_src ? "misaligned src_seq" :
311 339045 : bad_sz ? "misaligned sz" :
312 339045 : "device full" ));
313 :
314 : /* At this point, we appear to have a valid copy request. Get
315 : buffer space from the scratch pad (committing as necessary). */
316 :
317 339045 : if( FD_UNLIKELY( sz>(spad_max-spad_used) ) ) {
318 0 : fd_vinyl_io_bd_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
319 0 : spad_used = 0UL;
320 0 : }
321 :
322 339045 : uchar * buf = (uchar *)(bd+1) + spad_used;
323 339045 : ulong buf_max = spad_max - spad_used;
324 :
325 : /* Map the dst to the bstream (updating seq_future) and map the src
326 : and dst regions onto the device. Then copy as much as we can at a
327 : time, handling device wrap around and copy buffering space. */
328 :
329 339045 : ulong seq = seq_future;
330 339045 : bd->base->seq_future = seq + sz;
331 :
332 339045 : ulong seq_dst0 = seq;
333 :
334 339048 : for(;;) {
335 339048 : ulong src_off = seq_src0 % dev_sz;
336 339048 : ulong dst_off = seq_dst0 % dev_sz;
337 339048 : ulong csz = fd_ulong_min( fd_ulong_min( sz, buf_max ), fd_ulong_min( dev_sz - src_off, dev_sz - dst_off ) );
338 :
339 339048 : bd_read ( bd, dev_base + src_off, buf, csz );
340 339048 : bd_write( bd, dev_base + dst_off, buf, csz );
341 :
342 339048 : sz -= csz;
343 339048 : if( !sz ) break;
344 :
345 3 : seq_src0 += csz;
346 3 : seq_dst0 += csz;
347 3 : }
348 :
349 339045 : return seq;
350 339045 : }
351 :
352 : static void
353 : fd_vinyl_io_bd_forget( fd_vinyl_io_t * io,
354 131763 : ulong seq ) {
355 131763 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
356 :
357 : /* Validate input arguments. Note that we don't allow forgetting into
358 : the future even when we have no uncommitted blocks because the
359 : resulting [seq_ancient,seq_future) might contain blocks that were
360 : never written (which might not be an issue practically but it would
361 : be a bit strange for something to try to scan starting from
362 : seq_ancient and discover unwritten blocks). */
363 :
364 131763 : ulong seq_past = bd->base->seq_past;
365 131763 : ulong seq_present = bd->base->seq_present;
366 131763 : ulong seq_future = bd->base->seq_future;
367 :
368 131763 : int bad_seq = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
369 131763 : int bad_dir = !(fd_vinyl_seq_le( seq_past, seq ) & fd_vinyl_seq_le( seq, seq_present ));
370 131763 : int bad_read = !!bd->rd_head;
371 131763 : int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );
372 :
373 131763 : if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
374 0 : FD_LOG_CRIT(( "forget to seq %016lx failed (past [%016lx,%016lx)/%lu, %s)",
375 131763 : seq, seq_past, seq_present, seq_present-seq_past,
376 131763 : bad_seq ? "misaligned seq" :
377 131763 : bad_dir ? "seq out of bounds" :
378 131763 : bad_read ? "reads in progress" :
379 131763 : "appends/copies in progress" ));
380 :
381 131763 : bd->base->seq_past = seq;
382 131763 : }
383 :
384 : static void
385 : fd_vinyl_io_bd_rewind( fd_vinyl_io_t * io,
386 106734 : ulong seq ) {
387 106734 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
388 :
389 : /* Validate input argments. Unlike forgot, we do allow rewinding to
390 : before seq_ancient as the region of sequence space reported to the
391 : caller as written is still accurate. */
392 :
393 106734 : ulong seq_ancient = bd->base->seq_ancient;
394 106734 : ulong seq_past = bd->base->seq_past;
395 106734 : ulong seq_present = bd->base->seq_present;
396 106734 : ulong seq_future = bd->base->seq_future;
397 :
398 106734 : int bad_seq = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
399 106734 : int bad_dir = fd_vinyl_seq_gt( seq, seq_present );
400 106734 : int bad_read = !!bd->rd_head;
401 106734 : int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );
402 :
403 106734 : if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
404 0 : FD_LOG_CRIT(( "rewind to seq %016lx failed (present %016lx, %s)", seq, seq_present,
405 106734 : bad_seq ? "misaligned seq" :
406 106734 : bad_dir ? "seq after seq_present" :
407 106734 : bad_read ? "reads in progress" :
408 106734 : "appends/copies in progress" ));
409 :
410 106734 : bd->base->seq_ancient = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_ancient ), seq_ancient, seq );
411 106734 : bd->base->seq_past = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_past ), seq_past, seq );
412 106734 : bd->base->seq_present = seq;
413 106734 : bd->base->seq_future = seq;
414 106734 : }
415 :
416 : static int
417 : fd_vinyl_io_bd_sync( fd_vinyl_io_t * io,
418 374163 : int flags ) {
419 374163 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
420 374163 : (void)flags;
421 :
422 374163 : ulong seed = bd->base->seed;
423 374163 : ulong seq_past = bd->base->seq_past;
424 374163 : ulong seq_present = bd->base->seq_present;
425 :
426 374163 : ulong dev_sync = bd->dev_sync;
427 :
428 374163 : fd_vinyl_bstream_block_t * block = bd->sync;
429 :
430 : /* block->sync.ctl current (static) */
431 374163 : block->sync.seq_past = seq_past;
432 374163 : block->sync.seq_present = seq_present;
433 : /* block->sync.info_sz current (static) */
434 : /* block->sync.info current (static) */
435 :
436 374163 : block->sync.hash_trail = 0UL;
437 374163 : block->sync.hash_blocks = 0UL;
438 374163 : fd_vinyl_bstream_block_hash( seed, block ); /* sets hash_trail back to seed */
439 :
440 374163 : bd_write( bd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
441 :
442 374163 : bd->base->seq_ancient = seq_past;
443 :
444 374163 : return FD_VINYL_SUCCESS;
445 374163 : }
446 :
447 : static void *
448 6 : fd_vinyl_io_bd_fini( fd_vinyl_io_t * io ) {
449 6 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
450 :
451 6 : ulong seq_present = bd->base->seq_present;
452 6 : ulong seq_future = bd->base->seq_future;
453 :
454 6 : if( FD_UNLIKELY( bd->rd_head ) ) FD_LOG_WARNING(( "fini completing outstanding reads" ));
455 6 : if( FD_UNLIKELY( fd_vinyl_seq_ne( seq_present, seq_future ) ) ) FD_LOG_WARNING(( "fini discarding uncommited blocks" ));
456 :
457 6 : return io;
458 6 : }
459 :
460 : static fd_vinyl_io_impl_t fd_vinyl_io_bd_impl[1] = { {
461 : fd_vinyl_io_bd_read_imm,
462 : fd_vinyl_io_bd_read,
463 : fd_vinyl_io_bd_poll,
464 : fd_vinyl_io_bd_append,
465 : fd_vinyl_io_bd_commit,
466 : fd_vinyl_io_bd_hint,
467 : fd_vinyl_io_bd_alloc,
468 : fd_vinyl_io_bd_copy,
469 : fd_vinyl_io_bd_forget,
470 : fd_vinyl_io_bd_rewind,
471 : fd_vinyl_io_bd_sync,
472 : fd_vinyl_io_bd_fini
473 : } };
474 :
475 : FD_STATIC_ASSERT( alignof(fd_vinyl_io_bd_t)==FD_VINYL_BSTREAM_BLOCK_SZ, layout );
476 :
477 : ulong
478 39 : fd_vinyl_io_bd_align( void ) {
479 39 : return alignof(fd_vinyl_io_bd_t);
480 39 : }
481 :
482 : ulong
483 39 : fd_vinyl_io_bd_footprint( ulong spad_max ) {
484 39 : if( FD_UNLIKELY( !((0UL<spad_max) & (spad_max<(1UL<<63)) & fd_ulong_is_aligned( spad_max, FD_VINYL_BSTREAM_BLOCK_SZ )) ) )
485 12 : return 0UL;
486 27 : return sizeof(fd_vinyl_io_bd_t) + spad_max;
487 39 : }
488 :
489 : fd_vinyl_io_t *
490 : fd_vinyl_io_bd_init( void * mem,
491 : ulong spad_max,
492 : int dev_fd,
493 : int reset,
494 : void const * info,
495 : ulong info_sz,
496 39 : ulong io_seed ) {
497 39 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)mem;
498 :
499 39 : if( FD_UNLIKELY( !bd ) ) {
500 3 : FD_LOG_WARNING(( "NULL mem" ));
501 3 : return NULL;
502 3 : }
503 :
504 36 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)bd, fd_vinyl_io_bd_align() ) ) ) {
505 3 : FD_LOG_WARNING(( "misaligned mem" ));
506 3 : return NULL;
507 3 : }
508 :
509 33 : ulong footprint = fd_vinyl_io_bd_footprint( spad_max );
510 33 : if( FD_UNLIKELY( !footprint ) ) {
511 9 : FD_LOG_WARNING(( "bad spad_max" ));
512 9 : return NULL;
513 9 : }
514 :
515 24 : off_t _dev_sz = lseek( dev_fd, (off_t)0, SEEK_END );
516 24 : if( FD_UNLIKELY( _dev_sz<(off_t)0 ) ) {
517 6 : FD_LOG_WARNING(( "lseek failed, bstream must be seekable (%i-%s)", errno, fd_io_strerror( errno ) ));
518 6 : return NULL;
519 6 : }
520 18 : ulong dev_sz = (ulong)_dev_sz;
521 :
522 18 : ulong dev_sz_min = 3UL*FD_VINYL_BSTREAM_BLOCK_SZ /* sync block, move block, closing partition */
523 18 : + fd_vinyl_bstream_pair_sz( FD_VINYL_VAL_MAX ); /* worst case pair (FIXME: LZ4_COMPRESSBOUND?) */
524 :
525 18 : int too_small = dev_sz < dev_sz_min;
526 18 : int too_large = dev_sz > (ulong)LONG_MAX;
527 18 : int misaligned = !fd_ulong_is_aligned( dev_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
528 :
529 18 : if( FD_UNLIKELY( too_small | too_large | misaligned ) ) {
530 6 : FD_LOG_WARNING(( "bstream size %s", too_small ? "too small" :
531 6 : too_large ? "too large" :
532 6 : "not a block size multiple" ));
533 6 : return NULL;
534 6 : }
535 :
536 12 : if( reset ) {
537 6 : if( FD_UNLIKELY( !info ) ) info_sz = 0UL;
538 6 : if( FD_UNLIKELY( info_sz>FD_VINYL_BSTREAM_SYNC_INFO_MAX ) ) {
539 3 : FD_LOG_WARNING(( "info_sz too large" ));
540 3 : return NULL;
541 3 : }
542 6 : }
543 :
544 9 : memset( bd, 0, footprint );
545 :
546 9 : bd->base->type = FD_VINYL_IO_TYPE_BD;
547 :
548 : /* io_seed, seq_ancient, seq_past, seq_present, seq_future are init
549 : below */
550 :
551 9 : bd->base->spad_max = spad_max;
552 9 : bd->base->spad_used = 0UL;
553 9 : bd->base->impl = fd_vinyl_io_bd_impl;
554 :
555 9 : bd->dev_fd = dev_fd;
556 9 : bd->dev_sync = 0UL; /* Use the beginning of the file for the sync block */
557 9 : bd->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ; /* Use the rest for the actual bstream store (at least 3.5 KiB) */
558 9 : bd->dev_sz = dev_sz - FD_VINYL_BSTREAM_BLOCK_SZ;
559 :
560 9 : bd->rd_head = NULL;
561 9 : bd->rd_tail_next = &bd->rd_head;
562 :
563 : /* Note that [seq_ancient,seq_future) (cyclic) contains at most dev_sz
564 : bytes, bstream's antiquity, past and present are subsets of this
565 : range and dev_sz is less than 2^63 given the above (practically
566 : much much less). As such, differences between two ordered bstream
567 : sequence numbers (e.g. ulong sz = seq_a - seq_b where a is
568 : logically not before b) will "just work" regardless of wrapping
569 : and/or amount of data stored. */
570 :
571 : /* FIXME: Consider having the sync block on a completely separate
572 : device (to reduce seeking when syncing). */
573 :
574 9 : fd_vinyl_bstream_block_t * block = bd->sync;
575 :
576 9 : if( reset ) {
577 :
578 : /* We are starting a new bstream. Write the initial sync block. */
579 :
580 3 : bd->base->seed = io_seed;
581 3 : bd->base->seq_ancient = 0UL;
582 3 : bd->base->seq_past = 0UL;
583 3 : bd->base->seq_present = 0UL;
584 3 : bd->base->seq_future = 0UL;
585 :
586 3 : memset( block, 0, FD_VINYL_BSTREAM_BLOCK_SZ ); /* bulk zero */
587 :
588 3 : block->sync.ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_SYNC, 0, FD_VINYL_VAL_MAX );
589 : //block->sync.seq_past = ...; /* init by sync */
590 : //block->sync.seq_present = ...; /* init by sync */
591 3 : block->sync.info_sz = info_sz;
592 3 : if( info_sz ) memcpy( block->sync.info, info, info_sz );
593 : //block->sync.hash_trail = ...; /* init by sync */
594 : //block->sync.hash_blocks = ...; /* init by sync */
595 :
596 3 : int err = fd_vinyl_io_bd_sync( bd->base, FD_VINYL_IO_FLAG_BLOCKING ); /* logs details */
597 3 : if( FD_UNLIKELY( err ) ) {
598 0 : FD_LOG_WARNING(( "sync block write failed (%i-%s)", err, fd_vinyl_strerror( err ) ));
599 0 : return NULL;
600 0 : }
601 :
602 6 : } else {
603 :
604 : /* We are resuming an existing bstream. Read and validate the
605 : bstream's sync block. */
606 :
607 6 : bd_read( bd, bd->dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ ); /* logs details */
608 :
609 6 : int type = fd_vinyl_bstream_ctl_type ( block->sync.ctl );
610 6 : int version = fd_vinyl_bstream_ctl_style( block->sync.ctl );
611 6 : ulong val_max = fd_vinyl_bstream_ctl_sz ( block->sync.ctl );
612 6 : ulong seq_past = block->sync.seq_past;
613 6 : ulong seq_present = block->sync.seq_present;
614 6 : /**/ info_sz = block->sync.info_sz; // overrides user info_sz
615 6 : /**/ info = block->sync.info; // overrides user info
616 6 : /**/ io_seed = block->sync.hash_trail; // overrides user io_seed
617 :
618 6 : int bad_type = (type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC);
619 6 : int bad_version = (version != 0);
620 6 : int bad_val_max = (val_max != FD_VINYL_VAL_MAX);
621 6 : int bad_seq_past = !fd_ulong_is_aligned( seq_past, FD_VINYL_BSTREAM_BLOCK_SZ );
622 6 : int bad_seq_present = !fd_ulong_is_aligned( seq_present, FD_VINYL_BSTREAM_BLOCK_SZ );
623 6 : int bad_info_sz = (info_sz > FD_VINYL_BSTREAM_SYNC_INFO_MAX);
624 6 : int bad_past_order = fd_vinyl_seq_gt( seq_past, seq_present );
625 6 : int bad_past_sz = ((seq_present-seq_past) > bd->dev_sz);
626 :
627 6 : if( FD_UNLIKELY( bad_type | bad_version | bad_val_max | bad_seq_past | bad_seq_present | bad_info_sz |
628 6 : bad_past_order | bad_past_sz ) ) {
629 3 : FD_LOG_WARNING(( "bad sync block when recovering bstream (%s)",
630 3 : bad_type ? "unexpected type" :
631 3 : bad_version ? "unexpected version" :
632 3 : bad_val_max ? "unexpected max pair value decoded byte size" :
633 3 : bad_seq_past ? "unaligned seq_past" :
634 3 : bad_seq_present ? "unaligned seq_present" :
635 3 : bad_info_sz ? "unexpected info size" :
636 3 : bad_past_order ? "unordered seq_past and seq_present" :
637 3 : "past size larger than bstream store" ));
638 3 : return NULL;
639 3 : }
640 :
641 3 : if( FD_UNLIKELY( fd_vinyl_bstream_block_test( io_seed, block ) ) ) {
642 0 : FD_LOG_WARNING(( "corrupt sync block when recovering bstream" ));
643 0 : return NULL;
644 0 : }
645 :
646 3 : bd->base->seed = io_seed;
647 3 : bd->base->seq_ancient = seq_past;
648 3 : bd->base->seq_past = seq_past;
649 3 : bd->base->seq_present = seq_present;
650 3 : bd->base->seq_future = seq_present;
651 :
652 3 : }
653 :
654 6 : FD_LOG_INFO(( "IO config"
655 6 : "\n\ttype bd"
656 6 : "\n\tspad_max %lu bytes"
657 6 : "\n\tdev_sz %lu bytes"
658 6 : "\n\treset %i"
659 6 : "\n\tinfo \"%s\" (info_sz %lu%s)"
660 6 : "\n\tio_seed 0x%016lx%s",
661 6 : spad_max, dev_sz, reset,
662 6 : info ? (char const *)info : "", info_sz, reset ? "" : ", discovered",
663 6 : io_seed, reset ? "" : " (discovered)" ));
664 :
665 6 : return bd->base;
666 9 : }
|