Line data Source code
1 : #include "fd_checkpt.h"
2 :
/* fd_checkpt_frame_style_is_supported returns 1 if frame_style is
   FD_CHECKPT_FRAME_STYLE_RAW or, when compiled with FD_HAS_LZ4,
   FD_CHECKPT_FRAME_STYLE_LZ4.  Returns 0 for any other value. */
3 : int
4 342 : fd_checkpt_frame_style_is_supported( int frame_style ) {
5 342 : int supported;
6 342 : supported = (frame_style==FD_CHECKPT_FRAME_STYLE_RAW);
7 342 : # if FD_HAS_LZ4
8 342 : supported |= (frame_style==FD_CHECKPT_FRAME_STYLE_LZ4);
9 342 : # endif
10 342 : return supported;
11 342 : }
12 :
/* fd_checkpt_strerror converts a FD_CHECKPT_SUCCESS / FD_CHECKPT_ERR_*
   code into a human readable cstr.  Any code not handled by the
   switch below maps to "unknown".  The return value is always a
   pointer to a string literal (never NULL). */
13 : char const *
14 18 : fd_checkpt_strerror( int err ) {
15 18 : switch( err ) {
16 3 : case FD_CHECKPT_SUCCESS: return "success";
17 3 : case FD_CHECKPT_ERR_INVAL: return "bad input args";
18 3 : case FD_CHECKPT_ERR_UNSUP: return "unsupported on this target";
19 3 : case FD_CHECKPT_ERR_IO: return "io error";
20 3 : case FD_CHECKPT_ERR_COMP: return "compression error";
21 3 : default: break;
22 18 : }
23 3 : return "unknown";
24 18 : }
25 :
26 : #if FD_HAS_LZ4
27 : #include <lz4.h>
28 :
29 : /* fd_checkpt_private_lz4 compresses the ubuf_usz byte size memory
30 : region pointed to by ubuf into the cbuf_max memory region pointed to
31 : by cbuf using the given lz4 compressor. Assumes lz4, ubuf and cbuf
32 : are valid. On success, returns the compressed size, including the
33 : 3 byte size header written at the start of cbuf (will be in
34 : [4,cbuf_max]). The ubuf passed to this should not be modified
35 : until the given lz4 stream is reset / closed or there has been an
36 : additional 64 KiB passed to the stream. On failure, returns 0 (logs
37 : details) and retains no interest in ubuf. In either case, this retains
38 : no interest in cbuf on return.
39 : _gbuf, gbuf_sz, gbuf_thresh, _gbuf_cursor specify the small buf
40 : gather ring state. It is detailed below. */
41 :
42 : static ulong
43 : fd_checkpt_private_lz4( LZ4_stream_t * lz4,
44 : void const * _ubuf,
45 : ulong ubuf_usz,
46 : void * _cbuf,
47 : ulong cbuf_max,
48 : void * _gbuf,
49 : ulong gbuf_sz,
50 : ulong gbuf_thresh,
51 1729047 : ulong * _gbuf_cursor ) {
52 1729047 : char * cbuf = (char *) _cbuf;
53 1729047 : char const * ubuf = (char const *)_ubuf;
54 :
55 : /* Verify ubuf_usz is in [1,LZ4_MAX_INPUT_SIZE] and cbuf_max is large
56 : enough to store a header and a non-trivial compressed body. */
57 :
58 1729047 : if( FD_UNLIKELY( !((1UL<=ubuf_usz) & (ubuf_usz<=(ulong)LZ4_MAX_INPUT_SIZE)) ) ) {
59 0 : FD_LOG_WARNING(( "bad ubuf_usz" ));
60 0 : return 0UL;
61 0 : }
62 :
63 1729047 : if( FD_UNLIKELY( cbuf_max<4UL ) ) {
64 0 : FD_LOG_WARNING(( "not enough room to compress" ));
65 0 : return 0UL;
66 0 : }
67 :
68 : /* Small ubuf gather optimization. Though the LZ4 streaming API looks
69 : like it is designed for scatter/gather operation, the
70 : implementation under the hood is heavily optimized for the case
71 : when incoming data buffers are stored in a ring buffer (basically,
72 : the compression dictionary has a size of the most recent 64 KiB of
73 : _contiguous_ _in_ _memory_ buffers passed to it ... see
74 : lz4-1.9.4@lz4/lib/lz4.c:2636-2665 for an example).
75 :
76 : When a buffer >> 64 KiB is checkpointed, it will be compressed as
77 : CHUNK_USZ sequential chunks contiguous in memory. So outside of
78 : minor startup effects (where the initial dictionary might not be as
79 : large as it could have been), this case is optimal.
80 :
81 : But when lots of disjoint tiny buffers << 64 KiB are checkpointed,
82 : LZ4 is constantly resetting its dictionary to only use the most
83 : recently previously compressed (tiny) buffer. This case is
84 : suboptimal.
85 :
86 : At the same time, we don't want to use a ring buffer because that
87 : would imply an extra copy when compressing large data. This is a
88 : complete waste because that case was already optimal. And this is
89 : the most important case for high performance.
90 :
91 : Below, if the incoming buffer to compress is large enough
92 : (>thresh), we compress it in place as it will be optimal as before.
93 :
94 : If not, we first copy it into a gather buffer and have LZ4 compress
95 : out of the gather buffer location. Then, when compressing lots of
96 : tiny disjoint buffers, it will appear to LZ4 as though they
97 : were contiguous in memory and LZ4 will handle that optimally too.
98 :
99 : Then our dictionary size is optimal in both asymptotic regimes and
100 : we are still zero copy in the important case of compressing large
101 : data. The dictionary will also be reasonable when toggling
102 : frequently between asymptotic regimes, as often happens in
103 : checkpointing (small metadata checkpts/large checkpt/small metadata
104 : checkpts/large data checkpt/...).
105 :
106 : This is also necessary to satisfy fd_checkpt_data's and
107 : fd_checkpt_meta's API guarantees. The lz4 streaming API requires
108 : the most recent 64KiB of uncompressed bytes to be unmodified and in
109 : the same place when called. If FD_CHECKPT_META_MAX<=thresh<=64KiB,
110 : the copying into the gather buffer here and out of the scatter
111 : buffer on restore means the unmodified-in-place part of the
112 : requirement can be satisfied even if the user passes a temporary
113 : buffer and immediately modifies/frees it on return. That is, the
114 : checkpt/restore will be prompt and retain no interest in buf on
115 : return.
116 :
117 : We also have to make the gather/scatter buffers large enough to
118 : satisfy the most-recent-64KiB part of the requirement. Suppose we
119 : have only been compressing small buffers and we are trying to
120 : compress a thresh byte buffer when only thresh-1 bytes of gather
121 : buffer space remains. Since we wrap at buffer granularity, we will
122 : need to put thresh bytes at the head of the buffer. To ensure this
123 : doesn't clobber any of the 64 KiB previously compressed bytes, we
124 : need a gather buffer at least:
125 :
126 : thresh + 64KiB + thresh-1 = 2 thresh + 64 KiB - 1
127 :
128 : in size. Larger is fine. Smaller will violate this part of the
129 : requirement.
130 :
131 : We do the corresponding in the restore and the restore
132 : configuration must match our checkpt configuration exactly in order
133 : to keep the dictionaries on both sides synchronized.
134 :
135 : TL;DR We store small buffers into a gather ring at buffer
136 : granularity for better compression and compress large buffers in
137 : place for extra performance due to the details of how LZ4 stream
138 : APIs are implemented. We also do this to support immediate use and
139 : reuse of the metadata checkpt/restore buffers. */
140 :
141 1729047 : int is_small = ubuf_usz<=gbuf_thresh;
142 1729047 : if( is_small ) { /* app dependent branch prob */
143 1729047 : ulong gbuf_cursor = *_gbuf_cursor;
144 1729047 : if( (gbuf_sz-gbuf_cursor)<ubuf_usz ) gbuf_cursor = 0UL; /* wrap to the ring head when ubuf won't fit in what remains, cmov */
145 1729047 : ubuf = (char *)_gbuf + gbuf_cursor; /* compress out of the gather ring instead of the caller's buffer */
146 1729047 : *_gbuf_cursor = gbuf_cursor + ubuf_usz;
147 1729047 : memcpy( (char *)ubuf, _ubuf, ubuf_usz );
148 1729047 : }
149 :
150 : /* Compress ubuf into cbuf, leaving space for the header. Compression
151 : will fail if there is no room to encode the resulting compressed
152 : size into the header as we clamp the capacity to 2^24-1. */
153 :
154 1729047 : ulong ubuf_csz_max = fd_ulong_min( cbuf_max-3UL, (1UL<<24)-1UL ); /* In [1,2^24) */
155 :
156 1729047 : int _ubuf_csz = LZ4_compress_fast_continue( lz4, ubuf, cbuf+3UL, (int)ubuf_usz, (int)ubuf_csz_max, 1 /* default */ );
157 1729047 : if( FD_UNLIKELY( _ubuf_csz<=0 ) ) {
158 3 : FD_LOG_WARNING(( "LZ4_compress_fast_continue error (%i)", _ubuf_csz ));
159 3 : return 0UL;
160 3 : }
161 :
162 1729044 : ulong ubuf_csz = (ulong)_ubuf_csz;
163 1729044 : if( FD_UNLIKELY( ubuf_csz>ubuf_csz_max ) ) {
164 0 : FD_LOG_WARNING(( "unexpected compressed size" ));
165 0 : return 0UL;
166 0 : }
167 :
168 : /* Write compressed size we obtained into the header as a 24-bit
169 : little endian unsigned integer. The need to do this is a
170 : limitation of how the recent LZ4 APIs (>=1.9) work. */
171 :
172 1729044 : cbuf[0] = (char)( ubuf_csz & 255UL);
173 1729044 : cbuf[1] = (char)((ubuf_csz>> 8) & 255UL);
174 1729044 : cbuf[2] = (char)((ubuf_csz>>16) & 255UL);
175 :
176 1729044 : return ubuf_csz + 3UL;
177 1729044 : }
178 : #endif
179 :
/* fd_checkpt_init_stream formats the memory region pointed to by mem
   as a checkpt in streaming mode.  fd is the file descriptor stored
   in the checkpt for writing and wbuf / wbuf_sz give the write buffer
   stored in the checkpt for staging those writes.  Returns mem cast
   to a fd_checkpt_t on success (the checkpt will not be in a frame,
   will be at offset 0, will have a zeroed gather cursor and will have
   an empty write buffer).  Returns NULL on failure (logs details).
   Reasons for failure are NULL mem, mem not aligned to
   FD_CHECKPT_ALIGN, negative fd, NULL wbuf, wbuf_sz smaller than
   FD_CHECKPT_WBUF_MIN and, when compiled with FD_HAS_LZ4,
   LZ4_createStream returning NULL.  mem is not written on failure. */
