Line data Source code
1 : #define _GNU_SOURCE
2 : #define _FILE_OFFSET_BITS 64
3 : #include "fd_funk_filemap.h"
4 : #include <sys/types.h>
5 : #include <sys/stat.h>
6 : #include <unistd.h>
7 : #include <errno.h>
8 : #include <fcntl.h>
9 : #include <sys/mman.h>
10 :
/* PAGESIZE: 4096-byte granularity used below for backing-file size
   validation and for the page_sz / page_cnt passed to
   fd_shmem_join_anonymous. Kept unsigned long so it matches %lu. */
#define PAGESIZE (4096UL)
12 :
13 : fd_funk_t *
14 : fd_funk_open_file( void * ljoin,
15 : const char * filename,
16 : ulong wksp_tag,
17 : ulong seed,
18 : ulong txn_max,
19 : ulong rec_max,
20 : ulong total_sz,
21 : fd_funk_file_mode_t mode,
22 153 : fd_funk_close_file_args_t * close_args_out ) {
23 :
24 : /* See if we already have the file open */
25 :
26 153 : if( mode == FD_FUNK_READONLY || mode == FD_FUNK_READ_WRITE ) {
27 150 : fd_shmem_join_info_t info;
28 150 : if( !fd_shmem_join_query_by_name("funk", &info) ) {
29 0 : void * shmem = info.join;
30 0 : fd_wksp_t * wksp = fd_wksp_join( shmem );
31 0 : if( FD_UNLIKELY( !wksp ) ) {
32 0 : FD_LOG_WARNING(( "fd_wksp_join(%p) failed", shmem ));
33 0 : return NULL;
34 0 : }
35 :
36 0 : fd_wksp_tag_query_info_t info2;
37 0 : if( FD_UNLIKELY( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info2, 1 ) ) ) {
38 0 : FD_LOG_WARNING(( "%s does not contain a funk database", filename ));
39 0 : return NULL;
40 0 : }
41 :
42 0 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info2.gaddr_lo );
43 0 : fd_funk_t * funk = fd_funk_join( ljoin, funk_shmem );
44 0 : if( FD_UNLIKELY( funk == NULL ) ) {
45 0 : FD_LOG_WARNING(( "Failed to join funk database at %s:0x%lx", fd_wksp_name( wksp ), info2.gaddr_lo ));
46 0 : return NULL;
47 0 : }
48 :
49 0 : if( FD_UNLIKELY( close_args_out != NULL ) ) {
50 0 : close_args_out->shmem = shmem;
51 0 : close_args_out->fd = -1;
52 0 : close_args_out->total_sz = 0;
53 0 : }
54 0 : return funk;
55 0 : }
56 150 : }
57 :
58 : /* Open the file */
59 :
60 153 : int open_flags, can_resize, can_create, do_new;
61 153 : switch( mode ) {
62 0 : case FD_FUNK_READONLY:
63 0 : if( filename == NULL || filename[0] == '\0' ) {
64 0 : FD_LOG_WARNING(( "mode FD_FUNK_READONLY can not be used with an anonymous workspace, funk file required" ));
65 0 : return NULL;
66 0 : }
67 0 : open_flags = O_RDWR; /* We mark the memory as read-only after we are done setting up */
68 0 : can_create = 0;
69 0 : can_resize = 0;
70 0 : do_new = 0;
71 0 : break;
72 150 : case FD_FUNK_READ_WRITE:
73 150 : if( filename == NULL || filename[0] == '\0' ) {
74 0 : FD_LOG_WARNING(( "mode FD_FUNK_READ_WRITE can not be used with an anonymous workspace, funk file required" ));
75 0 : return NULL;
76 0 : }
77 150 : open_flags = O_RDWR;
78 150 : can_create = 0;
79 150 : can_resize = 0;
80 150 : do_new = 0;
81 150 : break;
82 0 : case FD_FUNK_CREATE:
83 0 : open_flags = O_CREAT|O_RDWR;
84 0 : can_create = 1;
85 0 : can_resize = 0;
86 0 : do_new = 0;
87 0 : break;
88 3 : case FD_FUNK_OVERWRITE:
89 3 : open_flags = O_CREAT|O_RDWR;
90 3 : can_create = 1;
91 3 : can_resize = 1;
92 3 : do_new = 1;
93 3 : break;
94 0 : case FD_FUNK_CREATE_EXCL:
95 0 : open_flags = O_CREAT|O_EXCL|O_RDWR;
96 0 : can_create = 1;
97 0 : can_resize = 1;
98 0 : do_new = 1;
99 0 : break;
100 0 : default:
101 0 : FD_LOG_WARNING(( "invalid mode when opening %s", filename ));
102 0 : return NULL;
103 153 : }
104 :
105 153 : int fd;
106 153 : if( FD_UNLIKELY( filename == NULL || filename[0] == '\0' ) ) {
107 0 : fd = -1; /* Anonymous */
108 0 : do_new = 1;
109 153 : } else {
110 :
111 : /* Open the file */
112 153 : FD_LOG_DEBUG(( "opening %s", filename ));
113 153 : fd = open( filename, open_flags, S_IRUSR|S_IWUSR );
114 153 : if( FD_UNLIKELY( fd < 0 ) ) {
115 0 : FD_LOG_WARNING(( "error opening %s: %s", filename, strerror(errno) ));
116 0 : return NULL;
117 0 : }
118 :
119 : /* Resize the file */
120 :
121 153 : struct stat statbuf;
122 153 : int r = fstat( fd, &statbuf );
123 153 : if( FD_UNLIKELY( r < 0 ) ) {
124 0 : FD_LOG_WARNING(( "error opening %s: %s", filename, strerror(errno) ));
125 0 : close( fd );
126 0 : return NULL;
127 0 : }
128 153 : if( (can_create && statbuf.st_size == 0) ||
129 153 : (can_resize && statbuf.st_size != (off_t)total_sz) ) {
130 3 : FD_LOG_DEBUG(( "resizing %s to %lu", filename, total_sz ));
131 3 : if( FD_UNLIKELY( ftruncate( fd, (off_t)total_sz ) < 0 ) ) {
132 0 : FD_LOG_WARNING(( "error resizing %s: %s", filename, strerror(errno) ));
133 0 : close( fd );
134 0 : return NULL;
135 0 : }
136 3 : do_new = 1;
137 150 : } else {
138 150 : total_sz = (ulong)statbuf.st_size;
139 150 : }
140 153 : }
141 :
142 153 : if( FD_UNLIKELY( total_sz & (PAGESIZE-1) ) ) {
143 0 : FD_LOG_WARNING(( "file size must be a multiple of a %lu", PAGESIZE ));
144 0 : close( fd );
145 0 : return NULL;
146 0 : }
147 :
148 : /* Force all the disk blocks to be physically allocated to avoid major faults in the future */
149 :
150 153 : if( do_new & (fd != -1) ) {
151 3 : FD_LOG_DEBUG(( "zeroing %s", (filename ? filename : "(NULL)") ));
152 3 : uchar zeros[4<<20];
153 3 : memset( zeros, 0, sizeof(zeros) );
154 771 : for( ulong i = 0; i < total_sz; ) {
155 768 : ulong sz = fd_ulong_min( sizeof(zeros), total_sz - i );
156 768 : if( FD_UNLIKELY( pwrite( fd, zeros, sz, (__off_t)i ) < (ssize_t)sz ) ) {
157 0 : FD_LOG_WARNING(( "error zeroing %s: %s", (filename ? filename : "(NULL)"), strerror(errno) ));
158 0 : close( fd );
159 0 : return NULL;
160 0 : }
161 768 : sync_file_range( fd, (__off64_t)i, (__off64_t)sz, SYNC_FILE_RANGE_WRITE );
162 768 : i += sz;
163 768 : }
164 3 : }
165 :
166 : /* Create the memory map */
167 :
168 153 : FD_LOG_DEBUG(( "mapping %s", (filename ? filename : "(NULL)") ));
169 153 : void * shmem = mmap( NULL, total_sz, (PROT_READ|PROT_WRITE),
170 153 : (fd == -1 ? (MAP_ANONYMOUS|MAP_PRIVATE) : MAP_SHARED), fd, 0 );
171 153 : if( FD_UNLIKELY ( shmem == MAP_FAILED ) ) {
172 0 : FD_LOG_WARNING(( "error mapping %s: %s", (filename ? filename : "(NULL)"), strerror(errno) ));
173 0 : close( fd );
174 0 : return NULL;
175 0 : }
176 :
177 153 : if( do_new ) {
178 :
179 : /* Create the data structures */
180 :
181 3 : ulong part_max = fd_wksp_part_max_est( total_sz, 1U<<18U );
182 3 : if( FD_UNLIKELY( !part_max ) ) {
183 0 : FD_LOG_WARNING(( "fd_wksp_part_max_est(%lu,64KiB) failed", total_sz ));
184 0 : munmap( shmem, total_sz );
185 0 : close( fd );
186 0 : return NULL;
187 0 : }
188 :
189 3 : ulong data_max = fd_wksp_data_max_est( total_sz, part_max );
190 3 : if( FD_UNLIKELY( !data_max ) ) {
191 0 : FD_LOG_WARNING(( "part_max (%lu) too large for footprint %lu", part_max, total_sz ));
192 0 : munmap( shmem, total_sz );
193 0 : close( fd );
194 0 : return NULL;
195 0 : }
196 :
197 3 : FD_LOG_DEBUG(( "creating workspace in %s", (filename ? filename : "(NULL)") ));
198 3 : void * shwksp = fd_wksp_new( shmem, "funk", (uint)seed, part_max, data_max );
199 3 : if( FD_UNLIKELY( !shwksp ) ) {
200 0 : FD_LOG_WARNING(( "fd_wksp_new(%p,\"%s\",%lu,%lu,%lu) failed", shmem, "funk", seed, part_max, data_max ));
201 0 : munmap( shmem, total_sz );
202 0 : close( fd );
203 0 : return NULL;
204 0 : }
205 :
206 3 : fd_wksp_t * wksp = fd_wksp_join( shwksp );
207 3 : if( FD_UNLIKELY( !wksp ) ) {
208 0 : FD_LOG_WARNING(( "fd_wksp_join(%p) failed", shwksp ));
209 0 : munmap( shmem, total_sz );
210 0 : close( fd );
211 0 : return NULL;
212 0 : }
213 :
214 3 : ulong page_sz = PAGESIZE;
215 3 : ulong page_cnt = total_sz/PAGESIZE;
216 3 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
217 3 : if( join_err ) {
218 0 : FD_LOG_WARNING(( "fd_shmem_join_anonymous failed" ));
219 0 : }
220 :
221 3 : FD_LOG_DEBUG(( "creating funk in %s", (filename ? filename : "(NULL)") ));
222 3 : void * funk_shmem = fd_wksp_alloc_laddr( wksp, fd_funk_align(), fd_funk_footprint( txn_max, rec_max ), wksp_tag );
223 3 : if( FD_UNLIKELY(funk_shmem == NULL ) ) {
224 0 : FD_LOG_WARNING(( "failed to allocate a funky" ));
225 0 : munmap( shmem, total_sz );
226 0 : close( fd );
227 0 : return NULL;
228 0 : }
229 :
230 3 : fd_funk_t * funk = fd_funk_join( ljoin, fd_funk_new( funk_shmem, wksp_tag, seed, txn_max, rec_max ) );
231 3 : if( FD_UNLIKELY( funk == NULL ) ) {
232 0 : FD_LOG_WARNING(( "failed to allocate a funky" ));
233 0 : munmap( shmem, total_sz );
234 0 : close( fd );
235 0 : return NULL;
236 0 : }
237 :
238 3 : FD_LOG_NOTICE(( "opened funk size %f GB, backing file %s", ((double)total_sz)/((double)(1LU<<30)), (filename ? filename : "(NULL)") ));
239 :
240 3 : if( FD_UNLIKELY( close_args_out != NULL ) ) {
241 3 : close_args_out->shmem = shmem;
242 3 : close_args_out->fd = fd;
243 3 : close_args_out->total_sz = total_sz;
244 3 : }
245 3 : return funk;
246 :
247 150 : } else {
248 :
249 : /* Join the data existing structures */
250 :
251 150 : fd_wksp_t * wksp = fd_wksp_join( shmem );
252 150 : if( FD_UNLIKELY( !wksp ) ) {
253 0 : FD_LOG_WARNING(( "fd_wksp_join(%p) failed", shmem ));
254 0 : munmap( shmem, total_sz );
255 0 : close( fd );
256 0 : return NULL;
257 0 : }
258 :
259 150 : ulong page_sz = PAGESIZE;
260 150 : ulong page_cnt = total_sz/PAGESIZE;
261 150 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
262 150 : if( FD_UNLIKELY( join_err ) ) {
263 0 : FD_LOG_WARNING(( "fd_shmem_join_anonymous failed" ));
264 0 : }
265 :
266 150 : fd_wksp_tag_query_info_t info;
267 150 : if( FD_UNLIKELY( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info, 1 ) ) ) {
268 0 : FD_LOG_WARNING(( "%s does not contain a funky", filename ));
269 0 : munmap( shmem, total_sz );
270 0 : close( fd );
271 0 : return NULL;
272 0 : }
273 :
274 150 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
275 150 : fd_funk_t * funk = fd_funk_join( ljoin, funk_shmem );
276 150 : if( FD_UNLIKELY( funk == NULL ) ) {
277 0 : FD_LOG_WARNING(( "failed to join a funky" ));
278 0 : munmap( shmem, total_sz );
279 0 : close( fd );
280 0 : return NULL;
281 0 : }
282 :
283 150 : if( mode == FD_FUNK_READONLY ) {
284 0 : if( FD_UNLIKELY( mprotect( shmem, total_sz, PROT_READ ) ) ) {
285 0 : FD_LOG_WARNING(( "mprotect failed (%i-%s)", errno, fd_io_strerror( errno ) ));
286 0 : }
287 0 : }
288 :
289 150 : FD_LOG_NOTICE(( "opened funk size %f GB, backing file %s", ((double)total_sz)/((double)(1LU<<30)), (filename ? filename : "(NULL)") ));
290 :
291 150 : if( FD_UNLIKELY( close_args_out != NULL ) ) {
292 150 : close_args_out->shmem = shmem;
293 150 : close_args_out->fd = fd;
294 150 : close_args_out->total_sz = total_sz;
295 150 : }
296 150 : return funk;
297 150 : }
298 153 : }
299 :
300 : fd_funk_t *
301 : fd_funk_recover_checkpoint( void * ljoin,
302 : const char * funk_filename,
303 : ulong wksp_tag,
304 : const char * checkpt_filename,
305 0 : fd_funk_close_file_args_t * close_args_out ) {
306 : /* Make the funk workspace match the parameters used to create the
307 : checkpoint. */
308 :
309 0 : fd_wksp_preview_t preview[1];
310 0 : int err = fd_wksp_preview( checkpt_filename, preview );
311 0 : if( FD_UNLIKELY( err ) ) {
312 0 : FD_LOG_WARNING(( "unable to preview %s (%i-%s)", checkpt_filename, err, fd_wksp_strerror( err ) ));
313 0 : return NULL;
314 0 : }
315 0 : uint seed = preview->seed;
316 0 : ulong part_max = preview->part_max;
317 0 : ulong data_max = preview->data_max;
318 :
319 0 : ulong total_sz = fd_wksp_footprint( part_max, data_max );
320 :
321 0 : int fd;
322 0 : if( funk_filename == NULL || funk_filename[0] == '\0' ) {
323 0 : fd = -1; /* Anonymous */
324 :
325 0 : } else {
326 :
327 : /* Open the file */
328 0 : fd = open( funk_filename, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR );
329 0 : if( FD_UNLIKELY( fd < 0 ) ) {
330 0 : FD_LOG_WARNING(( "error opening %s: %s", funk_filename, strerror(errno) ));
331 0 : return NULL;
332 0 : }
333 :
334 : /* Resize the file */
335 :
336 0 : struct stat statbuf;
337 0 : int r = fstat( fd, &statbuf );
338 0 : if( FD_UNLIKELY( r < 0 ) ) {
339 0 : FD_LOG_WARNING(( "error opening %s: %s", funk_filename, strerror(errno) ));
340 0 : close( fd );
341 0 : return NULL;
342 0 : }
343 0 : if( statbuf.st_size != (off_t)total_sz ) {
344 0 : if( FD_UNLIKELY( ftruncate( fd, (off_t)total_sz ) < 0 ) ) {
345 0 : FD_LOG_WARNING(( "error resizing %s: %s", funk_filename, strerror(errno) ));
346 0 : close( fd );
347 0 : return NULL;
348 0 : }
349 0 : }
350 :
351 : /* Force all the disk blocks to be physically allocated to avoid major faults in the future */
352 :
353 0 : uchar zeros[4<<20];
354 0 : memset( zeros, 0, sizeof(zeros) );
355 0 : for( ulong i = 0; i < total_sz; ) {
356 0 : ulong sz = fd_ulong_min( sizeof(zeros), total_sz - i );
357 0 : if( FD_UNLIKELY ( pwrite( fd, zeros, sz, (__off_t)i ) < (ssize_t)sz ) ) {
358 0 : FD_LOG_WARNING(( "error zeroing %s: %s", (funk_filename ? funk_filename : "(NULL)"), strerror(errno) ));
359 0 : close( fd );
360 0 : return NULL;
361 0 : }
362 0 : sync_file_range( fd, (__off64_t)i, (__off64_t)sz, SYNC_FILE_RANGE_WRITE );
363 0 : i += sz;
364 0 : }
365 0 : }
366 :
367 : /* Create the memory map */
368 :
369 0 : void * shmem = mmap( NULL, total_sz, PROT_READ|PROT_WRITE,
370 0 : (fd == -1 ? (MAP_ANONYMOUS|MAP_PRIVATE) : MAP_SHARED), fd, 0 );
371 :
372 0 : if( FD_UNLIKELY( shmem == MAP_FAILED ) ) {
373 0 : FD_LOG_WARNING(( "error mapping %s: %s", (funk_filename ? funk_filename : "(NULL)"), strerror(errno) ));
374 0 : close( fd );
375 0 : return NULL;
376 0 : }
377 :
378 : /* Create the workspace */
379 :
380 0 : void * shwksp = fd_wksp_new( shmem, "funk", seed, part_max, data_max );
381 0 : if( FD_UNLIKELY( !shwksp ) ) {
382 0 : FD_LOG_WARNING(( "fd_wksp_new(%p,\"%s\",%u,%lu,%lu) failed", shmem, "funk", seed, part_max, data_max ));
383 0 : munmap( shmem, total_sz );
384 0 : close( fd );
385 0 : return NULL;
386 0 : }
387 :
388 0 : fd_wksp_t * wksp = fd_wksp_join( shwksp );
389 0 : if( FD_UNLIKELY( !wksp ) ) {
390 0 : FD_LOG_WARNING(( "fd_wksp_join(%p) failed", shwksp ));
391 0 : munmap( shmem, total_sz );
392 0 : close( fd );
393 0 : return NULL;
394 0 : }
395 :
396 0 : ulong page_sz = PAGESIZE;
397 0 : ulong page_cnt = total_sz/PAGESIZE;
398 0 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
399 0 : if( FD_UNLIKELY( join_err ) ) {
400 0 : FD_LOG_WARNING(( "fd_shmem_join_anonymous failed" ));
401 0 : munmap( shmem, total_sz );
402 0 : close( fd );
403 0 : return NULL;
404 0 : }
405 :
406 : /* Restore the checkpoint */
407 :
408 0 : if( fd_wksp_restore( wksp, checkpt_filename, seed ) ) {
409 0 : FD_LOG_WARNING(( "restoring %s failed", checkpt_filename ));
410 0 : munmap( shmem, total_sz );
411 0 : close( fd );
412 0 : return NULL;
413 0 : }
414 :
415 : /* Let's play find the funk */
416 :
417 0 : fd_wksp_tag_query_info_t info;
418 0 : if( FD_UNLIKELY( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info, 1 ) ) ) {
419 0 : FD_LOG_WARNING(( "%s does not contain a funky", checkpt_filename ));
420 0 : munmap( shmem, total_sz );
421 0 : close( fd );
422 0 : return NULL;
423 0 : }
424 :
425 0 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
426 0 : fd_funk_t * funk = fd_funk_join( ljoin, funk_shmem );
427 0 : if( FD_UNLIKELY( funk == NULL ) ) {
428 0 : FD_LOG_WARNING(( "failed to join a funky" ));
429 0 : munmap( shmem, total_sz );
430 0 : close( fd );
431 0 : return NULL;
432 0 : }
433 :
434 0 : FD_LOG_NOTICE(( "opened funk size %f GB, backing file %s", ((double)total_sz)/((double)(1LU<<30)), (funk_filename ? funk_filename : "(NULL)") ));
435 :
436 0 : if( FD_UNLIKELY( close_args_out != NULL ) ) {
437 0 : close_args_out->shmem = shmem;
438 0 : close_args_out->fd = fd;
439 0 : close_args_out->total_sz = total_sz;
440 0 : }
441 0 : return funk;
442 0 : }
443 :
444 : void
445 153 : fd_funk_close_file( fd_funk_close_file_args_t * close_args ) {
446 153 : fd_shmem_leave_anonymous( close_args->shmem, NULL );
447 153 : munmap( close_args->shmem, close_args->total_sz );
448 153 : close( close_args->fd );
449 153 : }
|