Line data Source code
1 : #include "fd_shredb.h"
2 :
3 : #include <errno.h>
4 : #include <sys/mman.h>
5 : #include <fcntl.h>
6 : #include <unistd.h>
7 :
8 : static inline ulong
9 180 : fd_shredb_max_shreds_for_gib( ulong max_size_gib ) {
10 180 : return (max_size_gib*1024UL*1024UL*1024UL) / sizeof(fd_shredb_entry_t);
11 180 : }
12 :
13 : /* We size the slot map such that it will never fill before we start
14 : evicting from the shred_map/ring buffer. The minimum number of shreds
15 : per slot is 32 (one FEC set), so it is guaranteed that in the worst case
16 : we will be able to represent every FEC set inserted into the database.
17 :
18 : Remember that we will always be inserting complete sets, consisting of
19 : 32 data shreds at a time. */
20 : static inline ulong
21 90 : fd_shredb_max_slots_for_gib( ulong max_size_gib ) {
22 90 : return fd_shredb_max_shreds_for_gib( max_size_gib ) / 32UL;
23 90 : }
24 :
25 : FD_FN_CONST ulong
26 66 : fd_shredb_footprint( ulong max_size_gib ) {
27 66 : if( FD_UNLIKELY( !max_size_gib ) ) return 0UL;
28 :
29 63 : ulong max_shreds = fd_shredb_max_shreds_for_gib( max_size_gib );
30 63 : ulong max_slots = fd_shredb_max_slots_for_gib ( max_size_gib );
31 :
32 63 : int lg_shred_cnt = fd_ulong_find_msb( fd_ulong_pow2_up( max_shreds ) );
33 63 : int lg_slot_cnt = fd_ulong_find_msb( fd_ulong_pow2_up( max_slots ) );
34 :
35 63 : ulong l = FD_LAYOUT_INIT;
36 63 : l = FD_LAYOUT_APPEND( l, alignof(fd_shredb_t), sizeof(fd_shredb_t) );
37 63 : l = FD_LAYOUT_APPEND( l, fd_shredb_shred_map_align(), fd_shredb_shred_map_footprint( lg_shred_cnt ) );
38 63 : l = FD_LAYOUT_APPEND( l, fd_shredb_slot_map_align(), fd_shredb_slot_map_footprint ( lg_slot_cnt ) );
39 63 : return FD_LAYOUT_FINI( l, fd_shredb_align() );
40 66 : }
41 :
42 : void *
43 : fd_shredb_new( void * shmem,
44 : ulong max_size_gib,
45 : char const * file_path,
46 27 : ulong seed ) {
47 27 : if( FD_UNLIKELY( !shmem ) ) {
48 0 : FD_LOG_WARNING(( "NULL shmem" ));
49 0 : return NULL;
50 0 : }
51 :
52 27 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_shredb_align() ) ) ) {
53 0 : FD_LOG_WARNING(( "misaligned shmem" ));
54 0 : return NULL;
55 0 : }
56 :
57 27 : if( FD_UNLIKELY( !file_path ) ) {
58 0 : FD_LOG_WARNING(( "NULL file_path" ));
59 0 : return NULL;
60 0 : }
61 :
62 27 : ulong footprint = fd_shredb_footprint( max_size_gib );
63 27 : if( FD_UNLIKELY( !footprint ) ) {
64 0 : FD_LOG_WARNING(( "bad max_size_gib (%lu)", max_size_gib ));
65 0 : return NULL;
66 0 : }
67 :
68 27 : ulong max_shreds = fd_shredb_max_shreds_for_gib( max_size_gib );
69 27 : ulong max_slots = fd_shredb_max_slots_for_gib ( max_size_gib );
70 :
71 27 : int lg_shred_cnt = fd_ulong_find_msb( fd_ulong_pow2_up( max_shreds ) );
72 27 : int lg_slot_cnt = fd_ulong_find_msb( fd_ulong_pow2_up( max_slots ) );
73 :
74 27 : FD_SCRATCH_ALLOC_INIT( l, shmem );
75 27 : /**/ FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_shredb_t), sizeof(fd_shredb_t) );
76 27 : void * shred_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_shredb_shred_map_align(), fd_shredb_shred_map_footprint( lg_shred_cnt ) );
77 27 : void * slot_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_shredb_slot_map_align(), fd_shredb_slot_map_footprint ( lg_slot_cnt ) );
78 :
79 27 : fd_shredb_t * store = (fd_shredb_t *)shmem;
80 27 : store->shred_map = fd_shredb_shred_map_new( shred_map_mem, lg_shred_cnt, seed );
81 27 : store->slot_map = fd_shredb_slot_map_new ( slot_map_mem, lg_slot_cnt, seed );
82 :
83 27 : int fd = open( file_path, O_RDWR | O_CREAT | O_TRUNC, (mode_t)0600 );
84 27 : if( FD_UNLIKELY( fd<0 ) ) {
85 0 : FD_LOG_WARNING(( "open(%s) failed (%i-%s)", file_path, errno, fd_io_strerror( errno ) ));
86 0 : return NULL;
87 0 : }
88 :
89 27 : ulong initial_shreds = 128UL;
90 27 : if( FD_UNLIKELY( ftruncate( fd, (off_t)(initial_shreds * sizeof(fd_shredb_entry_t)) ) ) ) {
91 0 : FD_LOG_WARNING(( "ftruncate failed (%i-%s)", errno, fd_io_strerror( errno ) ));
92 0 : close( fd );
93 0 : return NULL;
94 0 : }
95 :
96 27 : ulong mapped_sz = max_shreds * sizeof(fd_shredb_entry_t);
97 27 : void * mapped = mmap( NULL, mapped_sz, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 );
98 27 : if( FD_UNLIKELY( mapped==MAP_FAILED ) ) {
99 0 : FD_LOG_WARNING(( "mmap failed (%i-%s)", errno, fd_io_strerror( errno ) ));
100 0 : close( fd );
101 0 : return NULL;
102 0 : }
103 :
104 27 : store->max_shreds = max_shreds;
105 27 : store->write_head = 0UL;
106 27 : store->cnt = 0UL;
107 27 : store->mapped_sz = mapped_sz;
108 27 : store->mapped = mapped;
109 27 : store->fd = fd;
110 27 : store->file_shreds = initial_shreds;
111 :
112 27 : FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_shredb_align() )==(ulong)shmem + footprint );
113 :
114 27 : return shmem;
115 27 : }
116 :
117 : fd_shredb_t *
118 27 : fd_shredb_join( void * shstore ) {
119 27 : if( FD_UNLIKELY( !shstore ) ) {
120 0 : FD_LOG_WARNING(( "NULL shstore" ));
121 0 : return NULL;
122 0 : }
123 :
124 27 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shstore, fd_shredb_align() ) ) ) {
125 0 : FD_LOG_WARNING(( "misaligned shstore" ));
126 0 : return NULL;
127 0 : }
128 :
129 27 : fd_shredb_t * store = (fd_shredb_t *)shstore;
130 27 : store->shred_map = fd_shredb_shred_map_join( store->shred_map );
131 27 : store->slot_map = fd_shredb_slot_map_join ( store->slot_map );
132 :
133 27 : return (fd_shredb_t *)shstore;
134 27 : }
135 :
136 : void *
137 27 : fd_shredb_leave( fd_shredb_t const * store ) {
138 27 : if( FD_UNLIKELY( !store ) ) {
139 0 : FD_LOG_WARNING(( "NULL store" ));
140 0 : return NULL;
141 0 : }
142 :
143 27 : return (void *)store;
144 27 : }
145 :
146 : void *
147 27 : fd_shredb_delete( void * shstore ) {
148 27 : if( FD_UNLIKELY( !shstore ) ) {
149 0 : FD_LOG_WARNING(( "NULL shstore" ));
150 0 : return NULL;
151 0 : }
152 :
153 27 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shstore, fd_shredb_align() ) ) ) {
154 0 : FD_LOG_WARNING(( "misaligned shstore" ));
155 0 : return NULL;
156 0 : }
157 :
158 27 : fd_shredb_t * store = (fd_shredb_t *)shstore;
159 27 : if( FD_UNLIKELY( !store->mapped ) ) {
160 0 : FD_LOG_WARNING(( "NULL mapped" ));
161 0 : return NULL;
162 0 : }
163 27 : munmap( store->mapped, store->mapped_sz );
164 27 : close( store->fd );
165 :
166 27 : return shstore;
167 27 : }
168 :
169 : static void
170 : fd_shredb_slot_evict( fd_shredb_t * store,
171 : ulong slot,
172 483852 : uint evicted_shred_idx ) {
173 483852 : fd_shredb_slot_entry_t * se = fd_shredb_slot_map_query( store->slot_map, slot, NULL );
174 483852 : FD_TEST( se );
175 :
176 483852 : se->cnt--;
177 483852 : if( FD_UNLIKELY( se->cnt==0UL ) ) {
178 15213 : fd_shredb_slot_map_remove( store->slot_map, se );
179 15213 : return;
180 15213 : }
181 :
182 : /* If the shred evicted was the highest in that slot, walk down and
183 : find the new highest that still exists in the per-shred map. */
184 468639 : if( evicted_shred_idx==se->highest_shred_idx ) {
185 0 : for( uint idx = evicted_shred_idx; ; idx-- ) {
186 0 : ulong key = fd_shredb_key_pack( slot, idx );
187 0 : if( fd_shredb_shred_map_query( store->shred_map, key, NULL ) ) {
188 0 : se->highest_shred_idx = idx;
189 0 : return;
190 0 : }
191 0 : if( FD_UNLIKELY( idx==0U ) ) break;
192 0 : }
193 0 : FD_LOG_ERR(( "corrupt store state" ));
194 0 : }
195 468639 : }
196 :
197 : static inline fd_shredb_entry_t *
198 3120963 : fd_shredb_ring( fd_shredb_t * store ) {
199 3120963 : return (fd_shredb_entry_t *)store->mapped;
200 3120963 : }
201 :
202 : void
203 : fd_shredb_insert( fd_shredb_t * store,
204 : fd_shred_t const * shred,
205 3060699 : ulong shred_sz ) {
206 3060699 : ulong slot = shred->slot;
207 3060699 : uint shred_idx = shred->idx;
208 :
209 3060699 : FD_LOG_DEBUG(( "inserting shred into store (slot=%lu, shred_idx=%u)", slot, shred_idx ));
210 :
211 3060699 : ulong key = fd_shredb_key_pack( slot, shred_idx );
212 3060699 : if( fd_shredb_shred_map_query( store->shred_map, key, NULL ) ) return;
213 :
214 : /* Grow the backing file if the write head has reached the current
215 : file capacity. Double the file size each time (superlinear growth)
216 : until we hit max_shreds, after which the ring simply evicts. */
217 3060699 : if( FD_UNLIKELY( store->write_head>=store->file_shreds ) ) {
218 81 : ulong new_file_shreds = fd_ulong_min( store->file_shreds * 2UL, store->max_shreds );
219 81 : if( FD_UNLIKELY( ftruncate( store->fd, (off_t)(new_file_shreds * sizeof(fd_shredb_entry_t)) ) ) ) {
220 0 : FD_LOG_ERR(( "ftruncate failed (%i-%s)", errno, fd_io_strerror( errno ) ));
221 0 : }
222 81 : store->file_shreds = new_file_shreds;
223 81 : }
224 :
225 3060699 : fd_shredb_entry_t * ring = fd_shredb_ring( store );
226 3060699 : fd_shredb_entry_t * entry = &ring[ store->write_head ];
227 :
228 : /* If this ring entry is occupied, evict the old entry. */
229 3060699 : if( FD_LIKELY( entry->occupied ) ) {
230 483852 : ulong old_slot = fd_shredb_key_slot( entry->key );
231 483852 : uint old_idx = fd_shredb_key_shred_idx( entry->key );
232 :
233 483852 : fd_shredb_shred_entry_t * old = fd_shredb_shred_map_query( store->shred_map, entry->key, NULL );
234 483852 : if( FD_LIKELY( old ) ) fd_shredb_shred_map_remove( store->shred_map, old );
235 :
236 483852 : fd_shredb_slot_evict( store, old_slot, old_idx );
237 483852 : store->cnt--;
238 483852 : }
239 :
240 3060699 : entry->key = key;
241 3060699 : entry->occupied = 1;
242 3060699 : entry->shred_sz = (ushort)shred_sz;
243 3060699 : fd_memcpy( entry->shred, shred, shred_sz );
244 :
245 3060699 : fd_shredb_shred_entry_t * map_entry = fd_shredb_shred_map_insert( store->shred_map, key );
246 3060699 : FD_TEST( map_entry );
247 3060699 : map_entry->ring_idx = store->write_head;
248 :
249 3060699 : fd_shredb_slot_entry_t * se = fd_shredb_slot_map_query( store->slot_map, slot, NULL );
250 3060699 : if( FD_LIKELY( se ) ) {
251 2964915 : se->cnt++;
252 2964915 : se->highest_shred_idx = fd_uint_max( se->highest_shred_idx, shred_idx );
253 2964915 : } else {
254 95784 : se = fd_shredb_slot_map_insert( store->slot_map, slot );
255 95784 : FD_TEST( se );
256 95784 : se->highest_shred_idx = shred_idx;
257 95784 : se->cnt = 1UL;
258 95784 : }
259 :
260 3060699 : store->cnt++;
261 3060699 : store->write_head = (store->write_head + 1UL) % store->max_shreds;
262 3060699 : }
263 :
264 : int
265 : fd_shredb_query( fd_shredb_t * store,
266 : ulong slot,
267 : uint shred_idx,
268 90279 : uchar out[ FD_SHRED_MAX_SZ ] ) {
269 : /* Fast-fail, if we have never heard of this slot, we must have no shreds for it. */
270 90279 : if( !fd_shredb_slot_map_query( store->slot_map, slot, NULL ) ) return -1;
271 :
272 60267 : ulong key = fd_shredb_key_pack( slot, shred_idx );
273 60267 : fd_shredb_shred_entry_t const * map_entry = fd_shredb_shred_map_query( store->shred_map, key, NULL );
274 60267 : if( FD_UNLIKELY( !map_entry ) ) return -1; /* No such shred. */
275 :
276 60264 : fd_shredb_entry_t * ring = fd_shredb_ring( store );
277 60264 : fd_shredb_entry_t * entry = &ring[ map_entry->ring_idx ];
278 :
279 60264 : fd_memcpy( out, entry->shred, entry->shred_sz );
280 60264 : return entry->shred_sz;
281 60267 : }
282 :
283 : int fd_shredb_query_highest( fd_shredb_t * store,
284 : ulong slot,
285 : uint min_shred_idx,
286 0 : uchar out[ FD_SHRED_MAX_SZ ] ) {
287 0 : fd_shredb_slot_entry_t * se = fd_shredb_slot_map_query( store->slot_map, slot, NULL );
288 0 : if( FD_UNLIKELY( !se ) ) return -1;
289 :
290 : /* Check if the highest known index meets the threshold. */
291 0 : if( se->highest_shred_idx < min_shred_idx ) return -1;
292 :
293 0 : ulong key = fd_shredb_key_pack( slot, se->highest_shred_idx );
294 0 : fd_shredb_shred_entry_t const * map_entry = fd_shredb_shred_map_query( store->shred_map, key, NULL );
295 0 : FD_TEST( map_entry );
296 :
297 0 : fd_shredb_entry_t * ring = fd_shredb_ring( store );
298 0 : fd_shredb_entry_t * entry = &ring[ map_entry->ring_idx ];
299 :
300 0 : fd_memcpy( out, entry->shred, entry->shred_sz );
301 0 : return entry->shred_sz;
302 0 : }
|