180 : fd_checkpt_t *
181 : fd_checkpt_init_stream( void * mem,
182 : int fd,
183 : void * wbuf,
184 21108 : ulong wbuf_sz ) {
185 :
186 : /* Check input args */
187 :
188 21108 : if( FD_UNLIKELY( !mem ) ) {
189 3 : FD_LOG_WARNING(( "NULL mem" ));
190 3 : return NULL;
191 3 : }
192 :
193 21105 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, FD_CHECKPT_ALIGN ) ) ) {
194 3 : FD_LOG_WARNING(( "misaligned mem" ));
195 3 : return NULL;
196 3 : }
197 :
198 21102 : if( FD_UNLIKELY( fd<0 ) ) {
199 3 : FD_LOG_WARNING(( "bad fd" ));
200 3 : return NULL;
201 3 : }
202 :
203 21099 : if( FD_UNLIKELY( !wbuf ) ) {
204 3 : FD_LOG_WARNING(( "NULL wbuf" ));
205 3 : return NULL;
206 3 : }
207 :
208 21096 : if( FD_UNLIKELY( wbuf_sz<FD_CHECKPT_WBUF_MIN ) ) {
209 3 : FD_LOG_WARNING(( "wbuf_sz too small" ));
210 3 : return NULL;
211 3 : }
212 :
213 : /* Create the compressor (a NULL placeholder is stored when compiled
   without FD_HAS_LZ4) */
