Line data Source code
1 : #define _GNU_SOURCE
2 : #define _FILE_OFFSET_BITS 64
3 : #include "fd_funk_filemap.h"
4 : #include <sys/types.h>
5 : #include <sys/stat.h>
6 : #include <unistd.h>
7 : #include <errno.h>
8 : #include <fcntl.h>
9 : #include <sys/mman.h>
10 :
#define PAGESIZE (4096UL) /* 4 KiB: required granularity of funk file sizes and the page size handed to fd_shmem_join_anonymous */
12 :
13 : fd_funk_t *
14 : fd_funk_open_file( const char * filename,
15 : ulong wksp_tag,
16 : ulong seed,
17 : ulong txn_max,
18 : ulong rec_max,
19 : ulong total_sz,
20 : fd_funk_file_mode_t mode,
21 0 : fd_funk_close_file_args_t * close_args_out ) {
22 :
23 : /* See if we already have the file open */
24 :
25 0 : if( mode == FD_FUNK_READONLY || mode == FD_FUNK_READ_WRITE ) {
26 0 : fd_shmem_join_info_t info;
27 0 : if( !fd_shmem_join_query_by_name("funk", &info) ) {
28 0 : void * shmem = info.join;
29 0 : fd_wksp_t * wksp = fd_wksp_join( shmem );
30 0 : if( FD_UNLIKELY( !wksp ) ) {
31 0 : FD_LOG_WARNING(( "fd_wksp_join(%p) failed", shmem ));
32 0 : return NULL;
33 0 : }
34 :
35 0 : fd_wksp_tag_query_info_t info2;
36 0 : if( FD_UNLIKELY( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info2, 1 ) ) ) {
37 0 : FD_LOG_WARNING(( "%s does not contain a funky", filename ));
38 0 : return NULL;
39 0 : }
40 :
41 0 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info2.gaddr_lo );
42 0 : fd_funk_t * funk = fd_funk_join( funk_shmem );
43 0 : if( FD_UNLIKELY( funk == NULL ) ) {
44 0 : FD_LOG_WARNING(( "failed to join a funky" ));
45 0 : return NULL;
46 0 : }
47 :
48 0 : FD_LOG_NOTICE(( "reopened funk with %lu records", fd_funk_rec_cnt( fd_funk_rec_map( funk, wksp ) ) ));
49 :
50 0 : if( FD_UNLIKELY( close_args_out != NULL ) ) {
51 0 : close_args_out->shmem = shmem;
52 0 : close_args_out->fd = -1;
53 0 : close_args_out->total_sz = 0;
54 0 : }
55 0 : return funk;
56 0 : }
57 0 : }
58 :
59 : /* Open the file */
60 :
61 0 : int open_flags, can_resize, can_create, do_new;
62 0 : switch (mode) {
63 0 : case FD_FUNK_READONLY:
64 0 : if( filename == NULL || filename[0] == '\0' ) {
65 0 : FD_LOG_WARNING(( "mode FD_FUNK_READONLY can not be used with an anonymous workspace, funk file required" ));
66 0 : return NULL;
67 0 : }
68 0 : open_flags = O_RDWR; /* We mark the memory as read-only after we are done setting up */
69 0 : can_create = 0;
70 0 : can_resize = 0;
71 0 : do_new = 0;
72 0 : break;
73 0 : case FD_FUNK_READ_WRITE:
74 0 : if( filename == NULL || filename[0] == '\0' ) {
75 0 : FD_LOG_WARNING(( "mode FD_FUNK_READ_WRITE can not be used with an anonymous workspace, funk file required" ));
76 0 : return NULL;
77 0 : }
78 0 : open_flags = O_RDWR;
79 0 : can_create = 0;
80 0 : can_resize = 0;
81 0 : do_new = 0;
82 0 : break;
83 0 : case FD_FUNK_CREATE:
84 0 : open_flags = O_CREAT|O_RDWR;
85 0 : can_create = 1;
86 0 : can_resize = 0;
87 0 : do_new = 0;
88 0 : break;
89 0 : case FD_FUNK_OVERWRITE:
90 0 : open_flags = O_CREAT|O_RDWR;
91 0 : can_create = 1;
92 0 : can_resize = 1;
93 0 : do_new = 1;
94 0 : break;
95 0 : case FD_FUNK_CREATE_EXCL:
96 0 : open_flags = O_CREAT|O_EXCL|O_RDWR;
97 0 : can_create = 1;
98 0 : can_resize = 1;
99 0 : do_new = 1;
100 0 : break;
101 0 : default:
102 0 : FD_LOG_WARNING(( "invalid mode when opening %s", filename ));
103 0 : return NULL;
104 0 : }
105 :
106 0 : int fd;
107 0 : if( FD_UNLIKELY( filename == NULL || filename[0] == '\0' ) ) {
108 0 : fd = -1; /* Anonymous */
109 0 : do_new = 1;
110 0 : } else {
111 :
112 : /* Open the file */
113 0 : FD_LOG_DEBUG(( "opening %s", filename ));
114 0 : fd = open( filename, open_flags, S_IRUSR|S_IWUSR );
115 0 : if( FD_UNLIKELY( fd < 0 ) ) {
116 0 : FD_LOG_WARNING(( "error opening %s: %s", filename, strerror(errno) ));
117 0 : return NULL;
118 0 : }
119 :
120 : /* Resize the file */
121 :
122 0 : struct stat statbuf;
123 0 : int r = fstat( fd, &statbuf );
124 0 : if( FD_UNLIKELY( r < 0 ) ) {
125 0 : FD_LOG_WARNING(( "error opening %s: %s", filename, strerror(errno) ));
126 0 : close( fd );
127 0 : return NULL;
128 0 : }
129 0 : if( (can_create && statbuf.st_size == 0) ||
130 0 : (can_resize && statbuf.st_size != (off_t)total_sz) ) {
131 0 : FD_LOG_DEBUG(( "resizing %s to %lu", filename, total_sz ));
132 0 : if( FD_UNLIKELY( ftruncate( fd, (off_t)total_sz ) < 0 ) ) {
133 0 : FD_LOG_WARNING(( "error resizing %s: %s", filename, strerror(errno) ));
134 0 : close( fd );
135 0 : return NULL;
136 0 : }
137 0 : do_new = 1;
138 0 : } else {
139 0 : total_sz = (ulong)statbuf.st_size;
140 0 : }
141 0 : }
142 :
143 0 : if( FD_UNLIKELY( total_sz & (PAGESIZE-1) ) ) {
144 0 : FD_LOG_WARNING(( "file size must be a multiple of a %lu", PAGESIZE ));
145 0 : close( fd );
146 0 : return NULL;
147 0 : }
148 :
149 : /* Force all the disk blocks to be physically allocated to avoid major faults in the future */
150 :
151 0 : if( do_new & (fd != -1) ) {
152 0 : FD_LOG_DEBUG(( "zeroing %s", (filename ? filename : "(NULL)") ));
153 0 : uchar zeros[4<<20];
154 0 : memset( zeros, 0, sizeof(zeros) );
155 0 : for( ulong i = 0; i < total_sz; ) {
156 0 : ulong sz = fd_ulong_min( sizeof(zeros), total_sz - i );
157 0 : if( FD_UNLIKELY( pwrite( fd, zeros, sz, (__off_t)i ) < (ssize_t)sz ) ) {
158 0 : FD_LOG_WARNING(( "error zeroing %s: %s", (filename ? filename : "(NULL)"), strerror(errno) ));
159 0 : close( fd );
160 0 : return NULL;
161 0 : }
162 0 : sync_file_range( fd, (__off64_t)i, (__off64_t)sz, SYNC_FILE_RANGE_WRITE );
163 0 : i += sz;
164 0 : }
165 0 : }
166 :
167 : /* Create the memory map */
168 :
169 0 : FD_LOG_DEBUG(( "mapping %s", (filename ? filename : "(NULL)") ));
170 0 : void * shmem = mmap( NULL, total_sz, (PROT_READ|PROT_WRITE),
171 0 : (fd == -1 ? (MAP_ANONYMOUS|MAP_PRIVATE) : MAP_SHARED), fd, 0 );
172 0 : if( FD_UNLIKELY ( shmem == MAP_FAILED ) ) {
173 0 : FD_LOG_WARNING(( "error mapping %s: %s", (filename ? filename : "(NULL)"), strerror(errno) ));
174 0 : close( fd );
175 0 : return NULL;
176 0 : }
177 :
178 0 : if( do_new ) {
179 :
180 : /* Create the data structures */
181 :
182 0 : ulong part_max = fd_wksp_part_max_est( total_sz, 1U<<18U );
183 0 : if( FD_UNLIKELY( !part_max ) ) {
184 0 : FD_LOG_WARNING(( "fd_wksp_part_max_est(%lu,64KiB) failed", total_sz ));
185 0 : munmap( shmem, total_sz );
186 0 : close( fd );
187 0 : return NULL;
188 0 : }
189 :
190 0 : ulong data_max = fd_wksp_data_max_est( total_sz, part_max );
191 0 : if( FD_UNLIKELY( !data_max ) ) {
192 0 : FD_LOG_WARNING(( "part_max (%lu) too large for footprint %lu", part_max, total_sz ));
193 0 : munmap( shmem, total_sz );
194 0 : close( fd );
195 0 : return NULL;
196 0 : }
197 :
198 0 : FD_LOG_DEBUG(( "creating workspace in %s", (filename ? filename : "(NULL)") ));
199 0 : void * shwksp = fd_wksp_new( shmem, "funk", (uint)seed, part_max, data_max );
200 0 : if( FD_UNLIKELY( !shwksp ) ) {
201 0 : FD_LOG_WARNING(( "fd_wksp_new(%p,\"%s\",%lu,%lu,%lu) failed", shmem, "funk", seed, part_max, data_max ));
202 0 : munmap( shmem, total_sz );
203 0 : close( fd );
204 0 : return NULL;
205 0 : }
206 :
207 0 : fd_wksp_t * wksp = fd_wksp_join( shwksp );
208 0 : if( FD_UNLIKELY( !wksp ) ) {
209 0 : FD_LOG_WARNING(( "fd_wksp_join(%p) failed", shwksp ));
210 0 : munmap( shmem, total_sz );
211 0 : close( fd );
212 0 : return NULL;
213 0 : }
214 :
215 0 : ulong page_sz = PAGESIZE;
216 0 : ulong page_cnt = total_sz/PAGESIZE;
217 0 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
218 0 : if( join_err ) {
219 0 : FD_LOG_WARNING(( "fd_shmem_join_anonymous failed" ));
220 0 : }
221 :
222 0 : FD_LOG_DEBUG(( "creating funk in %s", (filename ? filename : "(NULL)") ));
223 0 : void * funk_shmem = fd_wksp_alloc_laddr( wksp, fd_funk_align(), fd_funk_footprint(), wksp_tag );
224 0 : if( FD_UNLIKELY(funk_shmem == NULL ) ) {
225 0 : FD_LOG_WARNING(( "failed to allocate a funky" ));
226 0 : munmap( shmem, total_sz );
227 0 : close( fd );
228 0 : return NULL;
229 0 : }
230 :
231 0 : fd_funk_t * funk = fd_funk_join( fd_funk_new( funk_shmem, wksp_tag, seed, txn_max, rec_max ) );
232 0 : if( FD_UNLIKELY( funk == NULL ) ) {
233 0 : FD_LOG_WARNING(( "failed to allocate a funky" ));
234 0 : munmap( shmem, total_sz );
235 0 : close( fd );
236 0 : return NULL;
237 0 : }
238 :
239 0 : FD_LOG_NOTICE(( "opened funk size %f GB, %lu records, backing file %s", ((double)total_sz)/((double)(1LU<<30)), fd_funk_rec_cnt( fd_funk_rec_map( funk, wksp ) ), (filename ? filename : "(NULL)") ));
240 :
241 0 : if( FD_UNLIKELY( close_args_out != NULL ) ) {
242 0 : close_args_out->shmem = shmem;
243 0 : close_args_out->fd = fd;
244 0 : close_args_out->total_sz = total_sz;
245 0 : }
246 0 : return funk;
247 :
248 0 : } else {
249 :
250 : /* Join the data existing structures */
251 :
252 0 : fd_wksp_t * wksp = fd_wksp_join( shmem );
253 0 : if( FD_UNLIKELY( !wksp ) ) {
254 0 : FD_LOG_WARNING(( "fd_wksp_join(%p) failed", shmem ));
255 0 : munmap( shmem, total_sz );
256 0 : close( fd );
257 0 : return NULL;
258 0 : }
259 :
260 0 : ulong page_sz = PAGESIZE;
261 0 : ulong page_cnt = total_sz/PAGESIZE;
262 0 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
263 0 : if( FD_UNLIKELY( join_err ) ) {
264 0 : FD_LOG_WARNING(( "fd_shmem_join_anonymous failed" ));
265 0 : }
266 :
267 0 : fd_wksp_tag_query_info_t info;
268 0 : if( FD_UNLIKELY( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info, 1 ) ) ) {
269 0 : FD_LOG_WARNING(( "%s does not contain a funky", filename ));
270 0 : munmap( shmem, total_sz );
271 0 : close( fd );
272 0 : return NULL;
273 0 : }
274 :
275 0 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
276 0 : fd_funk_t * funk = fd_funk_join( funk_shmem );
277 0 : if( FD_UNLIKELY( funk == NULL ) ) {
278 0 : FD_LOG_WARNING(( "failed to join a funky" ));
279 0 : munmap( shmem, total_sz );
280 0 : close( fd );
281 0 : return NULL;
282 0 : }
283 :
284 0 : if( mode == FD_FUNK_READONLY ) {
285 0 : if( FD_UNLIKELY( mprotect( shmem, total_sz, PROT_READ ) ) ) {
286 0 : FD_LOG_WARNING(( "mprotect failed (%i-%s)", errno, fd_io_strerror( errno ) ));
287 0 : }
288 0 : }
289 :
290 0 : FD_LOG_NOTICE(( "opened funk size %f GB, %lu records, backing file %s", ((double)total_sz)/((double)(1LU<<30)), fd_funk_rec_cnt( fd_funk_rec_map( funk, wksp ) ), (filename ? filename : "(NULL)") ));
291 :
292 0 : if( FD_UNLIKELY( close_args_out != NULL ) ) {
293 0 : close_args_out->shmem = shmem;
294 0 : close_args_out->fd = fd;
295 0 : close_args_out->total_sz = total_sz;
296 0 : }
297 0 : return funk;
298 0 : }
299 0 : }
300 :
301 : fd_funk_t *
302 : fd_funk_recover_checkpoint( const char * funk_filename,
303 : ulong wksp_tag,
304 : const char * checkpt_filename,
305 0 : fd_funk_close_file_args_t * close_args_out ) {
306 : /* Make the funk workspace match the parameters used to create the
307 : checkpoint. */
308 :
309 0 : fd_wksp_preview_t preview[1];
310 0 : int err = fd_wksp_preview( checkpt_filename, preview );
311 0 : if( FD_UNLIKELY( err ) ) {
312 0 : FD_LOG_WARNING(( "unable to preview %s (%i-%s)", checkpt_filename, err, fd_wksp_strerror( err ) ));
313 0 : return NULL;
314 0 : }
315 0 : uint seed = preview->seed;
316 0 : ulong part_max = preview->part_max;
317 0 : ulong data_max = preview->data_max;
318 :
319 0 : ulong total_sz = fd_wksp_footprint( part_max, data_max );
320 :
321 0 : int fd;
322 0 : if( funk_filename == NULL || funk_filename[0] == '\0' ) {
323 0 : fd = -1; /* Anonymous */
324 :
325 0 : } else {
326 :
327 : /* Open the file */
328 0 : fd = open( funk_filename, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR );
329 0 : if( FD_UNLIKELY( fd < 0 ) ) {
330 0 : FD_LOG_WARNING(( "error opening %s: %s", funk_filename, strerror(errno) ));
331 0 : return NULL;
332 0 : }
333 :
334 : /* Resize the file */
335 :
336 0 : struct stat statbuf;
337 0 : int r = fstat( fd, &statbuf );
338 0 : if( FD_UNLIKELY( r < 0 ) ) {
339 0 : FD_LOG_WARNING(( "error opening %s: %s", funk_filename, strerror(errno) ));
340 0 : close( fd );
341 0 : return NULL;
342 0 : }
343 0 : if( statbuf.st_size != (off_t)total_sz ) {
344 0 : if( FD_UNLIKELY( ftruncate( fd, (off_t)total_sz ) < 0 ) ) {
345 0 : FD_LOG_WARNING(( "error resizing %s: %s", funk_filename, strerror(errno) ));
346 0 : close( fd );
347 0 : return NULL;
348 0 : }
349 0 : }
350 :
351 : /* Force all the disk blocks to be physically allocated to avoid major faults in the future */
352 :
353 0 : uchar zeros[4<<20];
354 0 : memset( zeros, 0, sizeof(zeros) );
355 0 : for( ulong i = 0; i < total_sz; ) {
356 0 : ulong sz = fd_ulong_min( sizeof(zeros), total_sz - i );
357 0 : if( FD_UNLIKELY ( pwrite( fd, zeros, sz, (__off_t)i ) < (ssize_t)sz ) ) {
358 0 : FD_LOG_WARNING(( "error zeroing %s: %s", (funk_filename ? funk_filename : "(NULL)"), strerror(errno) ));
359 0 : close( fd );
360 0 : return NULL;
361 0 : }
362 0 : sync_file_range( fd, (__off64_t)i, (__off64_t)sz, SYNC_FILE_RANGE_WRITE );
363 0 : i += sz;
364 0 : }
365 0 : }
366 :
367 : /* Create the memory map */
368 :
369 0 : void * shmem = mmap( NULL, total_sz, PROT_READ|PROT_WRITE,
370 0 : (fd == -1 ? (MAP_ANONYMOUS|MAP_PRIVATE) : MAP_SHARED), fd, 0 );
371 :
372 0 : if( FD_UNLIKELY( shmem == MAP_FAILED ) ) {
373 0 : FD_LOG_WARNING(( "error mapping %s: %s", (funk_filename ? funk_filename : "(NULL)"), strerror(errno) ));
374 0 : close( fd );
375 0 : return NULL;
376 0 : }
377 :
378 : /* Create the workspace */
379 :
380 0 : void * shwksp = fd_wksp_new( shmem, "funk", seed, part_max, data_max );
381 0 : if( FD_UNLIKELY( !shwksp ) ) {
382 0 : FD_LOG_WARNING(( "fd_wksp_new(%p,\"%s\",%u,%lu,%lu) failed", shmem, "funk", seed, part_max, data_max ));
383 0 : munmap( shmem, total_sz );
384 0 : close( fd );
385 0 : return NULL;
386 0 : }
387 :
388 0 : fd_wksp_t * wksp = fd_wksp_join( shwksp );
389 0 : if( FD_UNLIKELY( !wksp ) ) {
390 0 : FD_LOG_WARNING(( "fd_wksp_join(%p) failed", shwksp ));
391 0 : munmap( shmem, total_sz );
392 0 : close( fd );
393 0 : return NULL;
394 0 : }
395 :
396 0 : ulong page_sz = PAGESIZE;
397 0 : ulong page_cnt = total_sz/PAGESIZE;
398 0 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
399 0 : if( FD_UNLIKELY( join_err ) ) {
400 0 : FD_LOG_WARNING(( "fd_shmem_join_anonymous failed" ));
401 0 : munmap( shmem, total_sz );
402 0 : close( fd );
403 0 : return NULL;
404 0 : }
405 :
406 : /* Restore the checkpoint */
407 :
408 0 : if( fd_wksp_restore( wksp, checkpt_filename, seed ) ) {
409 0 : FD_LOG_WARNING(( "restoring %s failed", checkpt_filename ));
410 0 : munmap( shmem, total_sz );
411 0 : close( fd );
412 0 : return NULL;
413 0 : }
414 :
415 : /* Let's play find the funk */
416 :
417 0 : fd_wksp_tag_query_info_t info;
418 0 : if( FD_UNLIKELY( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info, 1 ) ) ) {
419 0 : FD_LOG_WARNING(( "%s does not contain a funky", checkpt_filename ));
420 0 : munmap( shmem, total_sz );
421 0 : close( fd );
422 0 : return NULL;
423 0 : }
424 :
425 0 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
426 0 : fd_funk_t * funk = fd_funk_join( funk_shmem );
427 0 : if( FD_UNLIKELY( funk == NULL ) ) {
428 0 : FD_LOG_WARNING(( "failed to join a funky" ));
429 0 : munmap( shmem, total_sz );
430 0 : close( fd );
431 0 : return NULL;
432 0 : }
433 :
434 0 : FD_LOG_NOTICE(( "opened funk size %f GB, %lu records, backing file %s", ((double)total_sz)/((double)(1LU<<30)), fd_funk_rec_cnt( fd_funk_rec_map( funk, wksp ) ), (funk_filename ? funk_filename : "(NULL)") ));
435 :
436 0 : if( FD_UNLIKELY( close_args_out != NULL ) ) {
437 0 : close_args_out->shmem = shmem;
438 0 : close_args_out->fd = fd;
439 0 : close_args_out->total_sz = total_sz;
440 0 : }
441 0 : return funk;
442 0 : }
443 :
444 : void
445 0 : fd_funk_close_file( fd_funk_close_file_args_t * close_args ) {
446 0 : fd_shmem_leave_anonymous( close_args->shmem, NULL );
447 0 : munmap( close_args->shmem, close_args->total_sz );
448 0 : close( close_args->fd );
449 0 : }
|