Line data Source code
1 : /* The snapwr tile dispatches O_DIRECT writes of large (~O(10MiB))
2 : blocks to a vinyl bstream file. This tile practically only does
3 : blocking write(2) calls, which typically just yield to the kernel
4 : scheduler until I/O completes.
5 :
6 : Alternatives considered:
7 : - Doing blocking O_DIRECT writes in the snapin tile is possible, but
8 : starves the snapin tile of valuable CPU cycles while waiting for
9 : write completions.
10 : - Writing using the page cache (without O_DIRECT) similarly pipelines
11 : writes through background dirty cache flushing. Has a noticeable
12 : throughput cost.
13 : - io_uring with O_DIRECT has significantly lower latency (due to
14 : lower per-op overhead, and thus smaller possible block sizes), and
15 : slightly better throughput. However, it is much more complex, less
16 : portable, and less secure (harder to sandbox).
17 :
18 : While writing, under the hood, the following happens on a fast NVMe
19 : paired with an optimized file system (e.g. XFS):
20 : - Userland context switches to kernel context via pwrite64
21 : - Kernel sets up IOMMU page table entries, allowing NVMe device to
22 : read userland memory
23 : - Kernel sends write commands to NVMe device
24 : - Kernel suspends thread
25 : ...
26 : - NVMe device does DMA reads, writes to disk
27 : ...
28 : - NVMe device sends completions to kernel
29 : - Kernel removes IOMMU page table entries (might send IPIs ... sad)
30 : - Kernel swaps back to userland and resumes
31 : The above is a *lot* of overhead per-operation, which is the reason
32 : for multiple megabyte buffer sizes.
33 :
34 : The snapwr tile is thus expected to spend most of its time sleeping
35 : waiting for disk I/O to complete. The snapwr tile typically runs in
36 : "floating" mode. If there is no work to do, it saves power by going
37 : to sleep for 1 millisecond at a time.
38 :
39 : Accepted message descriptors:
40 :
41 : - ctl==FD_SNAPSHOT_MSG_DATA
42 : - chunk: compressed byte offset, relative to dcache data region (>>FD_CHUNK_LG_SZ)
43 : - sig: file offset to write to
44 : - sz: compressed write size (>>FD_VINYL_BSTREAM_BLOCK_LG_SZ)
45 :
46 : - ctl==FD_SNAPSHOT_MSG_CTRL_INIT_FULL
47 :
48 : - ctl==FD_SNAPSHOT_MSG_CTRL_INIT_INCR
49 : - sig: file offset to rewind "bytes written" metric to
50 :
51 : - ctl==FD_SNAPSHOT_MSG_CTRL_SHUTDOWN */
52 :
53 : #define _GNU_SOURCE /* O_DIRECT */
54 : #include "utils/fd_ssctrl.h"
55 : #include "../../disco/topo/fd_topo.h"
56 : #include "../../disco/metrics/fd_metrics.h"
57 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
58 : #include "generated/fd_snapwr_tile_seccomp.h"
59 :
60 : #include <errno.h>
61 : #include <sys/stat.h>
62 : #include <fcntl.h> /* open */
63 : #include <unistd.h> /* pwrite */
64 :
65 : #define NAME "snapwr"
66 :
67 : struct fd_snapwr {
68 : uint state;
69 : int dev_fd;
70 : ulong dev_sz;
71 : void const * base;
72 : ulong * seq_sync; /* fseq->seq[0] */
73 : uint idle_cnt;
74 :
75 : struct {
76 : ulong last_off;
77 : } metrics;
78 : };
79 :
80 : typedef struct fd_snapwr fd_snapwr_t;
81 :
82 : static ulong
83 0 : scratch_align( void ) {
84 0 : return alignof(fd_snapwr_t);
85 0 : }
86 :
87 : static ulong
88 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
89 0 : (void)tile;
90 0 : return sizeof(fd_snapwr_t);
91 0 : }
92 :
93 : static void
94 : privileged_init( fd_topo_t * topo,
95 0 : fd_topo_tile_t * tile ) {
96 0 : fd_snapwr_t * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
97 0 : memset( snapwr, 0, sizeof(fd_snapwr_t) );
98 :
99 0 : char const * vinyl_path = tile->snapwr.vinyl_path;
100 0 : int vinyl_fd = open( vinyl_path, O_RDWR|O_DIRECT|O_CLOEXEC, 0644 );
101 0 : if( FD_UNLIKELY( vinyl_fd<0 ) ) FD_LOG_ERR(( "open(%s,O_RDWR|O_DIRECT|O_CLOEXEC,0644) failed (%i-%s)", vinyl_path, errno, strerror( errno ) ));
102 :
103 0 : struct stat st;
104 0 : if( FD_UNLIKELY( 0!=fstat( vinyl_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", vinyl_path, errno, strerror( errno ) ));
105 :
106 0 : snapwr->dev_fd = vinyl_fd;
107 0 : snapwr->dev_sz = fd_ulong_align_dn( (ulong)st.st_size, FD_VINYL_BSTREAM_BLOCK_SZ );
108 0 : }
109 :
110 : static void
111 : unprivileged_init( fd_topo_t * topo,
112 0 : fd_topo_tile_t * tile ) {
113 0 : fd_snapwr_t * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
114 0 : memset( &snapwr->metrics, 0, sizeof(snapwr->metrics) );
115 :
116 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
117 0 : if( FD_UNLIKELY( tile->in_cnt !=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
118 0 : if( FD_UNLIKELY( tile->out_cnt!=0UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 0", tile->out_cnt ));
119 :
120 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ 0 ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link 0 must be reliable" ));
121 0 : FD_TEST( tile->snapwr.dcache_obj_id!=ULONG_MAX );
122 :
123 0 : uchar const * in_dcache = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snapwr.dcache_obj_id ) );
124 0 : FD_TEST( in_dcache );
125 0 : snapwr->base = in_dcache;
126 0 : snapwr->seq_sync = tile->in_link_fseq[ 0 ];
127 :
128 0 : snapwr->state = FD_SNAPSHOT_STATE_IDLE;
129 0 : }
130 :
131 : static ulong
132 : populate_allowed_fds( fd_topo_t const * topo,
133 : fd_topo_tile_t const * tile,
134 : ulong out_fds_cnt,
135 0 : int * out_fds ) {
136 0 : if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
137 0 : fd_snapwr_t const * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
138 :
139 0 : ulong out_cnt = 0;
140 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
141 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
142 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
143 0 : }
144 0 : out_fds[ out_cnt++ ] = snapwr->dev_fd;
145 :
146 0 : return out_cnt;
147 0 : }
148 :
149 : static ulong
150 : populate_allowed_seccomp( fd_topo_t const * topo,
151 : fd_topo_tile_t const * tile,
152 : ulong out_cnt,
153 0 : struct sock_filter * out ) {
154 0 : fd_snapwr_t const * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
155 0 : populate_sock_filter_policy_fd_snapwr_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)snapwr->dev_fd );
156 0 : return sock_filter_policy_fd_snapwr_tile_instr_cnt;
157 0 : }
158 :
159 : static int
160 0 : should_shutdown( fd_snapwr_t const * ctx ) {
161 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
162 0 : }
163 :
164 : static void
165 : before_credit( fd_snapwr_t * ctx,
166 : fd_stem_context_t * stem,
167 0 : int * charge_busy ) {
168 0 : (void)stem;
169 0 : if( ++ctx->idle_cnt >= 1024U ) {
170 0 : fd_log_sleep( (long)1e6 ); /* 1 millisecond */
171 0 : *charge_busy = 0;
172 0 : ctx->idle_cnt = 0U;
173 0 : }
174 0 : }
175 :
176 : static void
177 0 : metrics_write( fd_snapwr_t * ctx ) {
178 0 : FD_MGAUGE_SET( SNAPWR, STATE, ctx->state );
179 0 : FD_MGAUGE_SET( SNAPWR, VINYL_BYTES_WRITTEN, ctx->metrics.last_off );
180 0 : }
181 :
182 : /* handle_control_frag handles an administrative frag from the snapin
183 : tile. */
184 :
185 : static void
186 : handle_control_frag( fd_snapwr_t * ctx,
187 : ulong meta_ctl,
188 0 : ulong meta_sig ) {
189 0 : switch( meta_ctl ) {
190 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
191 0 : ctx->metrics.last_off = 0UL;
192 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
193 0 : break;
194 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
195 0 : ctx->metrics.last_off = meta_sig;
196 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
197 0 : break;
198 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
199 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
200 0 : break;
201 0 : default:
202 0 : FD_LOG_CRIT(( "received unexpected ssctrl msg type %lu", meta_ctl ));
203 0 : }
204 0 : }
205 :
206 : /* handle_data_frag handles a bstream block sz-aligned write request.
207 : Does a synchronous blocking O_DIRECT write. */
208 :
209 : static void
210 : handle_data_frag( fd_snapwr_t * ctx,
211 : ulong chunk, /* compressed input pointer */
212 : ulong dev_off, /* file offset */
213 0 : ulong sz_comp ) { /* compressed input size */
214 0 : ulong src_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
215 0 : void const * src = fd_chunk_to_laddr_const( ctx->base, chunk );
216 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
217 0 : FD_CRIT( fd_ulong_is_aligned( src_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
218 0 : if( FD_UNLIKELY( dev_off+src_sz > ctx->dev_sz ) ) {
219 0 : FD_LOG_CRIT(( "vinyl bstream log is out of space" ));
220 0 : }
221 :
222 : /* Do a synchronous write(2) */
223 0 : ssize_t write_sz = pwrite( ctx->dev_fd, src, src_sz, (off_t)dev_off );
224 0 : if( FD_UNLIKELY( write_sz<0 ) ) {
225 0 : FD_LOG_ERR(( "pwrite(off=%lu,sz=%lu) failed (%i-%s)", dev_off, src_sz, errno, strerror( errno ) ));
226 0 : }
227 0 : ctx->metrics.last_off = dev_off+src_sz;
228 0 : }
229 :
230 : static int
231 : during_frag( fd_snapwr_t * ctx,
232 : ulong in_idx,
233 : ulong meta_seq,
234 : ulong meta_sig,
235 : ulong meta_chunk,
236 : ulong meta_sz,
237 0 : ulong meta_ctl ) {
238 0 : (void)in_idx;
239 0 : ctx->idle_cnt = 0U;
240 :
241 0 : if( FD_UNLIKELY( meta_ctl==FD_SNAPSHOT_MSG_DATA ) ) {
242 0 : handle_data_frag( ctx, meta_chunk, meta_sig, meta_sz );
243 0 : } else {
244 0 : handle_control_frag( ctx, meta_ctl, meta_sig );
245 0 : }
246 :
247 : /* Because snapwr pacing is so loose and this tile sleeps, fd_stem
248 : will not return flow control credits fast enough.
249 : So, always update fseq (consumer progress) here. */
250 0 : fd_fseq_update( ctx->seq_sync, fd_seq_inc( meta_seq, 1UL ) );
251 :
252 0 : return 0;
253 0 : }
254 :
255 0 : #define STEM_BURST 1UL
256 0 : #define STEM_LAZY ((long)2e6)
257 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapwr_t
258 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapwr_t)
259 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
260 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
261 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
262 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
263 :
264 : #include "../../disco/stem/fd_stem.c"
265 :
266 : fd_topo_run_tile_t fd_tile_snapwr = {
267 : .name = NAME,
268 : .populate_allowed_fds = populate_allowed_fds,
269 : .populate_allowed_seccomp = populate_allowed_seccomp,
270 : .scratch_align = scratch_align,
271 : .scratch_footprint = scratch_footprint,
272 : .privileged_init = privileged_init,
273 : .unprivileged_init = unprivileged_init,
274 : .run = stem_run
275 : };
276 :
277 : #undef NAME
|