214 :
215 21093 : # if FD_HAS_LZ4
216 21093 : LZ4_stream_t * lz4 = LZ4_createStream();
217 21093 : if( FD_UNLIKELY( !lz4 ) ) {
218 0 : FD_LOG_WARNING(( "lz4 error" ));
219 0 : return NULL;
220 0 : }
221 : # else
222 : void * lz4 = NULL;
223 : # endif
224 :
225 : /* Init the checkpt */
226 :
227 21093 : fd_checkpt_t * checkpt = (fd_checkpt_t *)mem;
228 :
229 21093 : checkpt->fd = fd; /* streaming mode */
230 21093 : checkpt->frame_style = 0; /* not in frame */
231 21093 : checkpt->lz4 = (void *)lz4;
232 21093 : checkpt->gbuf_cursor = 0UL;
233 21093 : checkpt->off = 0UL;
234 21093 : checkpt->wbuf.mem = (uchar *)wbuf;
235 21093 : checkpt->wbuf.sz = wbuf_sz;
236 21093 : checkpt->wbuf.used = 0UL;
237 :
238 21093 : return checkpt;
239 21093 : }
240 :
/* fd_checkpt_init_mmio formats the memory region pointed to by mem as
   a checkpt in mmio mode (the checkpt's fd is set to -1).  mmio /
   mmio_sz give the memory region stored in the checkpt as the
   destination for checkpointed bytes.  mmio may be NULL only if
   mmio_sz is zero.  Returns mem cast to a fd_checkpt_t on success
   (the checkpt will not be in a frame, will be at offset 0 and will
   have a zeroed gather cursor).  Returns NULL on failure (logs
   details).  Reasons for failure are NULL mem, mem not aligned to
   FD_CHECKPT_ALIGN, NULL mmio with non-zero mmio_sz and, when
   compiled with FD_HAS_LZ4, LZ4_createStream returning NULL.  mem is
   not written on failure. */
