Line data Source code
1 : #include "../../util/fd_util.h"
2 : #include "../../tango/fd_tango.h"
3 : #include "../../util/archive/fd_tar.h"
4 : #include <errno.h>
5 : #include <stdio.h>
6 : #include <stdlib.h>
7 : #include <zstd.h>
8 :
9 : #define SNAPMK_MAGIC (0xf212f209fd944ba2UL)
10 0 : #define SNAPMK_PARA_ENABLE (0x72701281047a55b8UL)
11 0 : #define SNAPMK_PARA_DISABLE (0xd629be3208ad6fb4UL)
12 :
13 : #define WKSP_TAG (1UL)
14 :
15 0 : #define COMP_TILE_MAX (63UL)
16 :
17 0 : #define ORIG_PARA_ENABLE 1 /* start of parallel section */
18 0 : #define ORIG_PARA_DISABLE 2 /* end of parallel section */
19 0 : #define ORIG_SHUTDOWN 3 /* shutdown signal */
20 :
21 : struct link {
22 : fd_frag_meta_t * mcache;
23 : uchar * dcache;
24 : ulong chunk0;
25 : ulong chunk;
26 : ulong wmark;
27 : };
28 :
29 : typedef struct link link_t;
30 :
31 : static struct {
32 : fd_wksp_t * wksp;
33 : ulong comp_cnt;
34 : ulong comp_depth;
35 : ulong comp_mtu;
36 : ulong * wr_fseqs [ COMP_TILE_MAX ];
37 : link_t tar_links[ COMP_TILE_MAX ];
38 : link_t zst_links[ COMP_TILE_MAX ];
39 : ulong * comp_fseq[ COMP_TILE_MAX ];
40 : FILE * in_file;
41 : ulong in_file_sz;
42 : FILE * out_file;
43 : ulong frame_sz;
44 : } glob;
45 :
46 : static int
47 : rd_tile_exec( int argc,
48 0 : char ** argv ) {
49 0 : (void)argc; (void)argv;
50 :
51 0 : fd_wksp_t * wksp = glob.wksp;
52 0 : FILE * in_file = glob.in_file;
53 0 : ulong in_file_sz = glob.in_file_sz;
54 0 : ulong comp_cnt = glob.comp_cnt;
55 0 : ulong comp_depth = glob.comp_depth;
56 0 : link_t * tar_links = glob.tar_links;
57 0 : ulong ** comp_fseqs = glob.comp_fseq;
58 0 : ulong out_seqs [ COMP_TILE_MAX ] = {0UL};
59 0 : fd_fctl_t * fctls [ COMP_TILE_MAX ] = {0UL};
60 0 : ulong cr_avails[ COMP_TILE_MAX ] = {0UL};
61 0 : ulong mtu = glob.comp_mtu;
62 0 : ulong frame_off = 0UL;
63 :
64 0 : ulong slow_diag = 0UL;
65 0 : uchar fctl_mem[ COMP_TILE_MAX*FD_FCTL_FOOTPRINT( 1UL ) ] __attribute__((aligned(FD_FCTL_ALIGN)));
66 0 : for( ulong i=0UL; i<comp_cnt; i++ ) {
67 0 : fd_fctl_t * fctl = fd_fctl_join( fd_fctl_new( fctl_mem+(i*FD_FCTL_FOOTPRINT( 1UL )), COMP_TILE_MAX ) );
68 0 : FD_TEST( fctl );
69 0 : FD_TEST( fd_fctl_cfg_rx_add( fctl, comp_depth, comp_fseqs[i], &slow_diag ) );
70 0 : FD_TEST( fd_fctl_cfg_done( fctl, 1UL, 0UL, 0UL, 0UL ) );
71 0 : fctls[ i ] = fctl;
72 0 : }
73 :
74 : /* Enable load-balancing once first accounts/ file was found */
75 0 : _Bool enable_lb = 0;
76 :
77 0 : FD_LOG_NOTICE(( "Reader start" ));
78 :
79 0 : ulong out_idx = 0UL;
80 0 : ulong last_stat_off = 0UL;
81 0 : long last_stat = fd_log_wallclock();
82 0 : for(;;) {
83 0 : long off = ftell( in_file );
84 0 : if( FD_LIKELY( off>=0L ) ) {
85 0 : ulong since_last_stat = (ulong)off - last_stat_off;
86 0 : if( FD_UNLIKELY( since_last_stat>=(1UL<<27) ) ) {
87 0 : long now = fd_log_wallclock();
88 0 : last_stat_off = (ulong)off;
89 0 : FD_LOG_NOTICE(( "%8.3f / %8.3f GB (%4.1f %%) %8.2f MB/s",
90 0 : (double)last_stat_off/1e9,
91 0 : (double)in_file_sz /1e9,
92 0 : 100.0 * (double)last_stat_off/(double)in_file_sz,
93 0 : ((double)since_last_stat*1e3) / (double)(now - last_stat) ));
94 0 : last_stat = now;
95 0 : }
96 0 : }
97 :
98 : /* Process TAR header */
99 :
100 0 : ulong chunk = tar_links[ out_idx ].chunk;
101 0 : void * chunk_laddr = fd_chunk_to_laddr( wksp, chunk );
102 0 : union {
103 0 : fd_tar_meta_t hdr;
104 0 : uchar buf[512];
105 0 : } * tar = chunk_laddr;
106 0 : if( FD_UNLIKELY( fread( tar, sizeof(tar->hdr), 1UL, in_file )!=1UL ) ) {
107 0 : int err = ferror( in_file );
108 0 : FD_LOG_ERR(( "fread failed (%i-%s)", err, fd_io_strerror( err ) ));
109 0 : }
110 :
111 0 : if( FD_UNLIKELY( memcmp( tar->hdr.magic, FD_TAR_MAGIC, 5UL ) ) ) {
112 0 : int not_zero = 0;
113 0 : for( ulong i=0UL; i<512UL; i++ ) not_zero |= tar->buf[i];
114 0 : if( FD_UNLIKELY( not_zero ) ) FD_LOG_ERR(( "invalid tar header magic `%s`", tar->hdr.magic ));
115 :
116 : /* EOF marker reached */
117 :
118 : /* Broadcast barrier signal, for non-zero tile also shutdown signal */
119 0 : for( ulong out2=0UL; out2<comp_cnt; out2++ ) {
120 0 : while( cr_avails[ out2 ]<3 ) {
121 0 : cr_avails[ out2 ] = fd_fctl_tx_cr_update( fctls[ out2 ], cr_avails[ out2 ], out_seqs[ out2 ] );
122 0 : }
123 0 : fd_mcache_publish(
124 0 : tar_links[ out2 ].mcache,
125 0 : comp_depth,
126 0 : out_seqs[ out2 ]++,
127 0 : 0UL,
128 0 : 0UL,
129 0 : 0UL,
130 0 : fd_frag_meta_ctl( 0UL, 0, 1, 0 ),
131 0 : 0UL,
132 0 : 0UL
133 0 : );
134 0 : fd_mcache_publish(
135 0 : tar_links[ out2 ].mcache,
136 0 : comp_depth,
137 0 : out_seqs[ out2 ]++,
138 0 : 0UL,
139 0 : 0UL,
140 0 : 0UL,
141 0 : fd_frag_meta_ctl( ORIG_PARA_DISABLE, 0, 1, 0 ),
142 0 : 0UL,
143 0 : 0UL
144 0 : );
145 0 : cr_avails[ out2 ]--;
146 0 : if( out2>0UL ) {
147 0 : fd_mcache_publish(
148 0 : tar_links[ out2 ].mcache,
149 0 : comp_depth,
150 0 : out_seqs[ out2 ]++,
151 0 : 0UL,
152 0 : 0UL,
153 0 : 0UL,
154 0 : fd_frag_meta_ctl( ORIG_SHUTDOWN, 0, 1, 0 ),
155 0 : 0UL,
156 0 : 0UL
157 0 : );
158 0 : cr_avails[ out2 ]--;
159 0 : }
160 0 : }
161 :
162 : /* Seek back since we need to retransmit EOF marker */
163 0 : if( FD_UNLIKELY( fseek( in_file, -512L, SEEK_CUR ) ) ) {
164 0 : FD_LOG_ERR(( "fseek failed (%i-%s)", errno, fd_io_strerror( errno ) ));
165 0 : }
166 :
167 0 : break;
168 0 : }
169 :
170 0 : ulong const file_sz = fd_tar_meta_get_size( &tar->hdr );
171 0 : if( FD_UNLIKELY( file_sz==ULONG_MAX ) ) FD_LOG_ERR(( "invalid tar file size" ));
172 :
173 0 : if( FD_UNLIKELY( tar->hdr.typeflag!=FD_TAR_TYPE_DIR && !fd_tar_meta_is_reg( &tar->hdr ) ) ) {
174 0 : FD_LOG_WARNING(( "invalid tar header type %d", tar->hdr.typeflag ));
175 0 : }
176 0 : ulong const align_sz = fd_ulong_align_up( file_sz, 512UL );
177 :
178 : /* See if we can switch to load-balancing */
179 :
180 0 : if( FD_UNLIKELY( !enable_lb ) ) {
181 0 : if( 0==strncmp( tar->hdr.name, "accounts/", 9UL ) ) {
182 : /* Send barrier signal */
183 0 : while( !cr_avails[ out_idx ] ) {
184 0 : cr_avails[ out_idx ] = fd_fctl_tx_cr_update( fctls[ out_idx ], cr_avails[ out_idx ], out_seqs[ out_idx ] );
185 0 : }
186 0 : fd_mcache_publish(
187 0 : tar_links[ out_idx ].mcache,
188 0 : comp_depth,
189 0 : out_seqs[ out_idx ]++,
190 0 : 0UL,
191 0 : 0UL,
192 0 : 0UL,
193 0 : fd_frag_meta_ctl( ORIG_PARA_ENABLE, 0, 1, 0 ),
194 0 : 0UL,
195 0 : 0UL
196 0 : );
197 : /* Poll for barrier receive */
198 0 : ulong * wr_fseq = glob.wr_fseqs[0];
199 0 : for(;;) {
200 0 : FD_COMPILER_MFENCE();
201 0 : ulong sig = FD_VOLATILE_CONST( wr_fseq[1] );
202 0 : FD_COMPILER_MFENCE();
203 0 : if( sig==1UL ) break;
204 0 : FD_SPIN_PAUSE();
205 0 : }
206 0 : FD_LOG_NOTICE(( "Reader enabling load-balancing" ));
207 0 : enable_lb = 1;
208 0 : }
209 0 : }
210 :
211 : /* Send data frags */
212 :
213 0 : _Bool eom = 0;
214 0 : ulong rem = align_sz;
215 0 : uchar * data = (uchar *)( tar+1 );
216 0 : ulong data_max = mtu - 512UL;
217 0 : do {
218 0 : while( !cr_avails[ out_idx ] ) {
219 0 : cr_avails[ out_idx ] = fd_fctl_tx_cr_update( fctls[ out_idx ], cr_avails[ out_idx ], out_seqs[ out_idx ] );
220 0 : }
221 :
222 0 : ulong data_sz = fd_ulong_min( rem, data_max );
223 0 : if( data_sz ) {
224 0 : if( FD_UNLIKELY( fread( data, data_sz, 1UL, in_file )!=1UL ) ) {
225 0 : FD_LOG_ERR(( "fread failed (%i-%s)", errno, fd_io_strerror( errno ) ));
226 0 : }
227 0 : }
228 0 : rem -= data_sz;
229 0 : if( !rem ) {
230 0 : frame_off += 512UL + align_sz;
231 0 : eom = frame_off>=glob.frame_sz;
232 0 : }
233 :
234 : //if( eom ) FD_LOG_NOTICE(( "finished burst out_idx=%lu out_seq=%lu sz=%lu", out_idx, out_seqs[ out_idx ], frame_off ));
235 0 : ulong frag_sz = (ulong)data+data_sz-(ulong)chunk_laddr;
236 0 : fd_mcache_publish(
237 0 : tar_links[ out_idx ].mcache,
238 0 : comp_depth,
239 0 : out_seqs[ out_idx ]++,
240 0 : frag_sz,
241 0 : chunk,
242 0 : 0UL,
243 0 : fd_frag_meta_ctl( 0UL, 0, eom, 0 ),
244 0 : 0UL,
245 0 : 0UL
246 0 : );
247 0 : cr_avails[ out_idx ]--;
248 :
249 0 : chunk = fd_dcache_compact_next( chunk, frag_sz, tar_links[ out_idx ].chunk0, tar_links[ out_idx ].wmark );
250 0 : chunk_laddr = fd_chunk_to_laddr( wksp, chunk );
251 0 : data = chunk_laddr;
252 0 : data_max = mtu;
253 0 : tar_links[ out_idx ].chunk = chunk;
254 0 : } while( rem );
255 :
256 : /* Select next index */
257 :
258 0 : if( eom && enable_lb ) {
259 0 : frame_off = 0UL;
260 0 : out_idx++;
261 0 : if( out_idx>=comp_cnt ) out_idx = 0UL;
262 0 : }
263 0 : }
264 :
265 : /* Send tail end of data to tile 0 */
266 0 : for(;;) {
267 0 : size_t read_sz = fread( fd_chunk_to_laddr( wksp, tar_links[ 0UL ].chunk ), 1UL, mtu, in_file );
268 0 : if( FD_UNLIKELY( read_sz==0UL ) ) {
269 0 : if( feof( in_file ) ) break;
270 0 : FD_LOG_ERR(( "fread failed (%i-%s)", errno, fd_io_strerror( errno ) ));
271 0 : }
272 0 : while( !cr_avails[ 0UL ] ) {
273 0 : cr_avails[ 0UL ] = fd_fctl_tx_cr_update( fctls[ 0UL ], cr_avails[ 0UL ], out_seqs[ 0UL ] );
274 0 : }
275 0 : fd_mcache_publish(
276 0 : tar_links[ 0UL ].mcache,
277 0 : comp_depth,
278 0 : out_seqs[ 0UL ]++,
279 0 : read_sz,
280 0 : tar_links[ 0UL ].chunk,
281 0 : 0UL,
282 0 : fd_frag_meta_ctl( 0UL, 0, 0, 0 ),
283 0 : 0UL,
284 0 : 0UL
285 0 : );
286 0 : cr_avails[ 0UL ]--;
287 0 : tar_links[ 0UL ].chunk = fd_dcache_compact_next( tar_links[ 0UL ].chunk, read_sz, tar_links[ 0UL ].chunk0, tar_links[ 0UL ].wmark );
288 0 : }
289 :
290 : /* Write shutdown signal */
291 0 : while( cr_avails[ 0 ]<2 ) cr_avails[ 0 ] = fd_fctl_tx_cr_update( fctls[ 0 ], cr_avails[ 0 ], out_seqs[ 0 ] );
292 0 : fd_mcache_publish(
293 0 : tar_links[ 0 ].mcache,
294 0 : comp_depth,
295 0 : out_seqs[ 0 ]++,
296 0 : 0UL,
297 0 : 0UL,
298 0 : 0UL,
299 0 : fd_frag_meta_ctl( 0UL, 0, 1, 0 ),
300 0 : 0UL,
301 0 : 0UL
302 0 : );
303 0 : fd_mcache_publish(
304 0 : tar_links[ 0 ].mcache,
305 0 : comp_depth,
306 0 : out_seqs[ 0 ]++,
307 0 : 0UL,
308 0 : tar_links[ 0 ].chunk,
309 0 : 0UL,
310 0 : fd_frag_meta_ctl( ORIG_SHUTDOWN, 0, 1, 0 ),
311 0 : 0UL,
312 0 : 0UL
313 0 : );
314 0 : cr_avails[ 0 ]--;
315 :
316 0 : FD_LOG_NOTICE(( "Reader done" ));
317 :
318 0 : return 0;
319 0 : }
320 :
321 : static int
322 : comp_tile_exec( int argc,
323 0 : char ** argv ) {
324 0 : (void)argc; (void)argv;
325 :
326 0 : uint rng_seed = (uint)fd_ulong_hash( (uint)fd_tickcount()+fd_tile_idx() );
327 0 : fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, rng_seed, 0UL ) );
328 :
329 0 : fd_wksp_t * wksp = glob.wksp;
330 0 : ulong comp_idx = fd_tile_idx()-2UL; FD_TEST( fd_tile_idx()>=2UL );
331 0 : ulong depth = glob.comp_depth;
332 0 : fd_frag_meta_t * in_mcache = glob.tar_links[ comp_idx ].mcache;
333 0 : ulong in_seq = 0UL;
334 0 : ulong * fseq = glob.comp_fseq[ comp_idx ];
335 0 : fd_frag_meta_t * out_mcache = glob.zst_links[ comp_idx ].mcache;
336 0 : uchar * out_dcache = glob.zst_links[ comp_idx ].dcache;
337 0 : ulong out_chunk0 = glob.zst_links[ comp_idx ].chunk0;
338 0 : ulong out_seq = 0UL;
339 :
340 0 : uchar fctl_mem[ FD_FCTL_FOOTPRINT( 1UL ) ] __attribute__((aligned(FD_FCTL_ALIGN)));
341 0 : fd_fctl_t * fctl = fd_fctl_join( fd_fctl_new( fctl_mem, 1UL ) );
342 0 : FD_TEST( fctl );
343 0 : ulong slow_diag;
344 0 : FD_TEST( fd_fctl_cfg_rx_add( fctl, depth, glob.wr_fseqs[ comp_idx ], &slow_diag ) );
345 0 : FD_TEST( fd_fctl_cfg_done( fctl, 1UL, 0UL, 0UL, 0UL ) );
346 :
347 0 : ulong async_min = 1UL<<7;
348 0 : ulong async_rem = 1UL; /* Do housekeeping on first iteration */
349 0 : ulong cr_avail = 0UL;
350 :
351 0 : ZSTD_CStream * zst = ZSTD_createCStream();
352 0 : if( FD_UNLIKELY( !zst ) ) FD_LOG_ERR(( "ZSTD_createCStream() failed" ));
353 0 : ZSTD_initCStream( zst, 3 );
354 :
355 0 : ulong out_chunk = out_chunk0;
356 0 : ulong out_mtu = ZSTD_COMPRESSBOUND( glob.comp_mtu );
357 0 : ZSTD_outBuffer zst_out = {
358 0 : .dst = fd_chunk_to_laddr( wksp, out_chunk ),
359 0 : .size = out_mtu,
360 0 : .pos = 0UL
361 0 : };
362 :
363 0 : for(;;) {
364 0 : fd_frag_meta_t const * mline;
365 0 : ulong seq_found;
366 0 : long diff;
367 :
368 0 : ulong in_sig;
369 0 : ulong in_chunk;
370 0 : ulong in_sz;
371 0 : ulong in_ctl;
372 0 : ulong in_tsorig;
373 0 : ulong in_tspub;
374 0 : FD_MCACHE_WAIT_REG( in_sig, in_chunk, in_sz, in_ctl, in_tsorig, in_tspub, mline, seq_found, diff, async_rem, in_mcache, depth, in_seq );
375 0 : (void)mline; (void)seq_found; (void)in_sz; (void)in_tsorig; (void)in_tspub;
376 :
377 0 : if( FD_UNLIKELY( !async_rem ) ) {
378 0 : fd_fctl_rx_cr_return( fseq, in_seq );
379 0 : cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
380 0 : async_rem = fd_tempo_async_reload( rng, async_min );
381 0 : continue;
382 0 : }
383 :
384 0 : if( FD_UNLIKELY( diff>0 ) ) {
385 0 : FD_LOG_ERR(( "Overrun while polling" ));
386 0 : }
387 0 : FD_TEST( diff==0 );
388 :
389 0 : ulong in_orig = fd_frag_meta_ctl_orig( in_ctl );
390 0 : if( FD_UNLIKELY( in_orig ) ) {
391 0 : FD_TEST( zst_out.pos==0 );
392 :
393 : /* Forward control signal */
394 0 : while( !cr_avail ) cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
395 0 : fd_mcache_publish( out_mcache, depth, out_seq++, 0UL, 0UL, 0UL, fd_frag_meta_ctl( in_orig, 0, 0, 0 ), 0UL, 0UL );
396 0 : in_seq = fd_seq_inc( in_seq, 1UL );
397 :
398 0 : if( in_orig==ORIG_SHUTDOWN ) break;
399 0 : continue;
400 0 : }
401 :
402 0 : ZSTD_inBuffer zst_in = {
403 0 : .src = fd_chunk_to_laddr( wksp, in_chunk ),
404 0 : .size = in_sig,
405 0 : .pos = 0UL
406 0 : };
407 0 : for(;;) {
408 0 : size_t const ret = ZSTD_compressStream( zst, &zst_out, &zst_in );
409 0 : if( FD_UNLIKELY( ZSTD_isError( ret ) ) ) {
410 0 : FD_LOG_ERR(( "ZSTD_compressStream() failed: %s", ZSTD_getErrorName( ret ) ));
411 0 : }
412 0 : if( FD_LIKELY( zst_in.pos==zst_in.size ) ) break;
413 :
414 : /* Flush */
415 0 : ulong chunk_sz = zst_out.pos;
416 0 : while( !cr_avail ) cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
417 0 : fd_mcache_publish( out_mcache, depth, out_seq++, chunk_sz, out_chunk, 0UL, fd_frag_meta_ctl( 0UL, 0, 0, 0 ), 0UL, 0UL );
418 0 : out_chunk = fd_dcache_compact_next( out_chunk, chunk_sz, out_chunk0, glob.zst_links[ comp_idx ].wmark );
419 0 : cr_avail--;
420 0 : zst_out.dst = fd_chunk_to_laddr( wksp, out_chunk );
421 0 : zst_out.pos = 0UL;
422 0 : }
423 :
424 0 : if( fd_frag_meta_ctl_eom( in_ctl ) ) {
425 0 : for(;;) {
426 0 : ulong ret = ZSTD_endStream( zst, &zst_out );
427 0 : if( FD_UNLIKELY( ZSTD_isError( ret ) ) ) {
428 0 : FD_LOG_ERR(( "ZSTD_endStream() failed: %s", ZSTD_getErrorName( ret ) ));
429 0 : }
430 :
431 : /* Flush */
432 0 : int eom = !ret;
433 0 : ulong chunk_sz = zst_out.pos;
434 0 : while( !cr_avail ) cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
435 0 : fd_mcache_publish( out_mcache, depth, out_seq++, chunk_sz, out_chunk, 0UL, fd_frag_meta_ctl( 0UL, 0, eom, 0 ), 0UL, 0UL );
436 0 : out_chunk = fd_dcache_compact_next( out_chunk, chunk_sz, out_chunk0, glob.zst_links[ comp_idx ].wmark );
437 0 : cr_avail--;
438 0 : zst_out.dst = fd_chunk_to_laddr( wksp, out_chunk );
439 0 : zst_out.pos = 0UL;
440 : // if( eom ) FD_LOG_NOTICE(( "finished burst comp_idx=%lu in_seq=%lu out_seq=%lu", comp_idx, in_seq, out_seq-1UL ));
441 0 : if( eom ) break;
442 0 : }
443 0 : }
444 :
445 0 : in_seq = fd_seq_inc( in_seq, 1UL );
446 0 : }
447 :
448 0 : if( zst_out.pos < zst_out.size ) {
449 : /* Flush */
450 0 : ulong chunk_sz = zst_out.pos;
451 0 : while( !cr_avail ) cr_avail = fd_fctl_tx_cr_update( fctl, cr_avail, out_seq );
452 0 : fd_mcache_publish( out_mcache, depth, out_seq++, chunk_sz, out_chunk, 0UL, fd_frag_meta_ctl( 0UL, 0, 1, 0 ), 0UL, 0UL );
453 0 : out_chunk = fd_dcache_compact_next( out_chunk, chunk_sz, out_chunk0, glob.zst_links[ comp_idx ].wmark );
454 0 : cr_avail--;
455 0 : zst_out.dst = fd_chunk_to_laddr( out_dcache, out_chunk );
456 0 : zst_out.pos = 0UL;
457 0 : }
458 :
459 0 : fd_mcache_seq_update( fd_mcache_seq_laddr( out_mcache ), out_seq );
460 :
461 0 : fd_rng_delete( fd_rng_leave( rng ) );
462 :
463 0 : return 0;
464 0 : }
465 :
466 : static int
467 : wr_tile_exec( int argc,
468 0 : char ** argv ) {
469 0 : (void)argc; (void)argv;
470 :
471 0 : uint rng_seed = (uint)fd_ulong_hash( (uint)fd_tickcount()+fd_tile_idx() );
472 0 : fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, rng_seed, 0UL ) );
473 :
474 0 : fd_wksp_t * wksp = glob.wksp;
475 0 : ulong comp_cnt = glob.comp_cnt;
476 0 : ulong depth = glob.comp_depth;
477 0 : FILE * out_file = glob.out_file;
478 0 : ulong ** fseqs = glob.wr_fseqs;
479 0 : ulong in_seqs[ COMP_TILE_MAX ] = {0UL};
480 :
481 0 : ulong active_set = (1UL<<comp_cnt)-1UL;
482 0 : ulong drain_set = (1UL<<comp_cnt)-1UL;
483 0 : ulong dirty_set = 0UL;
484 :
485 0 : uchar fctl_mem[ FD_FCTL_FOOTPRINT( COMP_TILE_MAX ) ] __attribute__((aligned(FD_FCTL_ALIGN)));
486 0 : fd_fctl_t * fctl = fd_fctl_join( fd_fctl_new( fctl_mem, COMP_TILE_MAX ) );
487 0 : FD_TEST( fctl );
488 0 : ulong slow_diag;
489 0 : for( ulong i=0UL; i<comp_cnt; i++ ) {
490 0 : FD_TEST( fd_fctl_cfg_rx_add( fctl, glob.comp_depth, glob.comp_fseq[i], &slow_diag ) );
491 0 : }
492 0 : FD_TEST( fd_fctl_cfg_done( fctl, 1UL, 0UL, 0UL, 0UL ) );
493 :
494 0 : ulong async_min = 1UL<<7;
495 0 : ulong async_rem = 1UL; /* Do housekeeping on first iteration */
496 :
497 0 : ulong in_idx = 0UL;
498 :
499 0 : _Bool sent_enable = 0;
500 :
501 0 : FD_LOG_NOTICE(( "Writer running" ));
502 :
503 0 : for(;;) {
504 :
505 0 : fd_frag_meta_t * in_mcache = glob.zst_links[ in_idx ].mcache;
506 :
507 0 : fd_frag_meta_t const * mline;
508 0 : ulong seq_found;
509 0 : long diff;
510 :
511 0 : ulong in_sig;
512 0 : ulong in_chunk;
513 0 : ulong in_sz;
514 0 : ulong in_ctl;
515 0 : ulong in_tsorig;
516 0 : ulong in_tspub;
517 0 : FD_MCACHE_WAIT_REG( in_sig, in_chunk, in_sz, in_ctl, in_tsorig, in_tspub, mline, seq_found, diff, async_rem, in_mcache, depth, in_seqs[ in_idx ] );
518 0 : (void)mline; (void)seq_found; (void)in_sz; (void)in_tsorig; (void)in_tspub;
519 :
520 0 : if( FD_UNLIKELY( !async_rem ) ) {
521 0 : if( FD_UNLIKELY( !active_set ) ) break;
522 :
523 0 : if( !fd_ulong_extract_bit( dirty_set, (int)in_idx ) ) {
524 0 : in_idx++;
525 0 : if( in_idx>=comp_cnt ) in_idx = 0UL;
526 0 : }
527 :
528 0 : for( ulong i=0UL; i<comp_cnt; i++ ) {
529 0 : fd_fctl_rx_cr_return( fseqs[i], in_seqs[i] );
530 0 : }
531 0 : async_rem = fd_tempo_async_reload( rng, async_min );
532 0 : continue;
533 0 : }
534 :
535 0 : if( FD_UNLIKELY( diff>0 ) ) {
536 0 : FD_LOG_ERR(( "Overrun while polling" ));
537 0 : }
538 0 : FD_TEST( diff==0 );
539 :
540 0 : ulong in_orig = fd_frag_meta_ctl_orig( in_ctl );
541 0 : if( FD_UNLIKELY( in_orig ) ) {
542 0 : if( in_orig==ORIG_PARA_ENABLE && !sent_enable ) {
543 0 : struct __attribute__((packed)) {
544 0 : uint magic;
545 0 : uint frame_sz;
546 0 : ulong user;
547 0 : } header = {
548 0 : .magic = 0x184D2A50U,
549 0 : .frame_sz = 8U,
550 0 : .user = SNAPMK_PARA_ENABLE
551 0 : };
552 0 : if( FD_UNLIKELY( fwrite( &header, sizeof(header), 1UL, out_file )!=1UL ) ) {
553 0 : FD_LOG_ERR(( "fwrite failed (%i-%s)", errno, fd_io_strerror( errno ) ));
554 0 : }
555 0 : FD_VOLATILE( fseqs[ in_idx ][1] ) = 1UL;
556 0 : sent_enable = 1;
557 0 : } else if( in_orig==ORIG_PARA_DISABLE ) {
558 0 : drain_set = fd_ulong_clear_bit( drain_set, (int)in_idx );
559 0 : if( drain_set ) continue;
560 :
561 0 : struct __attribute__((packed)) {
562 0 : uint magic;
563 0 : uint frame_sz;
564 0 : ulong user;
565 0 : } header = {
566 0 : .magic = 0x184D2A50U,
567 0 : .frame_sz = 8U,
568 0 : .user = SNAPMK_PARA_DISABLE
569 0 : };
570 0 : if( FD_UNLIKELY( fwrite( &header, sizeof(header), 1UL, out_file )!=1UL ) ) {
571 0 : FD_LOG_ERR(( "fwrite failed (%i-%s)", errno, fd_io_strerror( errno ) ));
572 0 : }
573 :
574 0 : } else if( in_orig==ORIG_SHUTDOWN ) {
575 0 : FD_TEST( !fd_ulong_extract_bit( dirty_set, (int)in_idx ) );
576 0 : active_set = fd_ulong_clear_bit( active_set, (int)in_idx );
577 0 : }
578 0 : in_seqs[ in_idx ] = fd_seq_inc( in_seqs[ in_idx ], 1UL );
579 0 : if( in_orig==ORIG_SHUTDOWN || in_orig==ORIG_PARA_DISABLE ) {
580 0 : in_idx++;
581 0 : if( in_idx>=comp_cnt ) in_idx = 0UL;
582 0 : }
583 0 : continue;
584 0 : }
585 :
586 0 : if( in_sig ) {
587 0 : void const * in_frag = fd_chunk_to_laddr( wksp, in_chunk );
588 0 : ulong wr_sz = fwrite( in_frag, in_sig, 1UL, out_file );
589 0 : if( FD_UNLIKELY( wr_sz!=1UL ) ) {
590 0 : FD_LOG_ERR(( "fwrite failed (%i-%s)", errno, fd_io_strerror( errno ) ));
591 0 : }
592 0 : }
593 :
594 0 : in_seqs[ in_idx ] = fd_seq_inc( in_seqs[ in_idx ], 1UL );
595 :
596 0 : int eom = fd_frag_meta_ctl_eom( in_ctl );
597 0 : if( eom ) {
598 : // FD_LOG_NOTICE(( "finished write comp_idx=%lu in_seq=%lu", in_idx, in_seqs[ in_idx ]-1UL ));
599 0 : dirty_set = fd_ulong_clear_bit( dirty_set, (int)in_idx );
600 0 : in_idx++;
601 0 : if( in_idx>=comp_cnt ) in_idx = 0UL;
602 : // FD_LOG_NOTICE(( "switching to comp_idx=%lu", in_idx ));
603 0 : } else {
604 0 : dirty_set = fd_ulong_set_bit( dirty_set, (int)in_idx );
605 0 : }
606 0 : }
607 :
608 0 : FD_LOG_NOTICE(( "Writer done" ));
609 :
610 0 : return 0;
611 0 : }
612 :
613 : __attribute__((noreturn)) static void
614 0 : usage( int rc ) {
615 : fputs( "Usage: fd_snapmk_para --in FILE.tar --out FILE.tar.zst\n", stderr );
616 0 : exit( rc );
617 0 : }
618 :
619 : int
620 : main( int argc,
621 : char ** argv ) {
622 : if( fd_env_strip_cmdline_contains( &argc, &argv, "--help" ) ) {
623 : fputs( "fd_snapmk creates a backwards-compatible Firedancer-optimized Solana snapshot\n", stderr );
624 : usage( EXIT_SUCCESS );
625 : }
626 :
627 : fd_boot( &argc, &argv );
628 :
629 : char const * _page_sz = fd_env_strip_cmdline_cstr ( &argc, &argv, "--page-sz", NULL, "gigantic" );
630 : ulong page_cnt = fd_env_strip_cmdline_ulong ( &argc, &argv, "--page-cnt", NULL, 1UL );
631 : ulong near_cpu = fd_env_strip_cmdline_ulong ( &argc, &argv, "--near-cpu", NULL, fd_log_cpu_id() );
632 : char const * in_path = fd_env_strip_cmdline_cstr ( &argc, &argv, "--in", NULL, NULL );
633 : char const * out_path = fd_env_strip_cmdline_cstr ( &argc, &argv, "--out", NULL, NULL );
634 : ulong frame_sz = fd_env_strip_cmdline_ulong( &argc, &argv, "--frame-sz", NULL, 33554432UL );
635 : ulong depth = fd_env_strip_cmdline_ulong( &argc, &argv, "--depth", NULL, 32UL );
636 : ulong mtu = fd_env_strip_cmdline_ulong( &argc, &argv, "--mtu", NULL, 1UL<<20 );
637 :
638 : if( FD_UNLIKELY( !in_path ) ) usage( EXIT_FAILURE );
639 : if( !out_path ) {
640 : ulong in_len = strlen( in_path );
641 : if( FD_UNLIKELY( in_len+strlen( ".zst" )+1UL>PATH_MAX ) ) FD_LOG_ERR(( "--in argument is too long" ));
642 : static char output_path[ PATH_MAX ];
643 : fd_cstr_fini( fd_cstr_append_cstr( fd_cstr_append_text( fd_cstr_init( output_path ), in_path, in_len ), ".zst" ) );
644 : out_path = output_path;
645 : }
646 :
647 : if( FD_UNLIKELY( fd_tile_cnt()<3 ) ) FD_LOG_ERR(( "This program requires at least 3 tiles" ));
648 : ulong comp_cnt = fd_tile_cnt() - 2UL;
649 : comp_cnt = fd_ulong_min( comp_cnt, COMP_TILE_MAX );
650 :
651 : FILE * in_file = fopen( in_path, "rb" );
652 : if( FD_UNLIKELY( !in_file ) ) {
653 : FD_LOG_ERR(( "fopen(%s,\"rb\") failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
654 : }
655 : ulong in_file_sz;
656 : if( FD_UNLIKELY( fseek( in_file, 0L, SEEK_END )!=0 ) ) {
657 : FD_LOG_ERR(( "fseek(%s,0,SEEK_END) failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
658 : }
659 : long ftell_res = ftell( in_file );
660 : if( FD_UNLIKELY( ftell_res<0L ) ) {
661 : FD_LOG_ERR(( "ftell(%s) failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
662 : }
663 : in_file_sz = (ulong)ftell_res;
664 : if( FD_UNLIKELY( fseek( in_file, 0L, SEEK_SET )!=0 ) ) {
665 : FD_LOG_ERR(( "fseek(%s,0,SEEK_SET) failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
666 : }
667 : glob.in_file = in_file;
668 : glob.in_file_sz = in_file_sz;
669 :
670 : FILE * out_file = fopen( out_path, "wb" );
671 : if( FD_UNLIKELY( !out_file ) ) {
672 : FD_LOG_ERR(( "fopen(%s,\"wb\") failed (%i-%s)", out_path, errno, fd_io_strerror( errno ) ));
673 : }
674 : glob.out_file = out_file;
675 :
676 : struct __attribute__((packed)) {
677 : uint magic;
678 : uint frame_sz;
679 : ulong user;
680 : } header = {
681 : .magic = 0x184D2A50U,
682 : .frame_sz = 8U,
683 : .user = SNAPMK_MAGIC
684 : };
685 : if( FD_UNLIKELY( fwrite( &header, sizeof(header), 1UL, out_file )!=1UL ) ) {
686 : FD_LOG_ERR(( "fwrite header to %s failed (%i-%s)", out_path, errno, fd_io_strerror( errno ) ));
687 : }
688 :
689 : FD_LOG_NOTICE(( "--wksp not specified, using an anonymous local workspace, --page-sz %s, --page-cnt %lu, --near-cpu %lu",
690 : _page_sz, page_cnt, near_cpu ));
691 : fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz( _page_sz ), page_cnt, near_cpu, "wksp", 0UL );
692 :
693 : if( FD_UNLIKELY( !wksp ) ) FD_LOG_ERR(( "Unable to attach to wksp" ));
694 : glob.wksp = wksp;
695 : glob.comp_cnt = comp_cnt;
696 : glob.comp_depth = depth;
697 : glob.frame_sz = frame_sz;
698 : glob.comp_mtu = mtu;
699 :
700 : ulong tar_dcache_sz = fd_dcache_req_data_sz( mtu, depth, 1UL, 1 );
701 : ulong zst_dcache_sz = fd_dcache_req_data_sz( ZSTD_COMPRESSBOUND( mtu ), depth, 1UL, 1 );
702 :
703 : for( ulong i=0UL; i<comp_cnt; i++ ) {
704 : link_t * tar = &glob.tar_links[i];
705 : tar->mcache = fd_mcache_join( fd_mcache_new( fd_wksp_alloc_laddr( wksp, fd_mcache_align(), fd_mcache_footprint( depth, 0UL ), WKSP_TAG ), depth, 0UL, 0UL ) );
706 : FD_TEST( tar->mcache );
707 : tar->dcache = fd_dcache_join( fd_dcache_new( fd_wksp_alloc_laddr( wksp, fd_dcache_align(), fd_dcache_footprint( tar_dcache_sz, 0UL ), WKSP_TAG ), tar_dcache_sz, 0UL ) );
708 : FD_TEST( tar->dcache );
709 : tar->chunk0 = fd_dcache_compact_chunk0( wksp, tar->dcache );
710 : tar->chunk = tar->chunk0;
711 : tar->wmark = fd_dcache_compact_wmark ( wksp, tar->dcache, mtu );
712 :
713 : link_t * zst = &glob.zst_links[i];
714 : zst->mcache = fd_mcache_join( fd_mcache_new( fd_wksp_alloc_laddr( wksp, fd_mcache_align(), fd_mcache_footprint( depth, 0UL ), WKSP_TAG ), depth, 0UL, 0UL ) );
715 : FD_TEST( zst->mcache );
716 : zst->dcache = fd_dcache_join( fd_dcache_new( fd_wksp_alloc_laddr( wksp, fd_dcache_align(), fd_dcache_footprint( zst_dcache_sz, 0UL ), WKSP_TAG ), zst_dcache_sz, 0UL ) );
717 : FD_TEST( zst->dcache );
718 : zst->chunk0 = fd_dcache_compact_chunk0( wksp, zst->dcache );
719 : zst->chunk = zst->chunk0;
720 : zst->wmark = fd_dcache_compact_wmark ( wksp, zst->dcache, mtu );
721 :
722 : glob.wr_fseqs[i] = fd_fseq_join( fd_fseq_new( fd_wksp_alloc_laddr( wksp, fd_fseq_align(), fd_fseq_footprint(), WKSP_TAG ), 0UL ) );
723 : FD_TEST( glob.wr_fseqs[i] );
724 :
725 : glob.comp_fseq[i] = fd_fseq_join( fd_fseq_new( fd_wksp_alloc_laddr( wksp, fd_fseq_align(), fd_fseq_footprint(), WKSP_TAG ), 0UL ) );
726 : FD_TEST( glob.comp_fseq[i] );
727 : }
728 :
729 : fd_tile_exec_t * wr_exec = fd_tile_exec_new( 1UL, wr_tile_exec, 0, NULL );
730 : FD_TEST( wr_exec );
731 :
732 : fd_tile_exec_t * comp_exec[ COMP_TILE_MAX ];
733 : for( ulong i=0UL; i<comp_cnt; i++ ) {
734 : comp_exec[ i ] = fd_tile_exec_new( 2UL+i, comp_tile_exec, 0, NULL );
735 : }
736 :
737 : long dt = -fd_log_wallclock();
738 : rd_tile_exec( 0, NULL );
739 :
740 : for( ulong i=0UL; i<comp_cnt; i++ ) {
741 : FD_TEST( !fd_tile_exec_delete( comp_exec[ i ], NULL ) );
742 : }
743 : FD_TEST( !fd_tile_exec_delete( wr_exec, NULL ) );
744 :
745 : if( FD_UNLIKELY( 0!=fclose( in_file ) ) ) {
746 : FD_LOG_ERR(( "fclose(%s) failed (%i-%s)", in_path, errno, fd_io_strerror( errno ) ));
747 : }
748 :
749 : ftell_res = ftell( out_file );
750 : if( FD_UNLIKELY( ftell_res<0L ) ) {
751 : FD_LOG_ERR(( "ftell(%s) failed (%i-%s)", out_path, errno, fd_io_strerror( errno ) ));
752 : }
753 : ulong out_file_sz = (ulong)ftell_res;
754 :
755 : if( FD_UNLIKELY( 0!=fclose( out_file ) ) ) {
756 : FD_LOG_ERR(( "fclose(%s) failed (%i-%s)", out_path, errno, fd_io_strerror( errno ) ));
757 : }
758 : dt += fd_log_wallclock();
759 : FD_LOG_NOTICE(( "Compressed %.3f GiB to %.3f GiB in %.3f s (%.3f GB/s, ratio %.2f)",
760 : (double)in_file_sz/(double)(1UL<<30),
761 : (double)out_file_sz/(double)(1UL<<30),
762 : (double)dt/1e9,
763 : ((double)in_file_sz)/((double)dt),
764 : (double)in_file_sz/(double)out_file_sz ));
765 :
766 : fd_wksp_delete_anonymous( wksp );
767 :
768 : fd_halt();
769 : return 0;
770 : }
|