Line data Source code
1 : #include "fd_vinyl_io.h"
2 : #include <lz4.h>
3 :
4 : /* Using a separate translation unit for fini is a little silly but
5 : compiler won't inline stuff with heavy operations like logging and
6 : we don't want to give up logging edge cases with constructors. */
7 :
8 : void *
9 18 : fd_vinyl_io_fini( fd_vinyl_io_t * io ) {
10 18 : if( !io ) {
11 6 : FD_LOG_WARNING(( "NULL io" ));
12 6 : return NULL;
13 6 : }
14 12 : return io->impl->fini( io );
15 18 : }
16 :
17 : ulong
18 0 : fd_vinyl_io_spad_est( void ) {
19 0 : return 2UL*fd_vinyl_bstream_pair_sz( (ulong)LZ4_COMPRESSBOUND( (int)FD_VINYL_VAL_MAX ) );
20 0 : }
21 :
22 : ulong
23 : fd_vinyl_io_append_pair_raw( fd_vinyl_io_t * io,
24 : fd_vinyl_key_t const * key,
25 : fd_vinyl_info_t const * info,
26 0 : void const * val ) {
27 :
28 : /* Allocate scratch to hold the formatted pair */
29 :
30 0 : ulong val_sz = (ulong)info->val_sz;
31 0 : FD_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
32 :
33 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_sz );
34 0 : uchar * pair = (uchar *)fd_vinyl_io_alloc( io, pair_sz, FD_VINYL_IO_FLAG_BLOCKING );
35 :
36 0 : uchar * dst = pair;
37 0 : ulong dst_rem = pair_sz; (void)dst_rem;
38 :
39 : /* Gather the pair header */
40 :
41 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)dst;
42 :
43 0 : phdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
44 0 : phdr->key = *key;
45 0 : phdr->info = *info;
46 :
47 0 : dst += sizeof(fd_vinyl_bstream_phdr_t);
48 0 : dst_rem -= sizeof(fd_vinyl_bstream_phdr_t);
49 :
50 : /* Gather the pair value */
51 :
52 0 : if( val_sz ) memcpy( dst, val, val_sz );
53 :
54 0 : dst += val_sz;
55 0 : dst_rem -= val_sz;
56 :
57 0 : fd_vinyl_bstream_pair_hash( fd_vinyl_io_seed( io ), (fd_vinyl_bstream_block_t *)pair );
58 :
59 0 : return fd_vinyl_io_append( io, pair, pair_sz );
60 0 : }
61 :
62 : ulong
63 : fd_vinyl_io_append_dead( fd_vinyl_io_t * io,
64 : fd_vinyl_bstream_phdr_t const * phdr,
65 : void const * info,
66 0 : ulong info_sz ) {
67 :
68 0 : if( !info ) info_sz = 0UL;
69 0 : FD_CRIT( info_sz<=FD_VINYL_BSTREAM_DEAD_INFO_MAX, "corruption detected" );
70 :
71 0 : fd_vinyl_bstream_block_t * block = (fd_vinyl_bstream_block_t *)
72 0 : fd_vinyl_io_alloc( io, FD_VINYL_BSTREAM_BLOCK_SZ, FD_VINYL_IO_FLAG_BLOCKING );
73 :
74 0 : memset( block, 0, FD_VINYL_BSTREAM_BLOCK_SZ ); /* bulk zero */
75 :
76 0 : block->dead.ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_DEAD,
77 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW,
78 0 : FD_VINYL_BSTREAM_BLOCK_SZ );
79 0 : block->dead.seq = io->seq_future;
80 0 : block->dead.phdr = *phdr;
81 0 : block->dead.info_sz = info_sz;
82 0 : if( info_sz ) memcpy( block->dead.info, info, info_sz );
83 :
84 0 : fd_vinyl_bstream_block_hash( fd_vinyl_io_seed( io ), block );
85 :
86 0 : return fd_vinyl_io_append( io, block, FD_VINYL_BSTREAM_BLOCK_SZ );
87 0 : }
88 :
89 : ulong
90 : fd_vinyl_io_append_move( fd_vinyl_io_t * io,
91 : fd_vinyl_bstream_phdr_t const * src,
92 : fd_vinyl_key_t const * dst,
93 : void const * info,
94 0 : ulong info_sz ) {
95 :
96 0 : if( !info ) info_sz = 0UL;
97 0 : FD_CRIT( info_sz<=FD_VINYL_BSTREAM_MOVE_INFO_MAX, "corruption detected" );
98 :
99 0 : fd_vinyl_bstream_block_t * block = (fd_vinyl_bstream_block_t *)
100 0 : fd_vinyl_io_alloc( io, FD_VINYL_BSTREAM_BLOCK_SZ, FD_VINYL_IO_FLAG_BLOCKING );
101 :
102 0 : memset( block, 0, FD_VINYL_BSTREAM_BLOCK_SZ ); /* bulk zero */
103 :
104 0 : block->move.ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_MOVE,
105 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW,
106 0 : FD_VINYL_BSTREAM_BLOCK_SZ );
107 0 : block->move.seq = io->seq_future;
108 0 : block->move.src = *src;
109 0 : block->move.dst = *dst;
110 0 : block->move.info_sz = info_sz;
111 0 : if( info_sz ) memcpy( block->move.info, info, info_sz );
112 :
113 0 : fd_vinyl_bstream_block_hash( fd_vinyl_io_seed( io ), block );
114 :
115 0 : return fd_vinyl_io_append( io, block, FD_VINYL_BSTREAM_BLOCK_SZ );
116 0 : }
117 :
118 : ulong
119 : fd_vinyl_io_append_part( fd_vinyl_io_t * io,
120 : ulong seq0,
121 : ulong dead_cnt,
122 : ulong move_cnt,
123 : void const * info,
124 0 : ulong info_sz ) {
125 :
126 0 : if( !info ) info_sz = 0UL;
127 0 : FD_CRIT( info_sz<=FD_VINYL_BSTREAM_PART_INFO_MAX, "corruption detected" );
128 :
129 0 : fd_vinyl_bstream_block_t * block = (fd_vinyl_bstream_block_t *)
130 0 : fd_vinyl_io_alloc( io, FD_VINYL_BSTREAM_BLOCK_SZ, FD_VINYL_IO_FLAG_BLOCKING );
131 :
132 0 : memset( block, 0, FD_VINYL_BSTREAM_BLOCK_SZ ); /* bulk zero */
133 :
134 0 : block->part.ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PART,
135 0 : FD_VINYL_BSTREAM_CTL_STYLE_RAW,
136 0 : FD_VINYL_BSTREAM_BLOCK_SZ );
137 0 : block->part.seq = io->seq_future;
138 0 : block->part.seq0 = seq0;
139 0 : block->part.dead_cnt = dead_cnt;
140 0 : block->part.move_cnt = move_cnt;
141 0 : block->part.info_sz = info_sz;
142 0 : if( info_sz ) memcpy( block->part.info, info, info_sz );
143 :
144 0 : fd_vinyl_bstream_block_hash( fd_vinyl_io_seed( io ), block );
145 :
146 0 : return fd_vinyl_io_append( io, block, FD_VINYL_BSTREAM_BLOCK_SZ );
147 0 : }
148 :
149 : ulong
150 : fd_vinyl_io_append_pair_inplace( fd_vinyl_io_t * io,
151 : int style,
152 : fd_vinyl_bstream_phdr_t * phdr,
153 : int * _style,
154 0 : ulong * _val_esz ) {
155 :
156 0 : ulong val_sz = (ulong)phdr->info.val_sz;
157 :
158 0 : FD_CRIT( val_sz <= FD_VINYL_VAL_MAX, "corruption detected" );
159 :
160 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_sz );
161 :
162 0 : ulong phdr_ctl = phdr->ctl;
163 :
164 0 : FD_CRIT( fd_vinyl_bstream_ctl_style( phdr_ctl )==FD_VINYL_BSTREAM_CTL_STYLE_RAW, "corruption detected" );
165 0 : FD_CRIT( fd_vinyl_bstream_ctl_sz ( phdr_ctl )==val_sz, "corruption detected" );
166 :
167 0 : switch( style ) {
168 :
169 0 : case FD_VINYL_BSTREAM_CTL_STYLE_RAW: break;
170 :
171 0 : case FD_VINYL_BSTREAM_CTL_STYLE_LZ4: {
172 :
173 : /* If the pair is already small enough, no point in compressing.
174 : Otherwise, allocate scratch from the io append spad for the worst
175 : case compressed size and compress the pair val into it. If
176 : compression fails (shouldn't given use of LZ4_COMPRESSBOUND) or
177 : the value doesn't compress enough to make a difference in bstream
178 : usage, free the scratch allocation and append the uncompressed
179 : version. */
180 :
181 0 : if( FD_UNLIKELY( val_sz<=FD_VINYL_BSTREAM_LZ4_VAL_THRESH ) ) break;
182 :
183 0 : ulong cval_max = (ulong)LZ4_COMPRESSBOUND( (int)val_sz );
184 0 : ulong cpair_max = fd_vinyl_bstream_pair_sz( cval_max );
185 0 : fd_vinyl_bstream_phdr_t * cphdr = (fd_vinyl_bstream_phdr_t *)fd_vinyl_io_alloc( io, cpair_max, FD_VINYL_IO_FLAG_BLOCKING );
186 :
187 0 : ulong cval_sz = (ulong)LZ4_compress_default( (char const *)(phdr+1), (char *)(cphdr+1), (int)val_sz, (int)cval_max );
188 0 : ulong cpair_sz = fd_vinyl_bstream_pair_sz( cval_sz );
189 :
190 0 : if( FD_UNLIKELY( (!cval_sz) | (cpair_sz>=pair_sz) ) ) {
191 0 : fd_vinyl_io_trim( io, cpair_max );
192 0 : break;
193 0 : }
194 :
195 : /* At this point, we usefully LZ4 compressed the pair val. Trim
196 : the scratch allocation to compressed pair size, prepend the pair
197 : header, clear any zero padding, append the pair footer and start
198 : appending the compressed version to the bstream. */
199 :
200 0 : fd_vinyl_io_trim( io, cpair_max - cpair_sz );
201 :
202 0 : cphdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_LZ4, cval_sz );
203 0 : cphdr->key = phdr->key;
204 0 : cphdr->info = phdr->info;
205 :
206 0 : fd_vinyl_bstream_pair_hash( fd_vinyl_io_seed( io ), (fd_vinyl_bstream_block_t *)cphdr );
207 :
208 0 : *_style = FD_VINYL_BSTREAM_CTL_STYLE_LZ4;
209 0 : *_val_esz = cval_sz;
210 0 : return fd_vinyl_io_append( io, cphdr, cpair_sz );
211 :
212 0 : }
213 :
214 0 : default:
215 0 : FD_LOG_CRIT(( "unsupported style" ));
216 :
217 0 : }
218 :
219 : /* Append in place */
220 :
221 0 : fd_vinyl_bstream_pair_hash( fd_vinyl_io_seed( io ), (fd_vinyl_bstream_block_t *)phdr );
222 :
223 0 : *_style = FD_VINYL_BSTREAM_CTL_STYLE_RAW;
224 0 : *_val_esz = val_sz;
225 0 : return fd_vinyl_io_append( io, phdr, pair_sz );
226 0 : }
|