241 : fd_checkpt_t *
242 : fd_checkpt_init_mmio( void * mem,
243 : void * mmio,
244 21084 : ulong mmio_sz ) {
245 :
246 : /* Check input args */
247 :
248 21084 : if( FD_UNLIKELY( !mem ) ) {
249 3 : FD_LOG_WARNING(( "NULL mem" ));
250 3 : return NULL;
251 3 : }
252 :
253 21081 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, FD_CHECKPT_ALIGN ) ) ) {
254 3 : FD_LOG_WARNING(( "misaligned mem" ));
255 3 : return NULL;
256 3 : }
257 :
258 21078 : if( FD_UNLIKELY( (!mmio) & (!!mmio_sz) ) ) { /* NULL mmio is only an error for a non-empty region */
259 3 : FD_LOG_WARNING(( "NULL mmio" ));
260 3 : return NULL;
261 3 : }
262 :
263 : /* Create the compressor (a NULL placeholder is stored when compiled
   without FD_HAS_LZ4) */
264 :
265 21075 : # if FD_HAS_LZ4
266 21075 : LZ4_stream_t * lz4 = LZ4_createStream();
267 21075 : if( FD_UNLIKELY( !lz4 ) ) {
268 0 : FD_LOG_WARNING(( "lz4 error" ));
269 0 : return NULL;
270 0 : }
271 : # else
272 : void * lz4 = NULL;
273 : # endif
274 :
275 : /* Init the checkpt */
276 :
277 21075 : fd_checkpt_t * checkpt = (fd_checkpt_t *)mem;
278 :
279 21075 : checkpt->fd = -1; /* mmio mode */
280 21075 : checkpt->frame_style = 0; /* not in frame */
281 21075 : checkpt->lz4 = (void *)lz4;
282 21075 : checkpt->gbuf_cursor = 0UL;
283 21075 : checkpt->off = 0UL;
284 21075 : checkpt->mmio.mem = (uchar *)mmio;
285 21075 : checkpt->mmio.sz = mmio_sz;
286 :
287 21075 : return checkpt;
288 21075 : }
289 :
/* fd_checkpt_fini finishes a checkpt.  Returns checkpt cast to a
   void * on success.  When compiled with FD_HAS_LZ4, the LZ4 stream
   held by the checkpt is freed on this path (a failure to free is
   logged and otherwise ignored).  Returns NULL on failure (logs
   details).  Reasons for failure are NULL checkpt and a checkpt that
   fd_checkpt_in_frame reports is in a frame.  In the latter case, the
   checkpt is marked as failed (frame_style -1) and this returns
   before the LZ4 stream is freed. */
290 : void *
291 42186 : fd_checkpt_fini( fd_checkpt_t * checkpt ) {
292 :
293 42186 : if( FD_UNLIKELY( !checkpt ) ) {
294 6 : FD_LOG_WARNING(( "NULL checkpt" ));
295 6 : return NULL;
296 6 : }
297 :
298 42180 : if( FD_UNLIKELY( fd_checkpt_in_frame( checkpt ) ) ) {
299 12 : FD_LOG_WARNING(( "in a frame" ));
300 12 : checkpt->frame_style = -1; /* failed */
301 12 : return NULL;
302 12 : }
303 :
304 42168 : # if FD_HAS_LZ4
305 :
306 : /* Note: Though this doesn't seem to be officially documented, the
307 : source (lz4-1.9.4@lz4/lib/lz4.c:1575) suggests that this always
308 : returns 0. That is, 0 is success and non-zero is failure. */
309 :
310 42168 : if( FD_UNLIKELY( LZ4_freeStream( (LZ4_stream_t *)checkpt->lz4 ) ) )
311 0 : FD_LOG_WARNING(( "LZ4 freeStream error, attempting to continue" ));
312 :
313 42168 : # endif
314 :
315 42168 : return (void *)checkpt;
316 42180 : }
317 :
/* fd_checkpt_open_advanced opens a new frame in checkpt with the given
   frame_style.  A zero frame_style is replaced with
   FD_CHECKPT_FRAME_STYLE_DEFAULT (this assumes fd_int_if( c, t, f )
   behaves like c ? t : f).  For a LZ4 frame, the LZ4 stream is reset
   and the gather cursor is rewound to 0 before the frame starts.

   On success, returns FD_CHECKPT_SUCCESS, checkpt->frame_style holds
   the resolved frame style and *_off holds the checkpt's current
   offset.  On failure, logs details and returns FD_CHECKPT_ERR_INVAL
   (NULL checkpt, fd_checkpt_can_open reports the checkpt cannot open
   a frame, NULL _off) or FD_CHECKPT_ERR_UNSUP (frame style not
   handled by this build).  On failure with a non-NULL checkpt, the
   checkpt is marked as failed (frame_style -1) and *_off is not
   written. */
