Line data Source code
1 : #include "fd_checkpt.h"
2 :
3 : char const *
4 18 : fd_checkpt_strerror( int err ) {
5 18 : switch( err ) {
6 3 : case FD_CHECKPT_SUCCESS: return "success";
7 3 : case FD_CHECKPT_ERR_INVAL: return "bad input args";
8 3 : case FD_CHECKPT_ERR_UNSUP: return "unsupported on this target";
9 3 : case FD_CHECKPT_ERR_IO: return "io error";
10 3 : case FD_CHECKPT_ERR_COMP: return "compression error";
11 3 : default: break;
12 18 : }
13 3 : return "unknown";
14 18 : }
15 :
16 : #if FD_HAS_LZ4
17 : #include <lz4.h>
18 :
19 : /* fd_checkpt_private_lz4 compresses the ubuf_usz byte size memory
20 : region pointed to by ubuf into the cbuf_max memory region pointed to
21 : by cbuf using the given lz4 compressor. Assumes lz4, ubuf and cbuf
22 : are valid. On success, returns the compressed size (will be in
23 : [4,cbuf_max]). The ubuf passed to this should not be modified
24 : until the given lz4 stream is reset / closed or there has been an
25 : additional 64 KiB passed to the stream. On failure, returns 0 and
26 : retains no interest in ubuf. In, either case, this retains no
27 : interest in cbuf on return.
28 :
29 : _gbuf, gbuf_sz, gbuf_thresh, _gbuf_cursor specify the small buf
30 : gather ring state. It is detailed below. */
31 :
32 : static ulong
33 : fd_checkpt_private_lz4( LZ4_stream_t * lz4,
34 : void const * _ubuf,
35 : ulong ubuf_usz,
36 : void * _cbuf,
37 : ulong cbuf_max,
38 : void * _gbuf,
39 : ulong gbuf_sz,
40 : ulong gbuf_thresh,
41 1681794 : ulong * _gbuf_cursor ) {
42 1681794 : char * cbuf = (char *) _cbuf;
43 1681794 : char const * ubuf = (char const *)_ubuf;
44 :
45 : /* Verify ubuf_usz is in [1,LZ4_MAX_INPUT_SIZE] and cbuf_max is large
46 : enough to store a header and a non-trivial compressed body. */
47 :
48 1681794 : if( FD_UNLIKELY( !((1UL<=ubuf_usz) & (ubuf_usz<=(ulong)LZ4_MAX_INPUT_SIZE)) ) ) {
49 0 : FD_LOG_WARNING(( "bad ubuf_usz" ));
50 0 : return 0UL;
51 0 : }
52 :
53 1681794 : if( FD_UNLIKELY( cbuf_max<4UL ) ) {
54 0 : FD_LOG_WARNING(( "not enough room to compress" ));
55 0 : return 0UL;
56 0 : }
57 :
58 : /* Small ubuf gather optimization. Though the LZ4 streaming API looks
59 : like it is designed for scatter/gather operation, the
60 : implementation under the hood is heavily optimized for the case the
61 : incoming data buffers are stored in a ring buffer (basically, the
62 : compression dictionary has a size of the most recent 64 KiB of
63 : _contiguous_ _in_ _memory_ buffers passed to it ... see
64 : lz4-1.9.4@lz4/lib/lz4.c:2636-2665 for an example).
65 :
66 : When a buffer >> 64 KiB is checkpointed, it will be compressed as
67 : CHUNK_USZ sequential chunks contiguous in memory. So outside of
68 : minor startup effects (where the initial dictionary might not be as
69 : large as it could have been), this case is optimal.
70 :
71 : But when lots of disjoint tiny buffers << 64 KiB are checkpointed,
72 : LZ4 is constantly reseting its dictionary to only use the most
73 : recently previously compressed (tiny) buffer. This case is
74 : suboptimal.
75 :
76 : At the same time, we don't want to use a ring buffer because that
77 : would imply an extra copy when compressing large data. This is a
78 : complete waste because that case was already optimal. And this is
79 : the most important case for high performance.
80 :
81 : Below, if the incoming buffer to compress is large enough
82 : (>=thresh), we compress it in place as it will be optimal as
83 : before.
84 :
85 : If not, we first copy it into a gather buffer and have LZ4 compress
86 : out of the gather buffer location. Then, when compressing lots of
87 : tiny buffer disjoint buffers, it will appear to LZ4 as though they
88 : were contiguous in memory and LZ4 will handle that optimally too.
89 :
90 : Then our dictionary size is optimal in both asymptotic regimes and
91 : we are still zero copy in the important case of compressing large
92 : data. The dictionary will also be reasonable when toggling
93 : frequently between the asymptotic regimes, as often happens in
94 : checkpointing (small metadata checkpts/large checkpt/small metadata
95 : checkpts/large data checkpt/...).
96 :
97 : The lz4 API streaming API requires up to the most recent 64 KiB of
98 : uncompressed bytes to be unmodified when called. Suppose we have
99 : only been compressing small buffers and we are trying to compress a
100 : thresh-1 byte buffer when only thresh-2 bytes of gather buffer
101 : space remains. Since we wrap at buffer granularity, we will need
102 : to put the thresh-1 bytes at the head of the buffer. To ensure
103 : this doesn't clobber any of the 64 KiB previously compressed, we
104 : need a gather buffer at least:
105 :
106 : thresh-1 + 64KiB + thresh-2 = 2 thresh + 64 KiB - 3
107 :
108 : in size. Larger is fine. Smaller will break compression.
109 :
110 : We do the corresponding in the restore and the restore
111 : configuration must match our checkpt configuration exactly in order
112 : to keep the dictionaries on both sides synchronized.
113 :
114 : TL;DR We store small buffers into a gather ring at buffer
115 : granularity for better compression and compress large buffers in
116 : place for extra performance due to the details of how LZ4 stream
117 : APIs are implemented. */
118 :
119 1681794 : int is_small = ubuf_usz<gbuf_thresh;
120 1681794 : if( is_small ) { /* app dependent branch prob */
121 1585695 : ulong gbuf_cursor = *_gbuf_cursor;
122 1585695 : if( (gbuf_sz-gbuf_cursor)<ubuf_usz ) gbuf_cursor = 0UL; /* cmov */
123 1585695 : ubuf = (char *)_gbuf + gbuf_cursor;
124 1585695 : *_gbuf_cursor = gbuf_cursor + ubuf_usz;
125 1585695 : memcpy( (char *)ubuf, _ubuf, ubuf_usz );
126 1585695 : }
127 :
128 : /* Compress ubuf into cbuf, leaving space for the header. Compression
129 : will fail if there is no room to the resulting compressed size into
130 : the header as we clamp the capacity to 2^24-1. */
131 :
132 1681794 : ulong ubuf_csz_max = fd_ulong_min( cbuf_max-3UL, (1UL<<24)-1UL ); /* In [1,2^24) */
133 :
134 1681794 : int _ubuf_csz = LZ4_compress_fast_continue( lz4, ubuf, cbuf+3UL, (int)ubuf_usz, (int)ubuf_csz_max, 1 /* default */ );
135 1681794 : if( FD_UNLIKELY( _ubuf_csz<=0 ) ) {
136 3 : FD_LOG_WARNING(( "LZ4_compress_fast_continue error (%i)", _ubuf_csz ));
137 3 : return 0UL;
138 3 : }
139 :
140 1681791 : ulong ubuf_csz = (ulong)_ubuf_csz;
141 1681791 : if( FD_UNLIKELY( ubuf_csz>ubuf_csz_max ) ) {
142 0 : FD_LOG_WARNING(( "unexpected compressed size" ));
143 0 : return 0UL;
144 0 : }
145 :
146 : /* Write compressed size we obtained into the header as a 24-bit
147 : little endian unsigned integer. This need to do this is a
148 : limitation of how the recent LZ4 APIs (>=1.9) work. */
149 :
150 1681791 : cbuf[0] = (char)( ubuf_csz & 255UL);
151 1681791 : cbuf[1] = (char)((ubuf_csz>> 8) & 255UL);
152 1681791 : cbuf[2] = (char)((ubuf_csz>>16) & 255UL);
153 :
154 1681791 : return ubuf_csz + 3UL;
155 1681791 : }
156 : #endif
157 :
158 : fd_checkpt_t *
159 : fd_checkpt_init_stream( void * mem,
160 : int fd,
161 : void * wbuf,
162 15048 : ulong wbuf_sz ) {
163 :
164 : /* Check input args */
165 :
166 15048 : if( FD_UNLIKELY( !mem ) ) {
167 3 : FD_LOG_WARNING(( "NULL mem" ));
168 3 : return NULL;
169 3 : }
170 :
171 15045 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, FD_CHECKPT_ALIGN ) ) ) {
172 3 : FD_LOG_WARNING(( "misaligned mem" ));
173 3 : return NULL;
174 3 : }
175 :
176 15042 : if( FD_UNLIKELY( fd<0 ) ) {
177 3 : FD_LOG_WARNING(( "bad fd" ));
178 3 : return NULL;
179 3 : }
180 :
181 15039 : if( FD_UNLIKELY( !wbuf ) ) {
182 3 : FD_LOG_WARNING(( "NULL wbuf" ));
183 3 : return NULL;
184 3 : }
185 :
186 15036 : if( FD_UNLIKELY( wbuf_sz<FD_CHECKPT_WBUF_MIN ) ) {
187 3 : FD_LOG_WARNING(( "wbuf_sz too small" ));
188 3 : return NULL;
189 3 : }
190 :
191 : /* Create the compressor */
192 :
193 15033 : # if FD_HAS_LZ4
194 15033 : LZ4_stream_t * lz4 = LZ4_createStream();
195 15033 : if( FD_UNLIKELY( !lz4 ) ) {
196 0 : FD_LOG_WARNING(( "lz4 error" ));
197 0 : return NULL;
198 0 : }
199 : # else
200 : void * lz4 = NULL;
201 : # endif
202 :
203 : /* Init the checkpt */
204 :
205 15033 : fd_checkpt_t * checkpt = (fd_checkpt_t *)mem;
206 :
207 15033 : checkpt->fd = fd; /* streaming mode */
208 15033 : checkpt->frame_style = 0; /* not in frame */
209 15033 : checkpt->lz4 = (void *)lz4;
210 15033 : checkpt->off = 0UL;
211 15033 : checkpt->wbuf.mem = (uchar *)wbuf;
212 15033 : checkpt->wbuf.sz = wbuf_sz;
213 15033 : checkpt->wbuf.used = 0UL;
214 :
215 15033 : return checkpt;
216 15033 : }
217 :
218 : fd_checkpt_t *
219 : fd_checkpt_init_mmio( void * mem,
220 : void * mmio,
221 15051 : ulong mmio_sz ) {
222 :
223 : /* Check input args */
224 :
225 15051 : if( FD_UNLIKELY( !mem ) ) {
226 3 : FD_LOG_WARNING(( "NULL mem" ));
227 3 : return NULL;
228 3 : }
229 :
230 15048 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, FD_CHECKPT_ALIGN ) ) ) {
231 3 : FD_LOG_WARNING(( "misaligned mem" ));
232 3 : return NULL;
233 3 : }
234 :
235 15045 : if( FD_UNLIKELY( (!mmio) & (!!mmio_sz) ) ) {
236 3 : FD_LOG_WARNING(( "NULL mmio" ));
237 3 : return NULL;
238 3 : }
239 :
240 : /* Create the compressor */
241 :
242 15042 : # if FD_HAS_LZ4
243 15042 : LZ4_stream_t * lz4 = LZ4_createStream();
244 15042 : if( FD_UNLIKELY( !lz4 ) ) {
245 0 : FD_LOG_WARNING(( "lz4 error" ));
246 0 : return NULL;
247 0 : }
248 : # else
249 : void * lz4 = NULL;
250 : # endif
251 :
252 : /* Init the checkpt */
253 :
254 15042 : fd_checkpt_t * checkpt = (fd_checkpt_t *)mem;
255 :
256 15042 : checkpt->fd = -1; /* mmio mode */
257 15042 : checkpt->frame_style = 0; /* not in frame */
258 15042 : checkpt->lz4 = (void *)lz4;
259 15042 : checkpt->gbuf_cursor = 0UL;
260 15042 : checkpt->off = 0UL;
261 15042 : checkpt->mmio.mem = (uchar *)mmio;
262 15042 : checkpt->mmio.sz = mmio_sz;
263 :
264 15042 : return checkpt;
265 15042 : }
266 :
267 : void *
268 30093 : fd_checkpt_fini( fd_checkpt_t * checkpt ) {
269 :
270 30093 : if( FD_UNLIKELY( !checkpt ) ) {
271 6 : FD_LOG_WARNING(( "NULL checkpt" ));
272 6 : return NULL;
273 6 : }
274 :
275 30087 : if( FD_UNLIKELY( fd_checkpt_private_in_frame( checkpt ) ) ) {
276 12 : FD_LOG_WARNING(( "in a frame" ));
277 12 : checkpt->frame_style = -1; /* failed */
278 12 : return NULL;
279 12 : }
280 :
281 30075 : # if FD_HAS_LZ4
282 :
283 : /* Note: Though this this doesn't seem to be officially documented,
284 : the lz4-1.9.4@lz4/lib/lz4.c:1575) suggests that this always returns
285 : 0. That is, 0 is success and non-zero is failure. */
286 :
287 30075 : if( FD_UNLIKELY( LZ4_freeStream( (LZ4_stream_t *)checkpt->lz4 ) ) )
288 0 : FD_LOG_WARNING(( "LZ4 freeStream error, attempting to continue" ));
289 :
290 30075 : # endif
291 :
292 30075 : return (void *)checkpt;
293 30087 : }
294 :
295 : int
296 : fd_checkpt_frame_open_advanced( fd_checkpt_t * checkpt,
297 : int frame_style,
298 407457 : ulong * _off ) {
299 :
300 407457 : if( FD_UNLIKELY( !checkpt ) ) {
301 6 : FD_LOG_WARNING(( "NULL checkpt" ));
302 6 : return FD_CHECKPT_ERR_INVAL;
303 6 : }
304 :
305 407451 : if( FD_UNLIKELY( !fd_checkpt_private_can_open( checkpt ) ) ) {
306 6 : FD_LOG_WARNING(( "in a frame or failed" ));
307 6 : checkpt->frame_style = -1; /* failed */
308 6 : return FD_CHECKPT_ERR_INVAL;
309 6 : }
310 :
311 407445 : if( FD_UNLIKELY( !_off ) ) {
312 0 : FD_LOG_WARNING(( "NULL _off" ));
313 0 : checkpt->frame_style = -1; /* failed */
314 0 : return FD_CHECKPT_ERR_INVAL;
315 0 : }
316 :
317 407445 : frame_style = fd_int_if( !!frame_style, frame_style, FD_CHECKPT_FRAME_STYLE_DEFAULT );
318 :
319 407445 : switch( frame_style ) {
320 :
321 202869 : case FD_CHECKPT_FRAME_STYLE_RAW: {
322 202869 : break;
323 0 : }
324 :
325 0 : # if FD_HAS_LZ4
326 204570 : case FD_CHECKPT_FRAME_STYLE_LZ4: {
327 204570 : LZ4_resetStream_fast( (LZ4_stream_t *)checkpt->lz4 ); /* Note: no error code for this API */
328 204570 : checkpt->gbuf_cursor = 0UL;
329 204570 : break;
330 0 : }
331 0 : # endif
332 :
333 6 : default: {
334 6 : FD_LOG_WARNING(( "unsupported frame_style" ));
335 6 : checkpt->frame_style = -1; /* failed */
336 6 : return FD_CHECKPT_ERR_UNSUP;
337 0 : }
338 :
339 407445 : }
340 :
341 407439 : checkpt->frame_style = frame_style;
342 :
343 407439 : *_off = checkpt->off;
344 407439 : return FD_CHECKPT_SUCCESS;
345 407445 : }
346 :
347 : int
348 : fd_checkpt_frame_close_advanced( fd_checkpt_t * checkpt,
349 407424 : ulong * _off ) {
350 :
351 407424 : if( FD_UNLIKELY( !checkpt ) ) {
352 6 : FD_LOG_WARNING(( "NULL checkpt" ));
353 6 : return FD_CHECKPT_ERR_INVAL;
354 6 : }
355 :
356 407418 : if( FD_UNLIKELY( !fd_checkpt_private_in_frame( checkpt ) ) ) {
357 6 : FD_LOG_WARNING(( "not in a frame" ));
358 6 : checkpt->frame_style = -1; /* failed */
359 6 : return FD_CHECKPT_ERR_INVAL;
360 6 : }
361 :
362 407412 : if( FD_UNLIKELY( !_off ) ) {
363 0 : FD_LOG_WARNING(( "NULL _off" ));
364 0 : checkpt->frame_style = -1; /* failed */
365 0 : return FD_CHECKPT_ERR_INVAL;
366 0 : }
367 :
368 407412 : ulong off = checkpt->off;
369 :
370 407412 : if( fd_checkpt_private_is_mmio( checkpt ) ) { /* mmio mode (app dependent branch prob) */
371 :
372 : /* Nothing to do */
373 :
374 203997 : } else { /* streaming mode */
375 :
376 : /* Flush out all pending bytes for this frame */
377 :
378 203997 : ulong wbuf_used = checkpt->wbuf.used;
379 :
380 203997 : if( FD_LIKELY( wbuf_used ) ) {
381 :
382 176262 : ulong wsz;
383 176262 : int err = fd_io_write( checkpt->fd, checkpt->wbuf.mem, wbuf_used, wbuf_used, &wsz );
384 176262 : if( FD_UNLIKELY( err ) ) {
385 0 : FD_LOG_WARNING(( "fd_io_write failed (%i-%s)", err, fd_io_strerror( err ) ));
386 0 : checkpt->frame_style = -1; /* failed */
387 0 : return FD_CHECKPT_ERR_IO;
388 0 : }
389 :
390 176262 : off += wsz;
391 176262 : if( FD_UNLIKELY( off<wsz ) ) {
392 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
393 0 : checkpt->frame_style = -1; /* failed */
394 0 : return FD_CHECKPT_ERR_IO;
395 0 : }
396 :
397 176262 : }
398 :
399 203997 : checkpt->wbuf.used = 0UL;
400 :
401 203997 : }
402 :
403 407412 : checkpt->off = off;
404 407412 : checkpt->frame_style = 0; /* not in frame */
405 :
406 407412 : *_off = off;
407 407412 : return FD_CHECKPT_SUCCESS;
408 407412 : }
409 :
410 : int
411 : fd_checkpt_buf( fd_checkpt_t * checkpt,
412 : void const * buf,
413 3554178 : ulong sz ) {
414 :
415 3554178 : if( FD_UNLIKELY( !checkpt ) ) {
416 6 : FD_LOG_WARNING(( "NULL checkpt" ));
417 6 : return FD_CHECKPT_ERR_INVAL;
418 6 : }
419 :
420 3554172 : if( FD_UNLIKELY( !fd_checkpt_private_in_frame( checkpt ) ) ) {
421 18 : FD_LOG_WARNING(( "not in a frame" ));
422 18 : checkpt->frame_style = -1; /* failed */
423 18 : return FD_CHECKPT_ERR_INVAL;
424 18 : }
425 :
426 3554154 : if( FD_UNLIKELY( !sz ) ) return FD_CHECKPT_SUCCESS; /* nothing to do */
427 :
428 3159726 : if( FD_UNLIKELY( !buf ) ) {
429 0 : FD_LOG_WARNING(( "NULL buf with non-zero sz" ));
430 0 : checkpt->frame_style = -1; /* failed */
431 0 : return FD_CHECKPT_ERR_INVAL;
432 0 : }
433 :
434 3159726 : ulong off = checkpt->off;
435 :
436 3159726 : switch( checkpt->frame_style ) {
437 :
438 1570899 : case FD_CHECKPT_FRAME_STYLE_RAW: {
439 :
440 1570899 : if( fd_checkpt_private_is_mmio( checkpt ) ) { /* mmio mode (app dependent branch prob) */
441 :
442 780414 : if( FD_UNLIKELY( sz > (checkpt->mmio.sz-off) ) ) {
443 6 : FD_LOG_WARNING(( "mmio_sz too small" ));
444 6 : checkpt->frame_style = -1; /* failed */
445 6 : return FD_CHECKPT_ERR_IO;
446 6 : }
447 :
448 780408 : memcpy( checkpt->mmio.mem + off, buf, sz );
449 :
450 780408 : off += sz; /* at most mmio.sz */
451 :
452 790485 : } else { /* streaming mode */
453 :
454 790485 : ulong wbuf_used = checkpt->wbuf.used;
455 :
456 790485 : ulong wsz_max = wbuf_used + sz;
457 790485 : if( FD_UNLIKELY( wsz_max<sz ) ) {
458 0 : FD_LOG_WARNING(( "sz overflow" ));
459 0 : checkpt->frame_style = -1; /* failed */
460 0 : return FD_CHECKPT_ERR_IO;
461 0 : }
462 :
463 790485 : int err = fd_io_buffered_write( checkpt->fd, buf, sz, checkpt->wbuf.mem, checkpt->wbuf.sz, &wbuf_used );
464 790485 : if( FD_UNLIKELY( err ) ) {
465 0 : FD_LOG_WARNING(( "fd_io_buffered_write failed (%i-%s)", err, fd_io_strerror( err ) ));
466 0 : checkpt->frame_style = -1; /* failed */
467 0 : return FD_CHECKPT_ERR_IO;
468 0 : }
469 :
470 790485 : if( FD_UNLIKELY( wsz_max<wbuf_used ) ) {
471 0 : FD_LOG_WARNING(( "unexpected buffered write size" ));
472 0 : checkpt->frame_style = -1; /* failed */
473 0 : return FD_CHECKPT_ERR_IO;
474 0 : }
475 :
476 790485 : ulong wsz = wsz_max - wbuf_used;
477 :
478 790485 : off += wsz;
479 790485 : if( FD_UNLIKELY( off<wsz ) ) {
480 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
481 0 : checkpt->frame_style = -1; /* failed */
482 0 : return FD_CHECKPT_ERR_IO;
483 0 : }
484 :
485 790485 : checkpt->wbuf.used = wbuf_used;
486 :
487 790485 : }
488 :
489 1570893 : break;
490 1570899 : }
491 :
492 1570893 : # if FD_HAS_LZ4
493 1588827 : case FD_CHECKPT_FRAME_STYLE_LZ4: {
494 1588827 : LZ4_stream_t * lz4 = (LZ4_stream_t *)checkpt->lz4;
495 :
496 1588827 : if( fd_checkpt_private_is_mmio( checkpt ) ) { /* mmio mode, app dependent branch prob */
497 :
498 797334 : uchar * mmio = checkpt->mmio.mem;
499 797334 : ulong mmio_sz = checkpt->mmio.sz;
500 :
501 797334 : uchar const * chunk = (uchar const *)buf;
502 844884 : do {
503 844884 : ulong chunk_usz = fd_ulong_min( sz, FD_CHECKPT_PRIVATE_CHUNK_USZ_MAX );
504 :
505 844884 : ulong chunk_csz = fd_checkpt_private_lz4( lz4, chunk, chunk_usz, mmio + off, mmio_sz - off,
506 844884 : checkpt->gbuf, FD_CHECKPT_PRIVATE_GBUF_SZ, FD_CHECKPT_PRIVATE_GBUF_THRESH,
507 844884 : &checkpt->gbuf_cursor ); /* logs details */
508 844884 : if( FD_UNLIKELY( !chunk_csz ) ) {
509 3 : checkpt->frame_style = -1; /* failed */
510 3 : return FD_CHECKPT_ERR_COMP;
511 3 : }
512 :
513 844881 : off += chunk_csz; /* at most mmio_sz */
514 :
515 844881 : chunk += chunk_usz;
516 844881 : sz -= chunk_usz;
517 844881 : } while( sz );
518 :
519 797334 : } else { /* streaming mode */
520 :
521 791493 : int fd = checkpt->fd;
522 791493 : uchar * wbuf = checkpt->wbuf.mem;
523 791493 : ulong wbuf_sz = checkpt->wbuf.sz;
524 791493 : ulong wbuf_used = checkpt->wbuf.used;
525 :
526 791493 : uchar const * chunk = (uchar const *)buf;
527 836910 : do {
528 836910 : ulong chunk_usz = fd_ulong_min( sz, FD_CHECKPT_PRIVATE_CHUNK_USZ_MAX );
529 :
530 : /* If we are not guaranteed to have enough room in the write
531 : buffer to hold the compressed chunk, flush it to make room. */
532 :
533 836910 : ulong chunk_csz_max = FD_CHECKPT_PRIVATE_CSZ_MAX( chunk_usz );
534 836910 : ulong wbuf_free = wbuf_sz - wbuf_used;
535 836910 : if( FD_UNLIKELY( chunk_csz_max > wbuf_free ) ) {
536 :
537 12642 : ulong wsz;
538 12642 : int err = fd_io_write( fd, wbuf, wbuf_used, wbuf_used, &wsz );
539 12642 : if( FD_UNLIKELY( err ) ) {
540 0 : FD_LOG_WARNING(( "fd_io_write failed (%i-%s)", err, fd_io_strerror( err ) ));
541 0 : checkpt->frame_style = -1; /* failed */
542 0 : return FD_CHECKPT_ERR_IO;
543 0 : }
544 :
545 12642 : off += wsz;
546 12642 : if( FD_UNLIKELY( off<wsz ) ) {
547 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
548 0 : checkpt->frame_style = -1; /* failed */
549 0 : return FD_CHECKPT_ERR_IO;
550 0 : }
551 :
552 12642 : wbuf_used = 0UL;
553 12642 : wbuf_free = wbuf_sz; /* >= WBUF_MIN >= CSZ_MAX( CHUNK_USZ_MAX ) >= CSZ_MAX( chunk_usz ) */
554 :
555 12642 : }
556 :
557 : /* At this point, wbuf_free >= chunk_csz_max */
558 :
559 836910 : ulong chunk_csz = fd_checkpt_private_lz4( lz4, chunk, chunk_usz, wbuf + wbuf_used, wbuf_free,
560 836910 : checkpt->gbuf, FD_CHECKPT_PRIVATE_GBUF_SZ, FD_CHECKPT_PRIVATE_GBUF_THRESH,
561 836910 : &checkpt->gbuf_cursor ); /* logs details */
562 836910 : if( FD_UNLIKELY( !chunk_csz ) ) {
563 0 : checkpt->frame_style = -1; /* failed */
564 0 : return FD_CHECKPT_ERR_COMP;
565 0 : }
566 :
567 836910 : wbuf_used += chunk_csz;
568 :
569 836910 : chunk += chunk_usz;
570 836910 : sz -= chunk_usz;
571 :
572 836910 : } while( sz );
573 :
574 791493 : checkpt->wbuf.used = wbuf_used;
575 :
576 791493 : }
577 :
578 1588824 : break;
579 1588827 : }
580 1588824 : # endif
581 :
582 1588824 : default: { /* never get here */
583 0 : FD_LOG_WARNING(( "unsupported frame style" ));
584 0 : checkpt->frame_style = -1; /* failed */
585 0 : return FD_CHECKPT_ERR_UNSUP;
586 1588827 : }
587 :
588 3159726 : }
589 :
590 3159717 : checkpt->off = off;
591 3159717 : return FD_CHECKPT_SUCCESS;
592 3159726 : }
|