Line data Source code
1 : /* The snapwr tile dispatches O_DIRECT writes of large (~O(10MiB))
2 : blocks to a vinyl bstream file. This tile practically only does
3 : blocking write(2) calls, which typically just yield to the kernel
4 : scheduler until I/O completes.
5 :
6 : Alternatives considered:
7 : - Doing blocking O_DIRECT writes in the snapin tile is possible, but
8 : starves the snapin tile of valuable CPU cycles while waiting for
9 : write completions.
10 : - Writing through the page cache (without O_DIRECT) similarly pipelines
11 : writes through background dirty cache flushing, but has a noticeable
12 : throughput cost.
13 : - io_uring with O_DIRECT has significantly lower latency (due to
14 : lower per-op overhead, and thus smaller possible block sizes), and
15 : slightly better throughput. However, it is much more complex, less
16 : portable, and less secure (harder to sandbox).
17 :
18 : While writing, under the hood, the following happens on a fast NVMe
19 : paired with an optimized file system (e.g. XFS):
20 : - Userland context switches to kernel context via pwrite64
21 : - Kernel sets up IOMMU page table entries, allowing NVMe device to
22 : read userland memory
23 : - Kernel sends write commands to NVMe device
24 : - Kernel suspends thread
25 : ...
26 : - NVMe device does DMA reads, writes to disk
27 : ...
28 : - NVMe device sends completions to kernel
29 : - Kernel removes IOMMU page table entries (might send IPIs ... sad)
30 : - Kernel swaps back to userland and resumes
31 : The above is a *lot* of per-operation overhead, which is why this
32 : tile uses buffers several megabytes in size.
33 :
34 : The snapwr tile is thus expected to spend most of its time sleeping
35 : waiting for disk I/O to complete. The snapwr tile typically runs in
36 : "floating" mode. If there is no work to do (1024 consecutive idle
37 : iterations without a frag), it saves power by sleeping for 1 millisecond at a time.
38 :
39 : Accepted message descriptors:
40 :
41 : - ctl==FD_SNAPSHOT_MSG_DATA
42 : - chunk: compressed byte offset, relative to dcache data region (>>FD_CHUNK_LG_SZ)
43 : - sig: file offset to write to
44 : - sz: compressed write size (>>FD_VINYL_BSTREAM_BLOCK_LG_SZ)
45 :
46 : - ctl==FD_SNAPSHOT_MSG_CTRL_INIT_FULL
47 :
48 : - ctl==FD_SNAPSHOT_MSG_CTRL_INIT_INCR
49 : - sig: file offset to rewind "bytes written" metric to
50 :
51 : - ctl==FD_SNAPSHOT_MSG_CTRL_SHUTDOWN */
52 :
53 : #define _GNU_SOURCE /* O_DIRECT */
54 : #include "utils/fd_ssctrl.h"
55 : #include "../../disco/topo/fd_topo.h"
56 : #include "../../disco/metrics/fd_metrics.h"
57 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
58 : #include "generated/fd_snapwr_tile_seccomp.h"
59 :
60 : #include <errno.h>
61 : #include <sys/stat.h>
62 : #include <fcntl.h> /* open */
63 : #include <unistd.h> /* pwrite */
64 :
65 : #define NAME "snapwr"
66 :
67 : struct fd_snapwr {
68 : uint state;
69 : int dev_fd;
70 : ulong dev_sz;
71 : ulong dev_base;
72 : void const * base;
73 : ulong * seq_sync; /* fseq->seq[0] */
74 : uint idle_cnt;
75 : ulong * bstream_seq;
76 :
77 : ulong req_seen;
78 : ulong tile_cnt;
79 : ulong tile_idx;
80 :
81 : struct {
82 : ulong last_off;
83 : } metrics;
84 : };
85 :
86 : typedef struct fd_snapwr fd_snapwr_t;
87 :
88 : static ulong
89 0 : scratch_align( void ) {
90 0 : return alignof(fd_snapwr_t);
91 0 : }
92 :
93 : static ulong
94 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
95 0 : (void)tile;
96 0 : return sizeof(fd_snapwr_t);
97 0 : }
98 :
99 : static void
100 : privileged_init( fd_topo_t * topo,
101 0 : fd_topo_tile_t * tile ) {
102 0 : fd_snapwr_t * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
103 0 : memset( snapwr, 0, sizeof(fd_snapwr_t) );
104 :
105 0 : char const * vinyl_path = tile->snapwr.vinyl_path;
106 0 : int vinyl_fd = open( vinyl_path, O_RDWR|O_DIRECT|O_CLOEXEC, 0644 );
107 0 : if( FD_UNLIKELY( vinyl_fd<0 ) ) FD_LOG_ERR(( "open(%s,O_RDWR|O_DIRECT|O_CLOEXEC,0644) failed (%i-%s)", vinyl_path, errno, strerror( errno ) ));
108 :
109 0 : struct stat st;
110 0 : if( FD_UNLIKELY( 0!=fstat( vinyl_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", vinyl_path, errno, strerror( errno ) ));
111 :
112 0 : snapwr->dev_fd = vinyl_fd;
113 0 : snapwr->dev_sz = fd_ulong_align_dn( (ulong)st.st_size, FD_VINYL_BSTREAM_BLOCK_SZ );
114 0 : snapwr->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
115 0 : }
116 :
117 : static void
118 : unprivileged_init( fd_topo_t * topo,
119 0 : fd_topo_tile_t * tile ) {
120 0 : fd_snapwr_t * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
121 0 : memset( &snapwr->metrics, 0, sizeof(snapwr->metrics) );
122 :
123 0 : if( FD_UNLIKELY( tile->in_cnt < 1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1 or 2", tile->in_cnt ));
124 0 : if( FD_UNLIKELY( tile->in_cnt > 2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1 or 2", tile->in_cnt ));
125 0 : if( FD_UNLIKELY( tile->out_cnt!=0UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 0", tile->out_cnt ));
126 :
127 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ 0 ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link 0 must be reliable" ));
128 0 : FD_TEST( tile->snapwr.dcache_obj_id!=ULONG_MAX );
129 :
130 0 : uchar const * in_dcache = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snapwr.dcache_obj_id ) );
131 0 : FD_TEST( in_dcache );
132 0 : snapwr->base = in_dcache;
133 0 : snapwr->seq_sync = tile->in_link_fseq[ 0 ];
134 :
135 0 : snapwr->bstream_seq = NULL; /* set to NULL by default, before checking input links. */
136 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
137 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
138 :
139 0 : if( FD_LIKELY( 0==strcmp( in_link->name, "snapwh_wr" ) ) ) {
140 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ i ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link %lu must be reliable", i ));
141 :
142 0 : } else if( FD_LIKELY( 0==strcmp( in_link->name, "snaplv_wr" ) ) ) {
143 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ i ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link %lu must be reliable", i ));
144 0 : snapwr->bstream_seq = fd_mcache_seq_laddr( fd_mcache_join( fd_topo_obj_laddr( topo, in_link->mcache_obj_id ) ) ) + tile->kind_id;
145 0 : fd_mcache_seq_update( snapwr->bstream_seq, 0UL );
146 :
147 0 : } else {
148 0 : FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
149 0 : }
150 0 : }
151 :
152 0 : snapwr->state = FD_SNAPSHOT_STATE_IDLE;
153 :
154 0 : snapwr->req_seen = 0UL;
155 0 : snapwr->tile_cnt = fd_topo_tile_name_cnt( topo, "snapwr" );
156 0 : snapwr->tile_idx = tile->kind_id;
157 0 : }
158 :
159 : static ulong
160 : populate_allowed_fds( fd_topo_t const * topo,
161 : fd_topo_tile_t const * tile,
162 : ulong out_fds_cnt,
163 0 : int * out_fds ) {
164 0 : if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
165 0 : fd_snapwr_t const * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
166 :
167 0 : ulong out_cnt = 0;
168 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
169 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
170 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
171 0 : }
172 0 : out_fds[ out_cnt++ ] = snapwr->dev_fd;
173 :
174 0 : return out_cnt;
175 0 : }
176 :
177 : static ulong
178 : populate_allowed_seccomp( fd_topo_t const * topo,
179 : fd_topo_tile_t const * tile,
180 : ulong out_cnt,
181 0 : struct sock_filter * out ) {
182 0 : fd_snapwr_t const * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
183 0 : populate_sock_filter_policy_fd_snapwr_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)snapwr->dev_fd );
184 0 : return sock_filter_policy_fd_snapwr_tile_instr_cnt;
185 0 : }
186 :
187 : static int
188 0 : should_shutdown( fd_snapwr_t const * ctx ) {
189 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
190 0 : }
191 :
192 : static void
193 : before_credit( fd_snapwr_t * ctx,
194 : fd_stem_context_t * stem,
195 0 : int * charge_busy ) {
196 0 : (void)stem;
197 0 : if( ++ctx->idle_cnt >= 1024U ) {
198 0 : fd_log_sleep( (long)1e6 ); /* 1 millisecond */
199 0 : *charge_busy = 0;
200 0 : ctx->idle_cnt = 0U;
201 0 : }
202 0 : }
203 :
204 : static void
205 0 : metrics_write( fd_snapwr_t * ctx ) {
206 0 : FD_MGAUGE_SET( SNAPWR, STATE, ctx->state );
207 0 : FD_MGAUGE_SET( SNAPWR, VINYL_BYTES_WRITTEN, ctx->metrics.last_off );
208 0 : }
209 :
210 : /* handle_control_frag handles an administrative frag from the snapin
211 : tile. */
212 :
213 : static void
214 : handle_control_frag( fd_snapwr_t * ctx,
215 : ulong meta_ctl,
216 0 : ulong meta_sig ) {
217 0 : switch( meta_ctl ) {
218 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
219 0 : ctx->metrics.last_off = 0UL;
220 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
221 0 : break;
222 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
223 0 : ctx->metrics.last_off = meta_sig;
224 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
225 0 : break;
226 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
227 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
228 0 : break;
229 0 : default:
230 0 : FD_LOG_CRIT(( "received unexpected ssctrl msg type %lu", meta_ctl ));
231 0 : }
232 0 : }
233 :
234 : static int
235 0 : should_process_wr_request( fd_snapwr_t * ctx ) {
236 0 : return ctx->req_seen%ctx->tile_cnt==ctx->tile_idx;
237 0 : }
238 :
239 : /* handle_data_frag handles a bstream block sz-aligned write request.
240 : Does a synchronous blocking O_DIRECT write. */
241 :
242 : static void
243 : handle_data_frag( fd_snapwr_t * ctx,
244 : ulong chunk, /* compressed input pointer */
245 : ulong dev_off, /* file offset */
246 0 : ulong sz_comp ) { /* compressed input size */
247 0 : ulong src_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
248 0 : void const * src = fd_chunk_to_laddr_const( ctx->base, chunk );
249 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
250 0 : FD_CRIT( fd_ulong_is_aligned( src_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
251 0 : if( FD_UNLIKELY( dev_off+src_sz > ctx->dev_sz ) ) {
252 0 : FD_LOG_CRIT(( "vinyl bstream log is out of space" ));
253 0 : }
254 :
255 0 : if( FD_LIKELY( should_process_wr_request( ctx ) ) ) {
256 : /* Do a synchronous write(2) */
257 0 : ssize_t write_sz = pwrite( ctx->dev_fd, src, src_sz, (off_t)dev_off );
258 0 : if( FD_UNLIKELY( write_sz<0 ) ) {
259 0 : FD_LOG_ERR(( "pwrite(off=%lu,sz=%lu) failed (%i-%s)", dev_off, src_sz, errno, strerror( errno ) ));
260 0 : }
261 0 : }
262 0 : ctx->req_seen++;
263 :
264 0 : if( !!ctx->bstream_seq ) {
265 0 : fd_mcache_seq_update( ctx->bstream_seq, (dev_off+src_sz)-ctx->dev_base );
266 0 : }
267 :
268 0 : ctx->metrics.last_off = dev_off+src_sz;
269 0 : }
270 :
271 : static int
272 : during_frag( fd_snapwr_t * ctx,
273 : ulong in_idx,
274 : ulong meta_seq,
275 : ulong meta_sig,
276 : ulong meta_chunk,
277 : ulong meta_sz,
278 0 : ulong meta_ctl ) {
279 0 : (void)in_idx;
280 0 : ctx->idle_cnt = 0U;
281 :
282 0 : if( FD_UNLIKELY( meta_ctl==FD_SNAPSHOT_MSG_DATA ) ) {
283 0 : handle_data_frag( ctx, meta_chunk, meta_sig, meta_sz );
284 0 : } else {
285 0 : handle_control_frag( ctx, meta_ctl, meta_sig );
286 0 : }
287 :
288 : /* Because snapwr pacing is so loose and this tile sleeps, fd_stem
289 : will not return flow control credits fast enough.
290 : So, always update fseq (consumer progress) here. */
291 0 : fd_fseq_update( ctx->seq_sync, fd_seq_inc( meta_seq, 1UL ) );
292 :
293 0 : return 0;
294 0 : }
295 :
/* fd_stem configuration for this tile, consumed by the fd_stem.c
   include that follows.  The callbacks are the functions defined above;
   the context is the tile's fd_snapwr_t scratch state. */
#define STEM_CALLBACK_CONTEXT_TYPE    fd_snapwr_t
#define STEM_CALLBACK_CONTEXT_ALIGN   alignof(fd_snapwr_t)
#define STEM_BURST                    1UL
#define STEM_LAZY                     ((long)2e6)
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE   metrics_write
#define STEM_CALLBACK_BEFORE_CREDIT   before_credit
#define STEM_CALLBACK_DURING_FRAG     during_frag
304 :
305 : #include "../../disco/stem/fd_stem.c"
306 :
307 : fd_topo_run_tile_t fd_tile_snapwr = {
308 : .name = NAME,
309 : .populate_allowed_fds = populate_allowed_fds,
310 : .populate_allowed_seccomp = populate_allowed_seccomp,
311 : .scratch_align = scratch_align,
312 : .scratch_footprint = scratch_footprint,
313 : .privileged_init = privileged_init,
314 : .unprivileged_init = unprivileged_init,
315 : .run = stem_run
316 : };
317 :
318 : #undef NAME
|