318 : int
319 : fd_checkpt_open_advanced( fd_checkpt_t * checkpt,
320 : int frame_style,
321 422133 : ulong * _off ) {
322 :
323 422133 : if( FD_UNLIKELY( !checkpt ) ) {
324 6 : FD_LOG_WARNING(( "NULL checkpt" ));
325 6 : return FD_CHECKPT_ERR_INVAL;
326 6 : }
327 :
328 422127 : if( FD_UNLIKELY( !fd_checkpt_can_open( checkpt ) ) ) {
329 6 : FD_LOG_WARNING(( "in a frame or failed" ));
330 6 : checkpt->frame_style = -1; /* failed */
331 6 : return FD_CHECKPT_ERR_INVAL;
332 6 : }
333 :
334 422121 : if( FD_UNLIKELY( !_off ) ) {
335 0 : FD_LOG_WARNING(( "NULL _off" ));
336 0 : checkpt->frame_style = -1; /* failed */
337 0 : return FD_CHECKPT_ERR_INVAL;
338 0 : }
339 :
340 422121 : frame_style = fd_int_if( !!frame_style, frame_style, FD_CHECKPT_FRAME_STYLE_DEFAULT ); /* 0 selects the default style */
341 :
342 422121 : switch( frame_style ) {
343 :
344 210360 : case FD_CHECKPT_FRAME_STYLE_RAW: {
345 210360 : break;
346 0 : }
347 :
348 0 : # if FD_HAS_LZ4
349 211755 : case FD_CHECKPT_FRAME_STYLE_LZ4: {
350 211755 : LZ4_resetStream_fast( (LZ4_stream_t *)checkpt->lz4 ); /* Note: no error code for this API */
351 211755 : checkpt->gbuf_cursor = 0UL;
352 211755 : break;
353 0 : }
354 0 : # endif
355 :
356 6 : default: {
357 6 : FD_LOG_WARNING(( "unsupported frame_style" ));
358 6 : checkpt->frame_style = -1; /* failed */
359 6 : return FD_CHECKPT_ERR_UNSUP;
360 0 : }
361 :
362 422121 : }
363 :
364 422115 : checkpt->frame_style = frame_style;
365 :
366 422115 : *_off = checkpt->off;
367 422115 : return FD_CHECKPT_SUCCESS;
368 422121 : }
369 :
/* fd_checkpt_close_advanced closes the current frame of checkpt.  In
   mmio mode there is nothing to flush.  In streaming mode, any bytes
   pending in the write buffer are handed to fd_io_write and the
   offset is advanced by the byte count fd_io_write reports in wsz.

   On success, returns FD_CHECKPT_SUCCESS, the checkpt is no longer in
   a frame (frame_style 0), checkpt->off holds the updated offset and
   *_off holds the same value.  On failure, logs details and returns
   FD_CHECKPT_ERR_INVAL (NULL checkpt, fd_checkpt_in_frame reports the
   checkpt is not in a frame, NULL _off) or FD_CHECKPT_ERR_IO
   (fd_io_write failed or the offset overflowed).  On failure with a
   non-NULL checkpt, the checkpt is marked as failed (frame_style -1)
   and neither checkpt->off nor *_off is written. */
