Line data Source code
1 : #include "fd_funk.h"
2 : #include <sys/types.h>
3 : #include <sys/stat.h>
4 : #include <fcntl.h>
5 : #include <string.h>
6 : #include <errno.h>
7 : #include <unistd.h>
8 : #include "../util/io/fd_io.h"
9 :
10 600 : #define FD_ARCH_MAGIC 0x92a1235fU
11 :
12 : int
13 : fd_funk_archive( fd_funk_t * funk,
14 300 : char const * filename ) {
15 300 : fd_wksp_t * wksp = fd_funk_wksp( funk );
16 300 : fd_funk_rec_t * rec_map = fd_funk_rec_map( funk, wksp );
17 300 : fd_funk_partvec_t * partvec = fd_funk_get_partvec( funk, wksp );
18 :
19 300 : FD_LOG_NOTICE(( "writing %s ...", filename ));
20 :
21 300 : int fd = open( filename, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH );
22 300 : if( fd == -1 ) {
23 0 : FD_LOG_WARNING(( "failed to open %s: %s", filename, strerror(errno) ));
24 0 : return FD_FUNK_ERR_SYS;
25 0 : }
26 :
27 300 : fd_io_buffered_ostream_t str;
28 300 : uchar wbuf[1<<17];
29 300 : fd_io_buffered_ostream_init( &str, fd, wbuf, sizeof(wbuf) );
30 300 : ulong tot = 0;
31 :
32 300 : #define ARCH_WRITE(buf, sz) \
33 154452 : do { \
34 154452 : int err = fd_io_buffered_ostream_write( &str, buf, sz); \
35 154452 : if( err ) { \
36 0 : FD_LOG_WARNING(( "failed to write %s: %s", filename, fd_io_strerror(err) )); \
37 0 : close( fd ); \
38 0 : unlink( filename ); \
39 0 : return FD_FUNK_ERR_SYS; \
40 0 : } \
41 154452 : tot += sz; \
42 154452 : } while(0)
43 :
44 300 : uint magic = FD_ARCH_MAGIC;
45 300 : ARCH_WRITE( &magic, sizeof(magic) );
46 300 : ARCH_WRITE( &partvec->num_part, sizeof(partvec->num_part) );
47 :
48 300 : for( fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter_init( rec_map );
49 29754 : !fd_funk_rec_map_iter_done( rec_map, iter );
50 29454 : iter = fd_funk_rec_map_iter_next( rec_map, iter ) ) {
51 29454 : fd_funk_rec_t * rec = fd_funk_rec_map_iter_ele( rec_map, iter );
52 29454 : ulong txn_idx = fd_funk_txn_idx( rec->txn_cidx );
53 29454 : if( fd_funk_txn_idx_is_null( txn_idx ) ) { /* This is a record from the last published transaction */
54 29454 : uchar type = (uchar)0xa5;
55 29454 : ARCH_WRITE( &type, sizeof(type) );
56 29454 : ARCH_WRITE( rec->pair.key, sizeof(rec->pair.key) );
57 29454 : ARCH_WRITE( &rec->part, sizeof(rec->part) );
58 29454 : ARCH_WRITE( &rec->val_sz, sizeof(rec->val_sz) );
59 29454 : ARCH_WRITE( &rec->flags, sizeof(rec->flags) );
60 29454 : if( rec->val_sz ) {
61 6282 : ARCH_WRITE( fd_wksp_laddr_fast( wksp, rec->val_gaddr ), rec->val_sz );
62 6282 : }
63 29454 : }
64 29454 : }
65 :
66 300 : uchar type = (uchar)0x5a;
67 300 : ARCH_WRITE( &type, sizeof(type) );
68 :
69 300 : int err = fd_io_buffered_ostream_flush( &str );
70 300 : if( err ) {
71 0 : FD_LOG_WARNING(( "failed to write %s: %s", filename, fd_io_strerror(err) ));
72 0 : close( fd );
73 0 : unlink( filename );
74 0 : return FD_FUNK_ERR_SYS;
75 0 : }
76 300 : close( fd );
77 :
78 300 : FD_LOG_NOTICE(( "wrote %lu bytes to %s", tot, filename ));
79 :
80 300 : return FD_FUNK_SUCCESS;
81 300 : }
82 :
83 : int
84 : fd_funk_unarchive( fd_funk_t * funk,
85 300 : char const * filename ) {
86 300 : fd_wksp_t * wksp = fd_funk_wksp( funk );
87 300 : fd_funk_rec_t * rec_map = fd_funk_rec_map( funk, wksp );
88 300 : ulong rec_max = funk->rec_max;
89 300 : fd_alloc_t * alloc = fd_funk_alloc( funk, wksp );
90 :
91 300 : FD_LOG_NOTICE(( "reading %s ...", filename ));
92 :
93 300 : int fd = open( filename, O_RDONLY );
94 300 : if( fd == -1 ) {
95 0 : FD_LOG_WARNING(( "failed to open %s: %s", filename, strerror(errno) ));
96 0 : return FD_FUNK_ERR_SYS;
97 0 : }
98 :
99 300 : fd_io_buffered_istream_t str;
100 300 : uchar rbuf[1<<17];
101 300 : fd_io_buffered_istream_init( &str, fd, rbuf, sizeof(rbuf) );
102 300 : ulong tot = 0;
103 :
104 300 : #define ARCH_READ(buf, sz) \
105 154452 : do { \
106 154452 : int err = fd_io_buffered_istream_read( &str, buf, sz); \
107 154452 : if( err ) { \
108 0 : FD_LOG_WARNING(( "failed to read %s: %s", filename, fd_io_strerror(err) )); \
109 0 : close( fd ); \
110 0 : return FD_FUNK_ERR_SYS; \
111 0 : } \
112 154452 : tot += sz; \
113 154452 : } while(0)
114 :
115 300 : uint magic;
116 300 : ARCH_READ( &magic, sizeof(magic) );
117 300 : if( magic != FD_ARCH_MAGIC ) {
118 0 : FD_LOG_WARNING(( "archive %s has wrong magic number", filename ));
119 0 : close( fd );
120 0 : return FD_FUNK_ERR_SYS;
121 0 : }
122 300 : uint num_part;
123 300 : ARCH_READ( &num_part, sizeof(num_part) );
124 300 : fd_funk_set_num_partitions( funk, num_part );
125 300 : fd_funk_partvec_t * partvec = fd_funk_get_partvec( funk, wksp );
126 :
127 300 : uchar type;
128 300 : fd_funk_xid_key_pair_t pair;
129 300 : fd_memset( &pair, 0, sizeof(pair) );
130 300 : uint part;
131 300 : uint val_sz;
132 300 : ulong flags;
133 :
134 29754 : for(;;) {
135 29754 : ARCH_READ( &type, sizeof(type) );
136 29754 : if( type == (uchar)0x5a ) break;
137 29454 : switch( type ) {
138 :
139 29454 : case (uchar)0xa5: {
140 29454 : ARCH_READ( pair.key, sizeof(pair.key) );
141 29454 : ARCH_READ( &part, sizeof(part) );
142 29454 : ARCH_READ( &val_sz, sizeof(val_sz) );
143 29454 : ARCH_READ( &flags, sizeof(flags) );
144 :
145 29454 : if( FD_UNLIKELY( fd_funk_rec_map_is_full( rec_map ) ) ) {
146 0 : FD_LOG_WARNING(( "archive %s has too many records to fit in given funk", filename ));
147 0 : close( fd );
148 0 : return FD_FUNK_ERR_MEM;
149 0 : }
150 :
151 29454 : fd_funk_rec_t * rec = fd_funk_rec_map_insert( rec_map, &pair );
152 29454 : ulong rec_idx = (ulong)(rec - rec_map);
153 29454 : if( FD_UNLIKELY( rec_idx>=rec_max ) ) FD_LOG_CRIT(( "memory corruption detected (bad idx)" ));
154 :
155 29454 : ulong rec_prev_idx = funk->rec_tail_idx;
156 :
157 29454 : rec->prev_idx = rec_prev_idx;
158 29454 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
159 29454 : rec->txn_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
160 29454 : rec->tag = 0U;
161 29454 : rec->flags = flags;
162 :
163 29454 : int first_born = fd_funk_rec_idx_is_null( rec_prev_idx );
164 29454 : if( first_born ) funk->rec_head_idx = rec_idx;
165 29154 : else rec_map[ rec_prev_idx ].next_idx = rec_idx;
166 :
167 29454 : funk->rec_tail_idx = rec_idx;
168 :
169 29454 : fd_funk_val_init( rec );
170 29454 : if( val_sz ) {
171 6282 : int err;
172 6282 : if( !fd_funk_val_truncate( rec, val_sz, alloc, wksp, &err ) ) {
173 0 : FD_LOG_WARNING(( "archive %s has too much data to fit in given funk wksp", filename ));
174 0 : close( fd );
175 0 : return err;
176 0 : }
177 6282 : ARCH_READ( fd_wksp_laddr_fast( wksp, rec->val_gaddr ), val_sz );
178 6282 : }
179 :
180 29454 : fd_funk_part_init( rec );
181 29454 : if( part != FD_FUNK_PART_NULL ) {
182 5034 : fd_funk_part_set_intern( partvec, rec_map, rec, part );
183 5034 : }
184 29454 : break;
185 29454 : }
186 :
187 0 : default:
188 0 : FD_LOG_WARNING(( "archive %s has unknown record type", filename ));
189 0 : close( fd );
190 0 : return FD_FUNK_ERR_SYS;
191 29454 : }
192 29454 : }
193 :
194 300 : close( fd );
195 :
196 300 : FD_LOG_NOTICE(( "read %lu bytes from %s", tot, filename ));
197 :
198 300 : return FD_FUNK_SUCCESS;
199 300 : }
|