Line data Source code
1 : #include "fd_vinyl_io.h"
2 :
3 : #include <errno.h>
4 : #include <unistd.h>
5 :
6 : static inline void
7 : bd_read( int fd,
8 : ulong off,
9 : void * buf,
10 3292437 : ulong sz ) {
11 3292437 : ssize_t ssz = pread( fd, buf, sz, (off_t)off );
12 3292437 : if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
13 0 : if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
14 0 : /**/ FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
15 0 : }
16 :
17 : static inline void
18 : bd_write( int fd,
19 : ulong off,
20 : void const * buf,
21 1079595 : ulong sz ) {
22 1079595 : ssize_t ssz = pwrite( fd, buf, sz, (off_t)off );
23 1079595 : if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
24 0 : if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
25 0 : else FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
26 0 : }
27 :
28 : struct fd_vinyl_io_bd_rd;
29 : typedef struct fd_vinyl_io_bd_rd fd_vinyl_io_bd_rd_t;
30 :
31 : struct fd_vinyl_io_bd_rd {
32 : ulong ctx; /* Must mirror fd_vinyl_io_rd_t */
33 : ulong seq; /* " */
34 : void * dst; /* " */
35 : ulong sz; /* " */
36 : fd_vinyl_io_bd_rd_t * next; /* Next element in bd rd queue */
37 : };
38 :
39 : struct fd_vinyl_io_bd {
40 : fd_vinyl_io_t base[1];
41 : int dev_fd; /* File descriptor of block device */
42 : ulong dev_sync; /* Offset to block that holds bstream sync (BLOCK_SZ multiple) */
43 : ulong dev_base; /* Offset to first block (BLOCK_SZ multiple) */
44 : ulong dev_sz; /* Block store byte size (BLOCK_SZ multiple) */
45 : fd_vinyl_io_bd_rd_t * rd_head; /* Pointer to queue head */
46 : fd_vinyl_io_bd_rd_t ** rd_tail_next; /* Pointer to queue &tail->next or &rd_head if empty. */
47 : fd_vinyl_bstream_block_t sync[1];
48 : /* spad_max bytes follow */
49 : };
50 :
51 : typedef struct fd_vinyl_io_bd fd_vinyl_io_bd_t;
52 :
53 : static void
54 : fd_vinyl_io_bd_read_imm( fd_vinyl_io_t * io,
55 : ulong seq0,
56 : void * _dst,
57 1500474 : ulong sz ) {
58 1500474 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
59 :
60 : /* If this is a request to read nothing, succeed immediately. If
61 : this is a request to read outside the bstream's past, fail. */
62 :
63 1500474 : if( FD_UNLIKELY( !sz ) ) return;
64 :
65 1316775 : uchar * dst = (uchar *)_dst;
66 1316775 : ulong seq1 = seq0 + sz;
67 :
68 1316775 : ulong seq_past = bd->base->seq_past;
69 1316775 : ulong seq_present = bd->base->seq_present;
70 :
71 1316775 : int bad_seq = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
72 1316775 : int bad_dst = (!fd_ulong_is_aligned( (ulong)dst, FD_VINYL_BSTREAM_BLOCK_SZ )) | !dst;
73 1316775 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
74 1316775 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
75 :
76 1316775 : if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
77 0 : FD_LOG_CRIT(( "bstream read_imm [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
78 1316775 : seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
79 1316775 : bad_seq ? "misaligned seq" :
80 1316775 : bad_dst ? "misaligned or NULL dst" :
81 1316775 : bad_sz ? "misaligned sz" :
82 1316775 : "not in past" ));
83 :
84 : /* At this point, we have a valid read request. Map seq0 into the
85 : bstream store. Read the lesser of sz bytes or until the store end.
86 : If we hit the store end with more to go, wrap around and finish the
87 : read at the store start. */
88 :
89 1316775 : int dev_fd = bd->dev_fd;
90 1316775 : ulong dev_base = bd->dev_base;
91 1316775 : ulong dev_sz = bd->dev_sz;
92 :
93 1316775 : ulong dev_off = seq0 % dev_sz;
94 :
95 1316775 : ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
96 1316775 : bd_read( dev_fd, dev_base + dev_off, dst, rsz );
97 1316775 : sz -= rsz;
98 :
99 1316775 : if( FD_UNLIKELY( sz ) ) bd_read( dev_fd, dev_base, dst + rsz, sz );
100 1316775 : }
101 :
102 : static void
103 : fd_vinyl_io_bd_read( fd_vinyl_io_t * io,
104 1873707 : fd_vinyl_io_rd_t * _rd ) {
105 1873707 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *) io; /* Note: io must be non-NULL to have even been called */
106 1873707 : fd_vinyl_io_bd_rd_t * rd = (fd_vinyl_io_bd_rd_t *)_rd;
107 :
108 1873707 : rd->next = NULL;
109 1873707 : *bd->rd_tail_next = rd;
110 1873707 : bd->rd_tail_next = &rd->next;
111 :
112 1873707 : ulong seq0 = rd->seq;
113 1873707 : uchar * dst = (uchar *)rd->dst;
114 1873707 : ulong sz = rd->sz;
115 :
116 : /* If this is a request to read nothing, succeed immediately. If
117 : this is a request to read outside the bstream's past, fail. */
118 :
119 1873707 : if( FD_UNLIKELY( !sz ) ) return;
120 :
121 1645263 : ulong seq1 = seq0 + sz;
122 :
123 1645263 : ulong seq_past = bd->base->seq_past;
124 1645263 : ulong seq_present = bd->base->seq_present;
125 :
126 1645263 : int bad_seq = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
127 1645263 : int bad_dst = (!fd_ulong_is_aligned( (ulong)dst, FD_VINYL_BSTREAM_BLOCK_SZ )) | !dst;
128 1645263 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
129 1645263 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
130 :
131 1645263 : if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
132 0 : FD_LOG_CRIT(( "bstream read [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
133 1645263 : seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
134 1645263 : bad_seq ? "misaligned seq" :
135 1645263 : bad_dst ? "misaligned or NULL dst" :
136 1645263 : bad_sz ? "misaligned sz" :
137 1645263 : "not in past" ));
138 :
139 : /* At this point, we have a valid read request. Map seq0 into the
140 : bstream store. Read the lesser of sz bytes or until the store end.
141 : If we hit the store end with more to go, wrap around and finish the
142 : read at the store start. */
143 :
144 1645263 : int dev_fd = bd->dev_fd;
145 1645263 : ulong dev_base = bd->dev_base;
146 1645263 : ulong dev_sz = bd->dev_sz;
147 :
148 1645263 : ulong dev_off = seq0 % dev_sz;
149 :
150 1645263 : ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
151 1645263 : bd_read( dev_fd, dev_base + dev_off, dst, rsz );
152 1645263 : sz -= rsz;
153 :
154 1645263 : if( FD_UNLIKELY( sz ) ) bd_read( dev_fd, dev_base, dst + rsz, sz );
155 1645263 : }
156 :
157 : static int
158 : fd_vinyl_io_bd_poll( fd_vinyl_io_t * io,
159 : fd_vinyl_io_rd_t ** _rd,
160 3747414 : int flags ) {
161 3747414 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t * )io; /* Note: io must be non-NULL to have even been called */
162 3747414 : (void)flags;
163 :
164 3747414 : fd_vinyl_io_bd_rd_t * rd = bd->rd_head;
165 :
166 3747414 : if( FD_UNLIKELY( !rd ) ) {
167 1873707 : *_rd = NULL;
168 1873707 : return FD_VINYL_ERR_EMPTY;
169 1873707 : }
170 :
171 1873707 : fd_vinyl_io_bd_rd_t ** rd_tail_next = bd->rd_tail_next;
172 1873707 : fd_vinyl_io_bd_rd_t * rd_next = rd->next;
173 :
174 1873707 : bd->rd_head = rd_next;
175 1873707 : bd->rd_tail_next = fd_ptr_if( !!rd_next, rd_tail_next, &bd->rd_head );
176 :
177 1873707 : *_rd = (fd_vinyl_io_rd_t *)rd;
178 1873707 : return FD_VINYL_SUCCESS;
179 3747414 : }
180 :
181 : static ulong
182 : fd_vinyl_io_bd_append( fd_vinyl_io_t * io,
183 : void const * _src,
184 374868 : ulong sz ) {
185 374868 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
186 374868 : uchar const * src = (uchar const *)_src;
187 :
188 : /* Validate the input args. */
189 :
190 374868 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
191 374868 : ulong seq_ancient = bd->base->seq_ancient;
192 374868 : int dev_fd = bd->dev_fd;
193 374868 : ulong dev_base = bd->dev_base;
194 374868 : ulong dev_sz = bd->dev_sz;
195 :
196 374868 : int bad_src = !src;
197 374868 : int bad_align = !fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ );
198 374868 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
199 374868 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
200 :
201 374868 : if( FD_UNLIKELY( bad_src | bad_align | bad_sz | bad_capacity ) )
202 0 : FD_LOG_CRIT(( bad_src ? "NULL src" :
203 374868 : bad_align ? "misaligned src" :
204 374868 : bad_sz ? "misaligned sz" :
205 374868 : "device full" ));
206 :
207 : /* At this point, we appear to have a valid append request. Map it to
208 : the bstream (updating seq_future) and map it to the device. Then
209 : write the lesser of sz bytes or until the store end. If we hit the
210 : store end with more to go, wrap around and finish the write at the
211 : store start. */
212 :
213 374868 : ulong seq = seq_future;
214 374868 : bd->base->seq_future = seq + sz;
215 :
216 374868 : ulong dev_off = seq % dev_sz;
217 :
218 374868 : ulong wsz = fd_ulong_min( sz, dev_sz - dev_off );
219 374868 : bd_write( dev_fd, dev_base + dev_off, src, wsz );
220 374868 : sz -= wsz;
221 374868 : if( sz ) bd_write( dev_fd, dev_base, src + wsz, sz );
222 :
223 374868 : return seq;
224 374868 : }
225 :
226 : static int
227 : fd_vinyl_io_bd_commit( fd_vinyl_io_t * io,
228 374532 : int flags ) {
229 374532 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
230 374532 : (void)flags;
231 :
232 374532 : bd->base->seq_present = bd->base->seq_future;
233 374532 : bd->base->spad_used = 0UL;
234 :
235 374532 : return FD_VINYL_SUCCESS;
236 374532 : }
237 :
238 : static ulong
239 : fd_vinyl_io_bd_hint( fd_vinyl_io_t * io,
240 376362 : ulong sz ) {
241 376362 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
242 :
243 376362 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
244 376362 : ulong seq_ancient = bd->base->seq_ancient;
245 376362 : ulong dev_sz = bd->dev_sz;
246 :
247 376362 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
248 376362 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
249 :
250 376362 : if( FD_UNLIKELY( bad_sz | bad_capacity ) ) FD_LOG_CRIT(( bad_sz ? "misaligned sz" : "device full" ));
251 :
252 376362 : return bd->base->seq_future;
253 376362 : }
254 :
255 : static void *
256 : fd_vinyl_io_bd_alloc( fd_vinyl_io_t * io,
257 : ulong sz,
258 363 : int flags ) {
259 363 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
260 :
261 363 : ulong spad_max = bd->base->spad_max;
262 363 : ulong spad_used = bd->base->spad_used; if( FD_UNLIKELY( !sz ) ) return ((uchar *)(bd+1)) + spad_used;
263 :
264 363 : int bad_align = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
265 363 : int bad_sz = sz > spad_max;
266 :
267 363 : if( FD_UNLIKELY( bad_align | bad_sz ) ) FD_LOG_CRIT(( bad_align ? "misaligned sz" : "sz too large" ));
268 :
269 363 : if( FD_UNLIKELY( sz > (spad_max - spad_used ) ) ) {
270 0 : if( FD_UNLIKELY( fd_vinyl_io_bd_commit( io, flags ) ) ) return NULL;
271 0 : spad_used = 0UL;
272 0 : }
273 :
274 363 : bd->base->spad_used = spad_used + sz;
275 :
276 363 : return ((uchar *)(bd+1)) + spad_used;
277 363 : }
278 :
279 : static ulong
280 : fd_vinyl_io_bd_copy( fd_vinyl_io_t * io,
281 : ulong seq_src0,
282 376227 : ulong sz ) {
283 376227 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
284 :
285 : /* Validate the input args */
286 :
287 376227 : ulong seq_ancient = bd->base->seq_ancient;
288 376227 : ulong seq_past = bd->base->seq_past;
289 376227 : ulong seq_present = bd->base->seq_present;
290 376227 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
291 330246 : ulong spad_max = bd->base->spad_max;
292 330246 : ulong spad_used = bd->base->spad_used;
293 330246 : int dev_fd = bd->dev_fd;
294 330246 : ulong dev_base = bd->dev_base;
295 330246 : ulong dev_sz = bd->dev_sz;
296 :
297 330246 : ulong seq_src1 = seq_src0 + sz;
298 :
299 330246 : int bad_past = !( fd_vinyl_seq_le( seq_past, seq_src0 ) &
300 330246 : fd_vinyl_seq_lt( seq_src0, seq_src1 ) &
301 330246 : fd_vinyl_seq_le( seq_src1, seq_present ) );
302 330246 : int bad_src = !fd_ulong_is_aligned( seq_src0, FD_VINYL_BSTREAM_BLOCK_SZ );
303 330246 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
304 330246 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
305 :
306 330246 : if( FD_UNLIKELY( bad_past | bad_src | bad_sz | bad_capacity ) )
307 0 : FD_LOG_CRIT(( bad_past ? "src is not in the past" :
308 330246 : bad_src ? "misaligned src_seq" :
309 330246 : bad_sz ? "misaligned sz" :
310 330246 : "device full" ));
311 :
312 : /* At this point, we appear to have a valid copy request. Get
313 : buffer space from the scratch pad (committing as necessary). */
314 :
315 330246 : if( FD_UNLIKELY( sz>(spad_max-spad_used) ) ) {
316 0 : fd_vinyl_io_bd_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
317 0 : spad_used = 0UL;
318 0 : }
319 :
320 330246 : uchar * buf = (uchar *)(bd+1) + spad_used;
321 330246 : ulong buf_max = spad_max - spad_used;
322 :
323 : /* Map the dst to the bstream (updating seq_future) and map the src
324 : and dst regions onto the device. Then copy as much as we can at a
325 : time, handling device wrap around and copy buffering space. */
326 :
327 330246 : ulong seq = seq_future;
328 330246 : bd->base->seq_future = seq + sz;
329 :
330 330246 : ulong seq_dst0 = seq;
331 :
332 330273 : for(;;) {
333 330273 : ulong src_off = seq_src0 % dev_sz;
334 330273 : ulong dst_off = seq_dst0 % dev_sz;
335 330273 : ulong csz = fd_ulong_min( fd_ulong_min( sz, buf_max ), fd_ulong_min( dev_sz - src_off, dev_sz - dst_off ) );
336 :
337 330273 : bd_read ( dev_fd, dev_base + src_off, buf, csz );
338 330273 : bd_write( dev_fd, dev_base + dst_off, buf, csz );
339 :
340 330273 : sz -= csz;
341 330273 : if( !sz ) break;
342 :
343 27 : seq_src0 += csz;
344 27 : seq_dst0 += csz;
345 27 : }
346 :
347 330246 : return seq;
348 330246 : }
349 :
350 : static void
351 : fd_vinyl_io_bd_forget( fd_vinyl_io_t * io,
352 133263 : ulong seq ) {
353 133263 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
354 :
355 : /* Validate input arguments. Note that we don't allow forgetting into
356 : the future even when we have no uncommitted blocks because the
357 : resulting [seq_ancient,seq_future) might contain blocks that were
358 : never written (which might not be an issue practically but it would
359 : be a bit strange for something to try to scan starting from
360 : seq_ancient and discover unwritten blocks). */
361 :
362 133263 : ulong seq_past = bd->base->seq_past;
363 133263 : ulong seq_present = bd->base->seq_present;
364 133263 : ulong seq_future = bd->base->seq_future;
365 :
366 133263 : int bad_seq = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
367 133263 : int bad_dir = !(fd_vinyl_seq_le( seq_past, seq ) & fd_vinyl_seq_le( seq, seq_present ));
368 133263 : int bad_read = !!bd->rd_head;
369 133263 : int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );
370 :
371 133263 : if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
372 0 : FD_LOG_CRIT(( "forget to seq %016lx failed (past [%016lx,%016lx)/%lu, %s)",
373 133263 : seq, seq_past, seq_present, seq_present-seq_past,
374 133263 : bad_seq ? "misaligned seq" :
375 133263 : bad_dir ? "seq out of bounds" :
376 133263 : bad_read ? "reads in progress" :
377 133263 : "appends/copies in progress" ));
378 :
379 133263 : bd->base->seq_past = seq;
380 133263 : }
381 :
382 : static void
383 : fd_vinyl_io_bd_rewind( fd_vinyl_io_t * io,
384 132108 : ulong seq ) {
385 132108 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
386 :
387 : /* Validate input argments. Unlike forgot, we do allow rewinding to
388 : before seq_ancient as the region of sequence space reported to the
389 : caller as written is still accurate. */
390 :
391 132108 : ulong seq_ancient = bd->base->seq_ancient;
392 132108 : ulong seq_past = bd->base->seq_past;
393 132108 : ulong seq_present = bd->base->seq_present;
394 132108 : ulong seq_future = bd->base->seq_future;
395 :
396 132108 : int bad_seq = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
397 132108 : int bad_dir = fd_vinyl_seq_gt( seq, seq_present );
398 132108 : int bad_read = !!bd->rd_head;
399 132108 : int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );
400 :
401 132108 : if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
402 0 : FD_LOG_CRIT(( "rewind to seq %016lx failed (present %016lx, %s)", seq, seq_present,
403 132108 : bad_seq ? "misaligned seq" :
404 132108 : bad_dir ? "seq after seq_present" :
405 132108 : bad_read ? "reads in progress" :
406 132108 : "appends/copies in progress" ));
407 :
408 132108 : bd->base->seq_ancient = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_ancient ), seq_ancient, seq );
409 132108 : bd->base->seq_past = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_past ), seq_past, seq );
410 132108 : bd->base->seq_present = seq;
411 132108 : bd->base->seq_future = seq;
412 132108 : }
413 :
414 : static int
415 : fd_vinyl_io_bd_sync( fd_vinyl_io_t * io,
416 374442 : int flags ) {
417 374442 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
418 374442 : (void)flags;
419 :
420 374442 : ulong seed = bd->base->seed;
421 374442 : ulong seq_past = bd->base->seq_past;
422 374442 : ulong seq_present = bd->base->seq_present;
423 :
424 374442 : int dev_fd = bd->dev_fd;
425 374442 : ulong dev_sync = bd->dev_sync;
426 :
427 374442 : fd_vinyl_bstream_block_t * block = bd->sync;
428 :
429 : /* block->sync.ctl current (static) */
430 374442 : block->sync.seq_past = seq_past;
431 374442 : block->sync.seq_present = seq_present;
432 : /* block->sync.info_sz current (static) */
433 : /* block->sync.info current (static) */
434 :
435 374442 : block->sync.hash_trail = 0UL;
436 374442 : block->sync.hash_blocks = 0UL;
437 374442 : fd_vinyl_bstream_block_hash( seed, block ); /* sets hash_trail back to seed */
438 :
439 374442 : bd_write( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
440 :
441 374442 : bd->base->seq_ancient = seq_past;
442 :
443 374442 : return FD_VINYL_SUCCESS;
444 374442 : }
445 :
446 : static void *
447 6 : fd_vinyl_io_bd_fini( fd_vinyl_io_t * io ) {
448 6 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
449 :
450 6 : ulong seq_present = bd->base->seq_present;
451 6 : ulong seq_future = bd->base->seq_future;
452 :
453 6 : if( FD_UNLIKELY( bd->rd_head ) ) FD_LOG_WARNING(( "fini completing outstanding reads" ));
454 6 : if( FD_UNLIKELY( fd_vinyl_seq_ne( seq_present, seq_future ) ) ) FD_LOG_WARNING(( "fini discarding uncommited blocks" ));
455 :
456 6 : return io;
457 6 : }
458 :
/* Method table for the bd io implementation.  fd_vinyl_io_bd_init
   installs it as bd->base->impl.  This is a positional initializer, so
   the entries must appear in the same order as the function pointer
   fields of fd_vinyl_io_impl_t.  That struct is declared outside this
   file (presumably fd_vinyl_io.h); confirm the field order there before
   reordering or adding entries. */

static fd_vinyl_io_impl_t fd_vinyl_io_bd_impl[1] = { {
  fd_vinyl_io_bd_read_imm,
  fd_vinyl_io_bd_read,
  fd_vinyl_io_bd_poll,
  fd_vinyl_io_bd_append,
  fd_vinyl_io_bd_commit,
  fd_vinyl_io_bd_hint,
  fd_vinyl_io_bd_alloc,
  fd_vinyl_io_bd_copy,
  fd_vinyl_io_bd_forget,
  fd_vinyl_io_bd_rewind,
  fd_vinyl_io_bd_sync,
  fd_vinyl_io_bd_fini
} };
473 :
474 : FD_STATIC_ASSERT( alignof(fd_vinyl_io_bd_t)==FD_VINYL_BSTREAM_BLOCK_SZ, layout );
475 :
476 : ulong
477 39 : fd_vinyl_io_bd_align( void ) {
478 39 : return alignof(fd_vinyl_io_bd_t);
479 39 : }
480 :
481 : ulong
482 39 : fd_vinyl_io_bd_footprint( ulong spad_max ) {
483 39 : if( FD_UNLIKELY( !((0UL<spad_max) & (spad_max<(1UL<<63)) & fd_ulong_is_aligned( spad_max, FD_VINYL_BSTREAM_BLOCK_SZ )) ) )
484 12 : return 0UL;
485 27 : return sizeof(fd_vinyl_io_bd_t) + spad_max;
486 39 : }
487 :
488 : fd_vinyl_io_t *
489 : fd_vinyl_io_bd_init( void * mem,
490 : ulong spad_max,
491 : int dev_fd,
492 : int reset,
493 : void const * info,
494 : ulong info_sz,
495 39 : ulong io_seed ) {
496 39 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)mem;
497 :
498 39 : if( FD_UNLIKELY( !bd ) ) {
499 3 : FD_LOG_WARNING(( "NULL mem" ));
500 3 : return NULL;
501 3 : }
502 :
503 36 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)bd, fd_vinyl_io_bd_align() ) ) ) {
504 3 : FD_LOG_WARNING(( "misaligned mem" ));
505 3 : return NULL;
506 3 : }
507 :
508 33 : ulong footprint = fd_vinyl_io_bd_footprint( spad_max );
509 33 : if( FD_UNLIKELY( !footprint ) ) {
510 9 : FD_LOG_WARNING(( "bad spad_max" ));
511 9 : return NULL;
512 9 : }
513 :
514 24 : off_t _dev_sz = lseek( dev_fd, (off_t)0, SEEK_END );
515 24 : if( FD_UNLIKELY( _dev_sz<(off_t)0 ) ) {
516 6 : FD_LOG_WARNING(( "lseek failed, bstream must be seekable (%i-%s)", errno, fd_io_strerror( errno ) ));
517 6 : return NULL;
518 6 : }
519 18 : ulong dev_sz = (ulong)_dev_sz;
520 :
521 18 : ulong dev_sz_min = 3UL*FD_VINYL_BSTREAM_BLOCK_SZ /* sync block, move block, closing partition */
522 18 : + fd_vinyl_bstream_pair_sz( FD_VINYL_VAL_MAX ); /* worst case pair (FIXME: LZ4_COMPRESSBOUND?) */
523 :
524 18 : int too_small = dev_sz < dev_sz_min;
525 18 : int too_large = dev_sz > (ulong)LONG_MAX;
526 18 : int misaligned = !fd_ulong_is_aligned( dev_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
527 :
528 18 : if( FD_UNLIKELY( too_small | too_large | misaligned ) ) {
529 6 : FD_LOG_WARNING(( "bstream size %s", too_small ? "too small" :
530 6 : too_large ? "too large" :
531 6 : "not a block size multiple" ));
532 6 : return NULL;
533 6 : }
534 :
535 12 : if( reset ) {
536 6 : if( FD_UNLIKELY( !info ) ) info_sz = 0UL;
537 6 : if( FD_UNLIKELY( info_sz>FD_VINYL_BSTREAM_SYNC_INFO_MAX ) ) {
538 3 : FD_LOG_WARNING(( "info_sz too large" ));
539 3 : return NULL;
540 3 : }
541 6 : }
542 :
543 9 : memset( bd, 0, footprint );
544 :
545 9 : bd->base->type = FD_VINYL_IO_TYPE_BD;
546 :
547 : /* io_seed, seq_ancient, seq_past, seq_present, seq_future are init
548 : below */
549 :
550 9 : bd->base->spad_max = spad_max;
551 9 : bd->base->spad_used = 0UL;
552 9 : bd->base->impl = fd_vinyl_io_bd_impl;
553 :
554 9 : bd->dev_fd = dev_fd;
555 9 : bd->dev_sync = 0UL; /* Use the beginning of the file for the sync block */
556 9 : bd->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ; /* Use the rest for the actual bstream store (at least 3.5 KiB) */
557 9 : bd->dev_sz = dev_sz - FD_VINYL_BSTREAM_BLOCK_SZ;
558 :
559 9 : bd->rd_head = NULL;
560 9 : bd->rd_tail_next = &bd->rd_head;
561 :
562 : /* Note that [seq_ancient,seq_future) (cyclic) contains at most dev_sz
563 : bytes, bstream's antiquity, past and present are subsets of this
564 : range and dev_sz is less than 2^63 given the above (practically
565 : much much less). As such, differences between two ordered bstream
566 : sequence numbers (e.g. ulong sz = seq_a - seq_b where a is
567 : logically not before b) will "just work" regardless of wrapping
568 : and/or amount of data stored. */
569 :
570 : /* FIXME: Consider having the sync block on a completely separate
571 : device (to reduce seeking when syncing). */
572 :
573 9 : fd_vinyl_bstream_block_t * block = bd->sync;
574 :
575 9 : if( reset ) {
576 :
577 : /* We are starting a new bstream. Write the initial sync block. */
578 :
579 3 : bd->base->seed = io_seed;
580 3 : bd->base->seq_ancient = 0UL;
581 3 : bd->base->seq_past = 0UL;
582 3 : bd->base->seq_present = 0UL;
583 3 : bd->base->seq_future = 0UL;
584 :
585 3 : memset( block, 0, FD_VINYL_BSTREAM_BLOCK_SZ ); /* bulk zero */
586 :
587 3 : block->sync.ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_SYNC, 0, FD_VINYL_VAL_MAX );
588 : //block->sync.seq_past = ...; /* init by sync */
589 : //block->sync.seq_present = ...; /* init by sync */
590 3 : block->sync.info_sz = info_sz;
591 3 : if( info_sz ) memcpy( block->sync.info, info, info_sz );
592 : //block->sync.hash_trail = ...; /* init by sync */
593 : //block->sync.hash_blocks = ...; /* init by sync */
594 :
595 3 : int err = fd_vinyl_io_bd_sync( bd->base, FD_VINYL_IO_FLAG_BLOCKING ); /* logs details */
596 3 : if( FD_UNLIKELY( err ) ) {
597 0 : FD_LOG_WARNING(( "sync block write failed (%i-%s)", err, fd_vinyl_strerror( err ) ));
598 0 : return NULL;
599 0 : }
600 :
601 6 : } else {
602 :
603 : /* We are resuming an existing bstream. Read and validate the
604 : bstream's sync block. */
605 :
606 6 : bd_read( dev_fd, bd->dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ ); /* logs details */
607 :
608 6 : int type = fd_vinyl_bstream_ctl_type ( block->sync.ctl );
609 6 : int version = fd_vinyl_bstream_ctl_style( block->sync.ctl );
610 6 : ulong val_max = fd_vinyl_bstream_ctl_sz ( block->sync.ctl );
611 6 : ulong seq_past = block->sync.seq_past;
612 6 : ulong seq_present = block->sync.seq_present;
613 6 : /**/ info_sz = block->sync.info_sz; // overrides user info_sz
614 6 : /**/ info = block->sync.info; // overrides user info
615 6 : /**/ io_seed = block->sync.hash_trail; // overrides user io_seed
616 :
617 6 : int bad_type = (type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC);
618 6 : int bad_version = (version != 0);
619 6 : int bad_val_max = (val_max != FD_VINYL_VAL_MAX);
620 6 : int bad_seq_past = !fd_ulong_is_aligned( seq_past, FD_VINYL_BSTREAM_BLOCK_SZ );
621 6 : int bad_seq_present = !fd_ulong_is_aligned( seq_present, FD_VINYL_BSTREAM_BLOCK_SZ );
622 6 : int bad_info_sz = (info_sz > FD_VINYL_BSTREAM_SYNC_INFO_MAX);
623 6 : int bad_past_order = fd_vinyl_seq_gt( seq_past, seq_present );
624 6 : int bad_past_sz = ((seq_present-seq_past) > bd->dev_sz);
625 :
626 6 : if( FD_UNLIKELY( bad_type | bad_version | bad_val_max | bad_seq_past | bad_seq_present | bad_info_sz |
627 6 : bad_past_order | bad_past_sz ) ) {
628 3 : FD_LOG_WARNING(( "bad sync block when recovering bstream (%s)",
629 3 : bad_type ? "unexpected type" :
630 3 : bad_version ? "unexpected version" :
631 3 : bad_val_max ? "unexpected max pair value decoded byte size" :
632 3 : bad_seq_past ? "unaligned seq_past" :
633 3 : bad_seq_present ? "unaligned seq_present" :
634 3 : bad_info_sz ? "unexpected info size" :
635 3 : bad_past_order ? "unordered seq_past and seq_present" :
636 3 : "past size larger than bstream store" ));
637 3 : return NULL;
638 3 : }
639 :
640 3 : if( FD_UNLIKELY( fd_vinyl_bstream_block_test( io_seed, block ) ) ) {
641 0 : FD_LOG_WARNING(( "corrupt sync block when recovering bstream" ));
642 0 : return NULL;
643 0 : }
644 :
645 3 : bd->base->seed = io_seed;
646 3 : bd->base->seq_ancient = seq_past;
647 3 : bd->base->seq_past = seq_past;
648 3 : bd->base->seq_present = seq_present;
649 3 : bd->base->seq_future = seq_present;
650 :
651 3 : }
652 :
653 6 : FD_LOG_NOTICE(( "IO config"
654 6 : "\n\ttype bd"
655 6 : "\n\tspad_max %lu bytes"
656 6 : "\n\tdev_sz %lu bytes"
657 6 : "\n\treset %i"
658 6 : "\n\tinfo \"%s\" (info_sz %lu%s)"
659 6 : "\n\tio_seed 0x%016lx%s",
660 6 : spad_max, dev_sz, reset,
661 6 : info ? (char const *)info : "", info_sz, reset ? "" : ", discovered",
662 6 : io_seed, reset ? "" : " (discovered)" ));
663 :
664 6 : return bd->base;
665 9 : }
|