370 : int
371 : fd_checkpt_close_advanced( fd_checkpt_t * checkpt,
372 422064 : ulong * _off ) {
373 :
374 422064 : if( FD_UNLIKELY( !checkpt ) ) {
375 6 : FD_LOG_WARNING(( "NULL checkpt" ));
376 6 : return FD_CHECKPT_ERR_INVAL;
377 6 : }
378 :
379 422058 : if( FD_UNLIKELY( !fd_checkpt_in_frame( checkpt ) ) ) {
380 6 : FD_LOG_WARNING(( "not in a frame" ));
381 6 : checkpt->frame_style = -1; /* failed */
382 6 : return FD_CHECKPT_ERR_INVAL;
383 6 : }
384 :
385 422052 : if( FD_UNLIKELY( !_off ) ) {
386 0 : FD_LOG_WARNING(( "NULL _off" ));
387 0 : checkpt->frame_style = -1; /* failed */
388 0 : return FD_CHECKPT_ERR_INVAL;
389 0 : }
390 :
391 422052 : ulong off = checkpt->off;
392 :
393 422052 : if( fd_checkpt_is_mmio( checkpt ) ) { /* mmio mode (app dependent branch prob) */
394 :
395 : /* Nothing to do */
396 :
397 211311 : } else { /* streaming mode */
398 :
399 : /* Flush out all pending bytes for this frame */
400 :
401 210741 : ulong wbuf_used = checkpt->wbuf.used;
402 :
403 210741 : if( FD_LIKELY( wbuf_used ) ) {
404 :
405 183201 : ulong wsz;
406 183201 : int err = fd_io_write( checkpt->fd, checkpt->wbuf.mem, wbuf_used, wbuf_used, &wsz );
407 183201 : if( FD_UNLIKELY( err ) ) {
408 0 : FD_LOG_WARNING(( "fd_io_write failed (%i-%s)", err, fd_io_strerror( err ) ));
409 0 : checkpt->frame_style = -1; /* failed */
410 0 : return FD_CHECKPT_ERR_IO;
411 0 : }
412 :
413 183201 : off += wsz;
414 183201 : if( FD_UNLIKELY( off<wsz ) ) { /* unsigned wraparound */
415 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
416 0 : checkpt->frame_style = -1; /* failed */
417 0 : return FD_CHECKPT_ERR_IO;
418 0 : }
419 :
420 183201 : }
421 :
422 210741 : checkpt->wbuf.used = 0UL;
423 :
424 210741 : }
425 :
426 422052 : checkpt->off = off;
427 422052 : checkpt->frame_style = 0; /* not in frame */
428 :
429 422052 : *_off = off;
430 422052 : return FD_CHECKPT_SUCCESS;
431 422052 : }
432 :
/* fd_checkpt_private_buf appends the sz bytes pointed to by buf to the
   current frame of checkpt according to the frame's style.  max is
   the largest sz the caller permits.  If checkpt is in a frame and sz
   is zero, this is a no-op that returns FD_CHECKPT_SUCCESS (buf is
   not inspected).

   RAW frames: in mmio mode, buf is copied to mmio.mem at the current
   offset; in streaming mode, buf is passed to fd_io_buffered_write.

   LZ4 frames: buf is split into chunks of at most
   FD_CHECKPT_PRIVATE_CHUNK_USZ_MAX bytes and each chunk is compressed
   with fd_checkpt_private_lz4 (gather threshold FD_CHECKPT_META_MAX)
   directly into the mmio region or the write buffer.  In streaming
   mode, the write buffer is flushed first whenever it might not have
   room for the worst case compressed chunk.

   On success, returns FD_CHECKPT_SUCCESS and checkpt->off is updated.
   On failure, logs details and returns FD_CHECKPT_ERR_INVAL (NULL
   checkpt, not in a frame, sz>max, NULL buf with non-zero sz),
   FD_CHECKPT_ERR_IO (mmio region too small for a RAW copy, write
   failure, size overflow), FD_CHECKPT_ERR_COMP (fd_checkpt_private_lz4
   failed) or FD_CHECKPT_ERR_UNSUP (unexpected frame style).  On
   failure with a non-NULL checkpt, the checkpt is marked as failed
   (frame_style -1) and checkpt->off is not updated. */
