Line data Source code
1 : #define _GNU_SOURCE
2 : #define _FILE_OFFSET_BITS 64
3 : #include "fd_funk_filemap.h"
4 : #include <sys/types.h>
5 : #include <sys/stat.h>
6 : #include <unistd.h>
7 : #include <errno.h>
8 : #include <fcntl.h>
9 : #include <sys/mman.h>
10 :
11 0 : #define PAGESIZE (1UL<<12) /* 4 KiB */
12 :
13 : fd_funk_t *
14 : fd_funk_open_file( const char * filename,
15 : ulong wksp_tag,
16 : ulong seed,
17 : ulong txn_max,
18 : ulong rec_max,
19 : ulong total_sz,
20 : fd_funk_file_mode_t mode,
21 0 : fd_funk_close_file_args_t * close_args_out ) {
22 :
23 : /* See if we already have the file open */
24 :
25 0 : if( mode == FD_FUNK_READONLY || mode == FD_FUNK_READ_WRITE ) {
26 0 : fd_shmem_join_info_t info;
27 0 : if( !fd_shmem_join_query_by_name("funk", &info) ) {
28 0 : void * shmem = info.join;
29 0 : fd_wksp_t * wksp = fd_wksp_join( shmem );
30 0 : if( FD_UNLIKELY( !wksp ) ) {
31 0 : FD_LOG_ERR(( "fd_wksp_join(%p) failed", shmem ));
32 0 : return NULL;
33 0 : }
34 :
35 0 : fd_wksp_tag_query_info_t info2;
36 0 : if( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info2, 1 ) ) {
37 0 : FD_LOG_ERR(( "%s does not contain a funky", filename ));
38 0 : return NULL;
39 0 : }
40 :
41 0 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info2.gaddr_lo );
42 0 : fd_funk_t * funk = fd_funk_join( funk_shmem );
43 0 : if( funk == NULL ) {
44 0 : FD_LOG_ERR(( "failed to join a funky" ));
45 0 : return NULL;
46 0 : }
47 :
48 0 : FD_LOG_NOTICE(( "reopened funk with %lu records", fd_funk_rec_cnt( fd_funk_rec_map( funk, wksp ) ) ));
49 :
50 0 : if( close_args_out != NULL ) {
51 0 : close_args_out->shmem = shmem;
52 0 : close_args_out->fd = -1;
53 0 : close_args_out->total_sz = 0;
54 0 : }
55 0 : return funk;
56 0 : }
57 0 : }
58 :
59 : /* Open the file */
60 :
61 0 : int open_flags, can_resize, can_create, do_new;
62 0 : switch (mode) {
63 0 : case FD_FUNK_READONLY:
64 0 : if( filename == NULL || filename[0] == '\0' ) {
65 0 : FD_LOG_ERR(( "mode FD_FUNK_READONLY can not be used with an anonymous workspace, funk file required" ));
66 0 : return NULL;
67 0 : }
68 0 : open_flags = O_RDWR; /* We mark the memory as read-only after we are done setting up */
69 0 : can_create = 0;
70 0 : can_resize = 0;
71 0 : do_new = 0;
72 0 : break;
73 0 : case FD_FUNK_READ_WRITE:
74 0 : if( filename == NULL || filename[0] == '\0' ) {
75 0 : FD_LOG_ERR(( "mode FD_FUNK_READ_WRITE can not be used with an anonymous workspace, funk file required" ));
76 0 : return NULL;
77 0 : }
78 0 : open_flags = O_RDWR;
79 0 : can_create = 0;
80 0 : can_resize = 0;
81 0 : do_new = 0;
82 0 : break;
83 0 : case FD_FUNK_CREATE:
84 0 : open_flags = O_CREAT|O_RDWR;
85 0 : can_create = 1;
86 0 : can_resize = 0;
87 0 : do_new = 0;
88 0 : break;
89 0 : case FD_FUNK_OVERWRITE:
90 0 : open_flags = O_CREAT|O_RDWR;
91 0 : can_create = 1;
92 0 : can_resize = 1;
93 0 : do_new = 1;
94 0 : break;
95 0 : case FD_FUNK_CREATE_EXCL:
96 0 : open_flags = O_CREAT|O_EXCL|O_RDWR;
97 0 : can_create = 1;
98 0 : can_resize = 1;
99 0 : do_new = 1;
100 0 : break;
101 0 : default:
102 0 : FD_LOG_ERR(( "invalid mode when opening %s", filename ));
103 0 : return NULL;
104 0 : }
105 :
106 0 : int fd;
107 0 : if( filename == NULL || filename[0] == '\0' ) {
108 0 : fd = -1; /* Anonymous */
109 0 : do_new = 1;
110 0 : } else {
111 :
112 : /* Open the file */
113 0 : FD_LOG_DEBUG(( "opening %s", filename ));
114 0 : fd = open( filename, open_flags, S_IRUSR|S_IWUSR );
115 0 : if( fd < 0 ) {
116 0 : FD_LOG_ERR(( "error opening %s: %s", filename, strerror(errno) ));
117 0 : return NULL;
118 0 : }
119 :
120 : /* Resize the file */
121 :
122 0 : struct stat statbuf;
123 0 : int r = fstat( fd, &statbuf );
124 0 : if( r < 0 ) {
125 0 : FD_LOG_ERR(( "error opening %s: %s", filename, strerror(errno) ));
126 0 : close( fd );
127 0 : return NULL;
128 0 : }
129 0 : if( (can_create && statbuf.st_size == 0) ||
130 0 : (can_resize && statbuf.st_size != (off_t)total_sz) ) {
131 0 : FD_LOG_DEBUG(( "resizing %s to %lu", filename, total_sz ));
132 0 : if( ftruncate( fd, (off_t)total_sz ) < 0 ) {
133 0 : FD_LOG_ERR(( "error resizing %s: %s", filename, strerror(errno) ));
134 0 : close( fd );
135 0 : return NULL;
136 0 : }
137 0 : do_new = 1;
138 0 : } else {
139 0 : total_sz = (ulong)statbuf.st_size;
140 0 : }
141 0 : }
142 :
143 0 : if( total_sz & (PAGESIZE-1) ) {
144 0 : FD_LOG_ERR(( "file size must be a multiple of a %lu", PAGESIZE ));
145 0 : close( fd );
146 0 : return NULL;
147 0 : }
148 :
149 : /* Force all the disk blocks to be physically allocated to avoid major faults in the future */
150 :
151 0 : if( do_new && fd != -1 ) {
152 0 : FD_LOG_DEBUG(( "zeroing %s", (filename ? filename : "(NULL)") ));
153 0 : uchar zeros[4<<20];
154 0 : memset( zeros, 0, sizeof(zeros) );
155 0 : for( ulong i = 0; i < total_sz; ) {
156 0 : ulong sz = fd_ulong_min( sizeof(zeros), total_sz - i );
157 0 : if( pwrite( fd, zeros, sz, (__off_t)i ) < (ssize_t)sz ) {
158 0 : FD_LOG_ERR(( "error zeroing %s: %s", (filename ? filename : "(NULL)"), strerror(errno) ));
159 0 : close( fd );
160 0 : return NULL;
161 0 : }
162 0 : sync_file_range( fd, (__off64_t)i, (__off64_t)sz, SYNC_FILE_RANGE_WRITE );
163 0 : i += sz;
164 0 : }
165 0 : }
166 :
167 : /* Create the memory map */
168 :
169 0 : FD_LOG_DEBUG(( "mapping %s", (filename ? filename : "(NULL)") ));
170 0 : void * shmem = mmap( NULL, total_sz, (PROT_READ|PROT_WRITE),
171 0 : (fd == -1 ? (MAP_ANONYMOUS|MAP_PRIVATE) : MAP_SHARED), fd, 0 );
172 0 : if( shmem == NULL || shmem == MAP_FAILED ) {
173 0 : FD_LOG_ERR(( "error mapping %s: %s", (filename ? filename : "(NULL)"), strerror(errno) ));
174 0 : close( fd );
175 0 : return NULL;
176 0 : }
177 :
178 0 : if( do_new ) {
179 :
180 : /* Create the data structures */
181 :
182 0 : ulong part_max = fd_wksp_part_max_est( total_sz, 1U<<18U );
183 0 : if( FD_UNLIKELY( !part_max ) ) {
184 0 : FD_LOG_ERR(( "fd_wksp_part_max_est(%lu,64KiB) failed", total_sz ));
185 0 : munmap( shmem, total_sz );
186 0 : close( fd );
187 0 : return NULL;
188 0 : }
189 :
190 0 : ulong data_max = fd_wksp_data_max_est( total_sz, part_max );
191 0 : if( FD_UNLIKELY( !data_max ) ) {
192 0 : FD_LOG_ERR(( "part_max (%lu) too large for footprint %lu", part_max, total_sz ));
193 0 : munmap( shmem, total_sz );
194 0 : close( fd );
195 0 : return NULL;
196 0 : }
197 :
198 0 : FD_LOG_DEBUG(( "creating workspace in %s", (filename ? filename : "(NULL)") ));
199 0 : void * shwksp = fd_wksp_new( shmem, "funk", (uint)seed, part_max, data_max );
200 0 : if( FD_UNLIKELY( !shwksp ) ) {
201 0 : FD_LOG_ERR(( "fd_wksp_new(%p,\"%s\",%lu,%lu,%lu) failed", shmem, "funk", seed, part_max, data_max ));
202 0 : munmap( shmem, total_sz );
203 0 : close( fd );
204 0 : return NULL;
205 0 : }
206 :
207 0 : fd_wksp_t * wksp = fd_wksp_join( shwksp );
208 0 : if( FD_UNLIKELY( !wksp ) ) {
209 0 : FD_LOG_ERR(( "fd_wksp_join(%p) failed", shwksp ));
210 0 : munmap( shmem, total_sz );
211 0 : close( fd );
212 0 : return NULL;
213 0 : }
214 :
215 0 : ulong page_sz = PAGESIZE;
216 0 : ulong page_cnt = total_sz/PAGESIZE;
217 0 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
218 0 : if( join_err ) {
219 0 : FD_LOG_WARNING(( "fd_shmem_join_anonymous failed" ));
220 0 : }
221 :
222 0 : FD_LOG_DEBUG(( "creating funk in %s", (filename ? filename : "(NULL)") ));
223 0 : void * funk_shmem = fd_wksp_alloc_laddr( wksp, fd_funk_align(), fd_funk_footprint(), wksp_tag );
224 0 : if( funk_shmem == NULL ) {
225 0 : FD_LOG_ERR(( "failed to allocate a funky" ));
226 0 : munmap( shmem, total_sz );
227 0 : close( fd );
228 0 : return NULL;
229 0 : }
230 :
231 0 : fd_funk_t * funk = fd_funk_join( fd_funk_new( funk_shmem, wksp_tag, seed, txn_max, rec_max ) );
232 0 : if( funk == NULL ) {
233 0 : FD_LOG_ERR(( "failed to allocate a funky" ));
234 0 : munmap( shmem, total_sz );
235 0 : close( fd );
236 0 : return NULL;
237 0 : }
238 :
239 0 : FD_LOG_NOTICE(( "opened funk size %f GB, %lu records, backing file %s", ((double)total_sz)/((double)(1LU<<30)), fd_funk_rec_cnt( fd_funk_rec_map( funk, wksp ) ), (filename ? filename : "(NULL)") ));
240 :
241 0 : if( close_args_out != NULL ) {
242 0 : close_args_out->shmem = shmem;
243 0 : close_args_out->fd = fd;
244 0 : close_args_out->total_sz = total_sz;
245 0 : }
246 0 : return funk;
247 :
248 0 : } else {
249 :
250 : /* Join the data existiing structures */
251 :
252 0 : fd_wksp_t * wksp = fd_wksp_join( shmem );
253 0 : if( FD_UNLIKELY( !wksp ) ) {
254 0 : FD_LOG_ERR(( "fd_wksp_join(%p) failed", shmem ));
255 0 : munmap( shmem, total_sz );
256 0 : close( fd );
257 0 : return NULL;
258 0 : }
259 :
260 0 : ulong page_sz = PAGESIZE;
261 0 : ulong page_cnt = total_sz/PAGESIZE;
262 0 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
263 0 : if( join_err ) {
264 0 : FD_LOG_WARNING(( "fd_shmem_join_anonymous failed" ));
265 0 : }
266 :
267 0 : fd_wksp_tag_query_info_t info;
268 0 : if( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info, 1 ) ) {
269 0 : FD_LOG_ERR(( "%s does not contain a funky", filename ));
270 0 : munmap( shmem, total_sz );
271 0 : close( fd );
272 0 : return NULL;
273 0 : }
274 :
275 0 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
276 0 : fd_funk_t * funk = fd_funk_join( funk_shmem );
277 0 : if( funk == NULL ) {
278 0 : FD_LOG_ERR(( "failed to join a funky" ));
279 0 : munmap( shmem, total_sz );
280 0 : close( fd );
281 0 : return NULL;
282 0 : }
283 :
284 0 : if( mode == FD_FUNK_READONLY ) {
285 0 : if( FD_UNLIKELY( mprotect( shmem, total_sz, PROT_READ ) ) ) {
286 0 : FD_LOG_WARNING(( "mprotect failed (%i-%s)", errno, fd_io_strerror( errno ) ));
287 0 : }
288 0 : }
289 :
290 0 : FD_LOG_NOTICE(( "opened funk size %f GB, %lu records, backing file %s", ((double)total_sz)/((double)(1LU<<30)), fd_funk_rec_cnt( fd_funk_rec_map( funk, wksp ) ), (filename ? filename : "(NULL)") ));
291 :
292 0 : if( close_args_out != NULL ) {
293 0 : close_args_out->shmem = shmem;
294 0 : close_args_out->fd = fd;
295 0 : close_args_out->total_sz = total_sz;
296 0 : }
297 0 : return funk;
298 0 : }
299 0 : }
300 :
301 : fd_funk_t *
302 : fd_funk_recover_checkpoint( const char * funk_filename,
303 : ulong wksp_tag,
304 : const char * checkpt_filename,
305 0 : fd_funk_close_file_args_t * close_args_out ) {
306 : /* Make the funk workspace match the parameters used to create the
307 : checkpoint. */
308 0 : uint seed;
309 0 : ulong part_max;
310 0 : ulong data_max;
311 0 : int err = fd_wksp_restore_preview( checkpt_filename, &seed, &part_max, &data_max );
312 0 : if( err ) {
313 0 : FD_LOG_ERR(( "unable to preview %s", checkpt_filename ));
314 0 : return NULL;
315 0 : }
316 0 : ulong total_sz = fd_wksp_footprint( part_max, data_max );
317 :
318 0 : int fd;
319 0 : if( funk_filename == NULL || funk_filename[0] == '\0' ) {
320 0 : fd = -1; /* Anonymous */
321 :
322 0 : } else {
323 :
324 : /* Open the file */
325 0 : fd = open( funk_filename, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR );
326 0 : if( fd < 0 ) {
327 0 : FD_LOG_ERR(( "error opening %s: %s", funk_filename, strerror(errno) ));
328 0 : return NULL;
329 0 : }
330 :
331 : /* Resize the file */
332 :
333 0 : struct stat statbuf;
334 0 : int r = fstat( fd, &statbuf );
335 0 : if( r < 0 ) {
336 0 : FD_LOG_ERR(( "error opening %s: %s", funk_filename, strerror(errno) ));
337 0 : close( fd );
338 0 : return NULL;
339 0 : }
340 0 : if( statbuf.st_size != (off_t)total_sz ) {
341 0 : if( ftruncate( fd, (off_t)total_sz ) < 0 ) {
342 0 : FD_LOG_ERR(( "error resizing %s: %s", funk_filename, strerror(errno) ));
343 0 : close( fd );
344 0 : return NULL;
345 0 : }
346 0 : }
347 :
348 : /* Force all the disk blocks to be physically allocated to avoid major faults in the future */
349 :
350 0 : uchar zeros[4<<20];
351 0 : memset( zeros, 0, sizeof(zeros) );
352 0 : for( ulong i = 0; i < total_sz; ) {
353 0 : ulong sz = fd_ulong_min( sizeof(zeros), total_sz - i );
354 0 : if( pwrite( fd, zeros, sz, (__off_t)i ) < (ssize_t)sz ) {
355 0 : FD_LOG_ERR(( "error zeroing %s: %s", (funk_filename ? funk_filename : "(NULL)"), strerror(errno) ));
356 0 : close( fd );
357 0 : return NULL;
358 0 : }
359 0 : sync_file_range( fd, (__off64_t)i, (__off64_t)sz, SYNC_FILE_RANGE_WRITE );
360 0 : i += sz;
361 0 : }
362 0 : }
363 :
364 : /* Create the memory map */
365 :
366 0 : void * shmem = mmap( NULL, total_sz, PROT_READ|PROT_WRITE,
367 0 : (fd == -1 ? (MAP_ANONYMOUS|MAP_PRIVATE) : MAP_SHARED), fd, 0 );
368 :
369 0 : if( shmem == NULL || shmem == MAP_FAILED ) {
370 0 : FD_LOG_ERR(( "error mapping %s: %s", (funk_filename ? funk_filename : "(NULL)"), strerror(errno) ));
371 0 : close( fd );
372 0 : return NULL;
373 0 : }
374 :
375 : /* Create the workspace */
376 :
377 0 : void * shwksp = fd_wksp_new( shmem, "funk", seed, part_max, data_max );
378 0 : if( FD_UNLIKELY( !shwksp ) ) {
379 0 : FD_LOG_ERR(( "fd_wksp_new(%p,\"%s\",%u,%lu,%lu) failed", shmem, "funk", seed, part_max, data_max ));
380 0 : munmap( shmem, total_sz );
381 0 : close( fd );
382 0 : return NULL;
383 0 : }
384 :
385 0 : fd_wksp_t * wksp = fd_wksp_join( shwksp );
386 0 : if( FD_UNLIKELY( !wksp ) ) {
387 0 : FD_LOG_ERR(( "fd_wksp_join(%p) failed", shwksp ));
388 0 : munmap( shmem, total_sz );
389 0 : close( fd );
390 0 : return NULL;
391 0 : }
392 :
393 0 : ulong page_sz = PAGESIZE;
394 0 : ulong page_cnt = total_sz/PAGESIZE;
395 0 : int join_err = fd_shmem_join_anonymous( "funk", FD_SHMEM_JOIN_MODE_READ_WRITE, wksp, shmem, page_sz, page_cnt );
396 0 : if( join_err ) {
397 0 : FD_LOG_ERR(( "fd_shmem_join_anonymous failed" ));
398 0 : munmap( shmem, total_sz );
399 0 : close( fd );
400 0 : return NULL;
401 0 : }
402 :
403 : /* Restore the checkpoint */
404 :
405 0 : if( fd_wksp_restore( wksp, checkpt_filename, seed ) ) {
406 0 : FD_LOG_ERR(( "restoring %s failed", checkpt_filename ));
407 0 : munmap( shmem, total_sz );
408 0 : close( fd );
409 0 : return NULL;
410 0 : }
411 :
412 : /* Let's play find the funk */
413 :
414 0 : fd_wksp_tag_query_info_t info;
415 0 : if( !fd_wksp_tag_query( wksp, &wksp_tag, 1, &info, 1 ) ) {
416 0 : FD_LOG_ERR(( "%s does not contain a funky", checkpt_filename ));
417 0 : munmap( shmem, total_sz );
418 0 : close( fd );
419 0 : return NULL;
420 0 : }
421 :
422 0 : void * funk_shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
423 0 : fd_funk_t * funk = fd_funk_join( funk_shmem );
424 0 : if( funk == NULL ) {
425 0 : FD_LOG_ERR(( "failed to join a funky" ));
426 0 : munmap( shmem, total_sz );
427 0 : close( fd );
428 0 : return NULL;
429 0 : }
430 :
431 0 : FD_LOG_NOTICE(( "opened funk size %f GB, %lu records, backing file %s", ((double)total_sz)/((double)(1LU<<30)), fd_funk_rec_cnt( fd_funk_rec_map( funk, wksp ) ), (funk_filename ? funk_filename : "(NULL)") ));
432 :
433 0 : if( close_args_out != NULL ) {
434 0 : close_args_out->shmem = shmem;
435 0 : close_args_out->fd = fd;
436 0 : close_args_out->total_sz = total_sz;
437 0 : }
438 0 : return funk;
439 0 : }
440 :
441 : void
442 0 : fd_funk_close_file( fd_funk_close_file_args_t * close_args ) {
443 0 : fd_shmem_leave_anonymous( close_args->shmem, NULL );
444 0 : munmap( close_args->shmem, close_args->total_sz );
445 0 : close( close_args->fd );
446 0 : }
|