Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_vinyl_io_ur_private.h"
3 :
4 : #include <errno.h>
5 : #include <linux/io_uring.h>
6 :
7 : /* fd_vinyl_io_ur_fini is identical to fd_vinyl_io_bd_fini. */
8 :
9 : static void *
10 0 : fd_vinyl_io_ur_fini( fd_vinyl_io_t * io ) {
11 0 : fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
12 :
13 0 : ulong seq_present = ur->base->seq_present;
14 0 : ulong seq_future = ur->base->seq_future;
15 :
16 0 : if( FD_UNLIKELY( ur->rq_head ) ) FD_LOG_WARNING(( "fini completing outstanding reads" ));
17 0 : if( FD_UNLIKELY( ur->rc_head ) ) FD_LOG_WARNING(( "fini completing outstanding reads" ));
18 0 : if( FD_UNLIKELY( fd_vinyl_seq_ne( seq_present, seq_future ) ) ) FD_LOG_WARNING(( "fini discarding uncommited blocks" ));
19 :
20 0 : return io;
21 0 : }
22 :
23 : static fd_vinyl_io_impl_t fd_vinyl_io_ur_impl[1] = { {
24 : fd_vinyl_io_ur_read_imm, /* rd */
25 : fd_vinyl_io_ur_read, /* rd */
26 : fd_vinyl_io_ur_poll, /* rd */
27 : fd_vinyl_io_ur_append, /* wb */
28 : fd_vinyl_io_ur_commit, /* wb */
29 : fd_vinyl_io_ur_hint, /* wb */
30 : fd_vinyl_io_ur_alloc, /* wb */
31 : fd_vinyl_io_ur_copy, /* wb */
32 : fd_vinyl_io_ur_forget, /* wb */
33 : fd_vinyl_io_ur_rewind, /* wb */
34 : fd_vinyl_io_ur_sync, /* wb */
35 : fd_vinyl_io_ur_fini
36 : } };
37 :
38 : FD_STATIC_ASSERT( alignof(fd_vinyl_io_ur_t)==FD_VINYL_BSTREAM_BLOCK_SZ, layout );
39 :
40 : ulong
41 0 : fd_vinyl_io_ur_align( void ) {
42 0 : return alignof(fd_vinyl_io_ur_t);
43 0 : }
44 :
45 : ulong
46 0 : fd_vinyl_io_ur_footprint( ulong spad_max ) {
47 0 : if( FD_UNLIKELY( !((0UL<spad_max) & (spad_max<(1UL<<63)) & fd_ulong_is_aligned( spad_max, FD_VINYL_BSTREAM_BLOCK_SZ )) ) )
48 0 : return 0UL;
49 0 : return sizeof(fd_vinyl_io_ur_t) + spad_max;
50 0 : }
51 :
52 : fd_vinyl_io_t *
53 : fd_vinyl_io_ur_init( void * mem,
54 : ulong spad_max,
55 : int dev_fd,
56 0 : fd_io_uring_t * ring ) {
57 0 : fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)mem;
58 :
59 0 : if( FD_UNLIKELY( !ur ) ) {
60 0 : FD_LOG_WARNING(( "NULL mem" ));
61 0 : return NULL;
62 0 : }
63 :
64 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ur, fd_vinyl_io_ur_align() ) ) ) {
65 0 : FD_LOG_WARNING(( "misaligned mem" ));
66 0 : return NULL;
67 0 : }
68 :
69 0 : ulong footprint = fd_vinyl_io_ur_footprint( spad_max );
70 0 : if( FD_UNLIKELY( !footprint ) ) {
71 0 : FD_LOG_WARNING(( "bad spad_max" ));
72 0 : return NULL;
73 0 : }
74 :
75 0 : off_t _dev_sz = lseek( dev_fd, (off_t)0, SEEK_END );
76 0 : if( FD_UNLIKELY( _dev_sz<(off_t)0 ) ) {
77 0 : FD_LOG_WARNING(( "lseek failed, bstream must be seekable (%i-%s)", errno, fd_io_strerror( errno ) ));
78 0 : return NULL;
79 0 : }
80 0 : ulong dev_sz = (ulong)_dev_sz;
81 :
82 0 : ulong dev_sz_min = 3UL*FD_VINYL_BSTREAM_BLOCK_SZ /* sync block, move block, closing partition */
83 0 : + fd_vinyl_bstream_pair_sz( FD_VINYL_VAL_MAX ); /* worst case pair (FIXME: LZ4_COMPRESSBOUND?) */
84 :
85 0 : int too_small = dev_sz < dev_sz_min;
86 0 : int too_large = dev_sz > (ulong)LONG_MAX;
87 0 : int misaligned = !fd_ulong_is_aligned( dev_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
88 :
89 0 : if( FD_UNLIKELY( too_small | too_large | misaligned ) ) {
90 0 : FD_LOG_WARNING(( "bstream size %s", too_small ? "too small" :
91 0 : too_large ? "too large" :
92 0 : "not a block size multiple" ));
93 0 : return NULL;
94 0 : }
95 :
96 0 : memset( ur, 0, footprint );
97 :
98 0 : ur->base->type = FD_VINYL_IO_TYPE_UR;
99 :
100 : /* io_seed, seq_ancient, seq_past, seq_present, seq_future are init
101 : below */
102 :
103 0 : ur->base->spad_max = spad_max;
104 0 : ur->base->spad_used = 0UL; /* unused */
105 0 : ur->base->impl = fd_vinyl_io_ur_impl;
106 :
107 0 : ur->dev_fd = dev_fd;
108 0 : ur->dev_sync = 0UL; /* Use the beginning of the file for the sync block */
109 0 : ur->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ; /* Use the rest for the actual bstream store (at least 3.5 KiB) */
110 0 : ur->dev_sz = dev_sz - FD_VINYL_BSTREAM_BLOCK_SZ;
111 :
112 0 : ur->rq_head = NULL;
113 0 : ur->rq_tail_next = &ur->rq_head;
114 :
115 0 : ur->rc_head = NULL;
116 0 : ur->rc_tail_next = &ur->rc_head;
117 :
118 0 : ur->ring = ring;
119 :
120 : /* FIXME: Consider having the sync block on a completely separate
121 : device (to reduce seeking when syncing). */
122 :
123 0 : fd_vinyl_bstream_block_t * block = ur->sync;
124 :
125 0 : bd_read( dev_fd, ur->dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ ); /* logs details */
126 :
127 0 : int type = fd_vinyl_bstream_ctl_type ( block->sync.ctl );
128 0 : int version = fd_vinyl_bstream_ctl_style( block->sync.ctl );
129 0 : ulong val_max = fd_vinyl_bstream_ctl_sz ( block->sync.ctl );
130 0 : ulong seq_past = block->sync.seq_past;
131 0 : ulong seq_present = block->sync.seq_present;
132 0 : ulong info_sz = block->sync.info_sz; // overrides user info_sz
133 0 : void const * info = block->sync.info; // overrides user info
134 0 : ulong io_seed = block->sync.hash_trail; // overrides user io_seed
135 :
136 0 : int bad_type = (type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC);
137 0 : int bad_version = (version != 0);
138 0 : int bad_val_max = (val_max != FD_VINYL_VAL_MAX);
139 0 : int bad_seq_past = !fd_ulong_is_aligned( seq_past, FD_VINYL_BSTREAM_BLOCK_SZ );
140 0 : int bad_seq_present = !fd_ulong_is_aligned( seq_present, FD_VINYL_BSTREAM_BLOCK_SZ );
141 0 : int bad_info_sz = (info_sz > FD_VINYL_BSTREAM_SYNC_INFO_MAX);
142 0 : int bad_past_order = fd_vinyl_seq_gt( seq_past, seq_present );
143 0 : int bad_past_sz = ((seq_present-seq_past) > ur->dev_sz);
144 :
145 0 : if( FD_UNLIKELY( bad_type | bad_version | bad_val_max | bad_seq_past | bad_seq_present | bad_info_sz |
146 0 : bad_past_order | bad_past_sz ) ) {
147 0 : FD_LOG_WARNING(( "bad sync block when recovering bstream (%s)",
148 0 : bad_type ? "unexpected type" :
149 0 : bad_version ? "unexpected version" :
150 0 : bad_val_max ? "unexpected max pair value decoded byte size" :
151 0 : bad_seq_past ? "unaligned seq_past" :
152 0 : bad_seq_present ? "unaligned seq_present" :
153 0 : bad_info_sz ? "unexpected info size" :
154 0 : bad_past_order ? "unordered seq_past and seq_present" :
155 0 : "past size larger than bstream store" ));
156 0 : return NULL;
157 0 : }
158 :
159 0 : if( FD_UNLIKELY( fd_vinyl_bstream_block_test( io_seed, block ) ) ) {
160 0 : FD_LOG_WARNING(( "corrupt sync block when recovering bstream" ));
161 0 : return NULL;
162 0 : }
163 :
164 0 : ur->base->seed = io_seed;
165 0 : ur->base->seq_ancient = seq_past;
166 0 : ur->base->seq_past = seq_past;
167 0 : ur->base->seq_present = seq_present;
168 0 : ur->base->seq_future = seq_present;
169 0 : ur-> seq_cache = seq_present;
170 0 : ur-> seq_clean = seq_present;
171 0 : ur-> seq_write = seq_present;
172 :
173 0 : wb_ring_init( &ur->wb, seq_present, spad_max );
174 0 : wq_ring_init( &ur->wq, seq_present, WQ_DEPTH );
175 :
176 0 : FD_LOG_INFO(( "IO config"
177 0 : "\n\ttype ur"
178 0 : "\n\tspad_max %lu bytes"
179 0 : "\n\tdev_sz %lu bytes"
180 0 : "\n\tinfo \"%s\" (info_sz %lu, discovered)"
181 0 : "\n\tio_seed 0x%016lx (discovered)"
182 0 : "\n\tsq depth %u entries",
183 0 : spad_max, dev_sz,
184 0 : (char const *)info, info_sz,
185 0 : io_seed,
186 0 : ring->sq->depth ));
187 :
188 0 : return ur->base;
189 0 : }
|