433 : static int
434 : fd_checkpt_private_buf( fd_checkpt_t * checkpt,
435 : void const * buf,
436 : ulong sz,
437 3590586 : ulong max ) {
438 :
439 3590586 : if( FD_UNLIKELY( !checkpt ) ) {
440 12 : FD_LOG_WARNING(( "NULL checkpt" ));
441 12 : return FD_CHECKPT_ERR_INVAL;
442 12 : }
443 :
444 3590574 : if( FD_UNLIKELY( !fd_checkpt_in_frame( checkpt ) ) ) {
445 54 : FD_LOG_WARNING(( "not in a frame" ));
446 54 : checkpt->frame_style = -1; /* failed */
447 54 : return FD_CHECKPT_ERR_INVAL;
448 54 : }
449 :
450 3590520 : if( FD_UNLIKELY( !sz ) ) return FD_CHECKPT_SUCCESS; /* nothing to do */
451 :
452 3196674 : if( FD_UNLIKELY( sz>max ) ) {
453 12 : FD_LOG_WARNING(( "sz too large" ));
454 12 : checkpt->frame_style = -1; /* failed */
455 12 : return FD_CHECKPT_ERR_INVAL;
456 12 : }
457 :
458 3196662 : if( FD_UNLIKELY( !buf ) ) {
459 24 : FD_LOG_WARNING(( "NULL buf with non-zero sz" ));
460 24 : checkpt->frame_style = -1; /* failed */
461 24 : return FD_CHECKPT_ERR_INVAL;
462 24 : }
463 :
464 3196638 : ulong off = checkpt->off; /* local copy, only committed to checkpt->off on success */
465 :
466 3196638 : switch( checkpt->frame_style ) {
467 :
468 1593915 : case FD_CHECKPT_FRAME_STYLE_RAW: {
469 :
470 1593915 : if( fd_checkpt_is_mmio( checkpt ) ) { /* mmio mode (app dependent branch prob) */
471 :
472 797523 : if( FD_UNLIKELY( sz > (checkpt->mmio.sz-off) ) ) {
473 6 : FD_LOG_WARNING(( "mmio_sz too small" ));
474 6 : checkpt->frame_style = -1; /* failed */
475 6 : return FD_CHECKPT_ERR_IO;
476 6 : }
477 :
478 797517 : memcpy( checkpt->mmio.mem + off, buf, sz );
479 :
480 797517 : off += sz; /* at most mmio.sz */
481 :
482 797517 : } else { /* streaming mode */
483 :
484 796392 : ulong wbuf_used = checkpt->wbuf.used;
485 :
486 796392 : ulong wsz_max = wbuf_used + sz; /* bytes buffered before the call plus bytes offered */
487 796392 : if( FD_UNLIKELY( wsz_max<sz ) ) {
488 0 : FD_LOG_WARNING(( "sz overflow" ));
489 0 : checkpt->frame_style = -1; /* failed */
490 0 : return FD_CHECKPT_ERR_IO;
491 0 : }
492 :
493 796392 : int err = fd_io_buffered_write( checkpt->fd, buf, sz, checkpt->wbuf.mem, checkpt->wbuf.sz, &wbuf_used );
494 796392 : if( FD_UNLIKELY( err ) ) {
495 0 : FD_LOG_WARNING(( "fd_io_buffered_write failed (%i-%s)", err, fd_io_strerror( err ) ));
496 0 : checkpt->frame_style = -1; /* failed */
497 0 : return FD_CHECKPT_ERR_IO;
498 0 : }
499 :
500 796392 : if( FD_UNLIKELY( wsz_max<wbuf_used ) ) {
501 0 : FD_LOG_WARNING(( "unexpected buffered write size" ));
502 0 : checkpt->frame_style = -1; /* failed */
503 0 : return FD_CHECKPT_ERR_IO;
504 0 : }
505 :
506 796392 : ulong wsz = wsz_max - wbuf_used; /* bytes no longer pending in wbuf (presumably written to fd by fd_io_buffered_write, TODO confirm) */
507 :
508 796392 : off += wsz;
509 796392 : if( FD_UNLIKELY( off<wsz ) ) {
510 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
511 0 : checkpt->frame_style = -1; /* failed */
512 0 : return FD_CHECKPT_ERR_IO;
513 0 : }
514 :
515 796392 : checkpt->wbuf.used = wbuf_used;
516 :
517 796392 : }
518 :
519 1593909 : break;
520 1593915 : }
521 :
522 1593909 : # if FD_HAS_LZ4
523 1602723 : case FD_CHECKPT_FRAME_STYLE_LZ4: {
524 1602723 : LZ4_stream_t * lz4 = (LZ4_stream_t *)checkpt->lz4;
525 :
526 1602723 : if( fd_checkpt_is_mmio( checkpt ) ) { /* mmio mode, app dependent branch prob */
527 :
528 801918 : uchar * mmio = checkpt->mmio.mem;
529 801918 : ulong mmio_sz = checkpt->mmio.sz;
530 :
531 801918 : uchar const * chunk = (uchar const *)buf;
532 866148 : do {
533 866148 : ulong chunk_usz = fd_ulong_min( sz, FD_CHECKPT_PRIVATE_CHUNK_USZ_MAX );
534 :
535 866148 : ulong chunk_csz = fd_checkpt_private_lz4( lz4, chunk, chunk_usz, mmio + off, mmio_sz - off,
536 866148 : checkpt->gbuf, FD_CHECKPT_PRIVATE_GBUF_SZ, FD_CHECKPT_META_MAX,
537 866148 : &checkpt->gbuf_cursor ); /* logs details */
538 866148 : if( FD_UNLIKELY( !chunk_csz ) ) {
539 3 : checkpt->frame_style = -1; /* failed */
540 3 : return FD_CHECKPT_ERR_COMP;
541 3 : }
542 :
543 866145 : off += chunk_csz; /* at most mmio_sz */
544 :
545 866145 : chunk += chunk_usz;
546 866145 : sz -= chunk_usz;
547 866145 : } while( sz );
548 :
549 801918 : } else { /* streaming mode */
550 :
551 800805 : int fd = checkpt->fd;
552 800805 : uchar * wbuf = checkpt->wbuf.mem;
553 800805 : ulong wbuf_sz = checkpt->wbuf.sz;
554 800805 : ulong wbuf_used = checkpt->wbuf.used;
555 :
556 800805 : uchar const * chunk = (uchar const *)buf;
557 862899 : do {
558 862899 : ulong chunk_usz = fd_ulong_min( sz, FD_CHECKPT_PRIVATE_CHUNK_USZ_MAX );
559 :
560 : /* If we are not guaranteed to have enough room in the write
561 : buffer to hold the compressed chunk, flush it to make room. */
562 :
563 862899 : ulong chunk_csz_max = FD_CHECKPT_PRIVATE_CSZ_MAX( chunk_usz );
564 862899 : ulong wbuf_free = wbuf_sz - wbuf_used;
565 862899 : if( FD_UNLIKELY( chunk_csz_max > wbuf_free ) ) {
566 :
567 16389 : ulong wsz;
568 16389 : int err = fd_io_write( fd, wbuf, wbuf_used, wbuf_used, &wsz );
569 16389 : if( FD_UNLIKELY( err ) ) {
570 0 : FD_LOG_WARNING(( "fd_io_write failed (%i-%s)", err, fd_io_strerror( err ) ));
571 0 : checkpt->frame_style = -1; /* failed */
572 0 : return FD_CHECKPT_ERR_IO;
573 0 : }
574 :
575 16389 : off += wsz;
576 16389 : if( FD_UNLIKELY( off<wsz ) ) {
577 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
578 0 : checkpt->frame_style = -1; /* failed */
579 0 : return FD_CHECKPT_ERR_IO;
580 0 : }
581 :
582 16389 : wbuf_used = 0UL;
583 16389 : wbuf_free = wbuf_sz; /* >= WBUF_MIN >= CSZ_MAX( CHUNK_USZ_MAX ) >= CSZ_MAX( chunk_usz ) */
584 :
585 16389 : }
586 :
587 : /* At this point, wbuf_free >= chunk_csz_max */
588 :
589 862899 : ulong chunk_csz = fd_checkpt_private_lz4( lz4, chunk, chunk_usz, wbuf + wbuf_used, wbuf_free,
590 862899 : checkpt->gbuf, FD_CHECKPT_PRIVATE_GBUF_SZ, FD_CHECKPT_META_MAX,
591 862899 : &checkpt->gbuf_cursor ); /* logs details */
592 862899 : if( FD_UNLIKELY( !chunk_csz ) ) {
593 0 : checkpt->frame_style = -1; /* failed */
594 0 : return FD_CHECKPT_ERR_COMP;
595 0 : }
596 :
597 862899 : wbuf_used += chunk_csz; /* compressed bytes stay pending in wbuf; off only advances when they are flushed */
598 :
599 862899 : chunk += chunk_usz;
600 862899 : sz -= chunk_usz;
601 :
602 862899 : } while( sz );
603 :
604 800805 : checkpt->wbuf.used = wbuf_used;
605 :
606 800805 : }
607 :
608 1602720 : break;
609 1602723 : }
610 1602720 : # endif
611 :
612 1602720 : default: { /* never get here */
613 0 : FD_LOG_WARNING(( "unsupported frame style" ));
614 0 : checkpt->frame_style = -1; /* failed */
615 0 : return FD_CHECKPT_ERR_UNSUP;
616 1602723 : }
617 :
618 3196638 : }
619 :
620 3196629 : checkpt->off = off;
621 3196629 : return FD_CHECKPT_SUCCESS;
622 3196638 : }
623 :
/* fd_checkpt_meta checkpoints the sz bytes pointed to by buf into the
   current frame of checkpt via fd_checkpt_private_buf with sz limited
   to at most FD_CHECKPT_META_MAX bytes.  Returns the
   fd_checkpt_private_buf result unchanged (FD_CHECKPT_SUCCESS on
   success, a FD_CHECKPT_ERR_* code on failure). */
624 : int
625 : fd_checkpt_meta( fd_checkpt_t * checkpt,
626 : void const * buf,
627 201 : ulong sz ) {
628 201 : return fd_checkpt_private_buf( checkpt, buf, sz, FD_CHECKPT_META_MAX );
629 201 : }
630 :
/* fd_checkpt_data checkpoints the sz bytes pointed to by buf into the
   current frame of checkpt via fd_checkpt_private_buf with no limit
   on sz (max is ULONG_MAX).  Returns the fd_checkpt_private_buf
   result unchanged (FD_CHECKPT_SUCCESS on success, a FD_CHECKPT_ERR_*
   code on failure). */
631 : int
632 : fd_checkpt_data( fd_checkpt_t * checkpt,
633 : void const * buf,
634 3590385 : ulong sz ) {
635 : /* TODO: optimize sz <= META_MAX better? */
636 3590385 : return fd_checkpt_private_buf( checkpt, buf, sz, ULONG_MAX );
637 3590385 : }
|