Line data Source code
1 : /* The snapwr tile dispatches O_DIRECT writes of large (~O(10MiB))
2 : blocks to a vinyl bstream file. This tile practically only does
3 : blocking write(2) calls, which typically just yield to the kernel
4 : scheduler until I/O completes.
5 :
6 : Alternatives considered:
7 : - Doing blocking O_DIRECT writes in the snapin tile is possible, but
8 : starves the snapin tile of valuable CPU cycles while waiting for
9 : write completions.
10 : - Writing using the page cache (without O_DIRECT) similarly pipelines
11 : writes through background dirty cache flushing. Has a noticeable
12 : throughput cost.
13 : - io_uring with O_DIRECT has significantly lower latency (due to
14 : lower per-op overhead, and thus smaller possible block sizes), and
15 : slightly better throughput. However, is much more complex, less
16 : portable, and less secure (harder to sandbox).
17 :
18 : While writing, under the hood, the following happens on a fast NVMe
19 : paired with an optimized file system (e.g. XFS):
20 : - Userland context switches to kernel context via pwrite64
21 : - Kernel sets up IOMMU page table entries, allowing NVMe device to
22 : read userland memory
23 : - Kernel sends write commands to NVMe device
24 : - Kernel suspends thread
25 : ...
26 : - NVMe device does DMA reads, writes to disk
27 : ...
28 : - NVMe device sends completions to kernel
29 : - Kernel removes IOMMU page table entries (might send IPIs ... sad)
30 : - Kernel swaps back to userland and resumes
31 : The above is a *lot* of overhead per-operation, which is the reason
32 : for multiple megabyte buffer sizes.
33 :
34 : The snapwr tile is thus expected to spend most of its time sleeping
35 : waiting for disk I/O to complete. The snapwr tile typically runs in
36 : "floating" mode. If there is no work to do, it saves power by going
37 : to sleep for 1 millisecond at a time.
38 :
39 : Accepted message descriptors:
40 :
41 : - ctl==FD_SNAPSHOT_MSG_DATA
42 : - chunk: compressed byte offset, relative to dcache data region (>>FD_CHUNK_LG_SZ)
43 : - sig: file offset to write to
44 : - sz: compressed write size (>>FD_VINYL_BSTREAM_BLOCK_LG_SZ)
45 :
46 : - ctl==FD_SNAPSHOT_MSG_CTRL_INIT_FULL
47 :
48 : - ctl==FD_SNAPSHOT_MSG_CTRL_INIT_INCR
49 : - sig: file offset to rewind "bytes written" metric to
50 :
51 : - ctl==FD_SNAPSHOT_MSG_CTRL_SHUTDOWN */
52 :
53 : #define _GNU_SOURCE /* O_DIRECT */
54 : #include "utils/fd_ssctrl.h"
55 : #include "utils/fd_vinyl_admin.h"
56 : #include "../../disco/topo/fd_topo.h"
57 : #include "../../disco/metrics/fd_metrics.h"
58 : #include "../../util/pod/fd_pod.h"
59 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
60 : #include "generated/fd_snapwr_tile_seccomp.h"
61 :
62 : #include <errno.h>
63 : #include <sys/stat.h>
64 : #include <fcntl.h> /* open */
65 : #include <unistd.h> /* pwrite */
66 :
67 : #define NAME "snapwr"
68 :
69 : struct fd_snapwr {
70 : uint state;
71 : int dev_fd;
72 : ulong dev_sz;
73 : ulong dev_base;
74 : void const * base;
75 : ulong * seq_sync; /* fseq->seq[0] */
76 : uint idle_cnt;
77 :
78 : fd_vinyl_admin_t * vinyl_admin;
79 : ulong * bstream_seq;
80 : int lthash_disabled;
81 :
82 : ulong req_seen;
83 : ulong tile_cnt;
84 : ulong tile_idx;
85 :
86 : struct {
87 : ulong last_off;
88 : } metrics;
89 : };
90 :
91 : typedef struct fd_snapwr fd_snapwr_t;
92 :
93 : static ulong
94 0 : scratch_align( void ) {
95 0 : return alignof(fd_snapwr_t);
96 0 : }
97 :
98 : static ulong
99 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
100 0 : (void)tile;
101 0 : return sizeof(fd_snapwr_t);
102 0 : }
103 :
104 : static void
105 : privileged_init( fd_topo_t * topo,
106 0 : fd_topo_tile_t * tile ) {
107 0 : fd_snapwr_t * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
108 0 : memset( snapwr, 0, sizeof(fd_snapwr_t) );
109 :
110 0 : char const * vinyl_path = tile->snapwr.vinyl_path;
111 0 : int vinyl_fd = open( vinyl_path, O_RDWR|O_DIRECT|O_CLOEXEC, 0644 );
112 0 : if( FD_UNLIKELY( vinyl_fd<0 ) ) FD_LOG_ERR(( "open(%s,O_RDWR|O_DIRECT|O_CLOEXEC,0644) failed (%i-%s)", vinyl_path, errno, strerror( errno ) ));
113 :
114 0 : struct stat st;
115 0 : if( FD_UNLIKELY( 0!=fstat( vinyl_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", vinyl_path, errno, strerror( errno ) ));
116 :
117 0 : snapwr->dev_fd = vinyl_fd;
118 0 : snapwr->dev_sz = fd_ulong_align_dn( (ulong)st.st_size, FD_VINYL_BSTREAM_BLOCK_SZ );
119 0 : snapwr->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
120 0 : }
121 :
122 : static void
123 : unprivileged_init( fd_topo_t * topo,
124 0 : fd_topo_tile_t * tile ) {
125 0 : fd_snapwr_t * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
126 0 : memset( &snapwr->metrics, 0, sizeof(snapwr->metrics) );
127 :
128 0 : if( FD_UNLIKELY( tile->in_cnt < 1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1 or 2", tile->in_cnt ));
129 0 : if( FD_UNLIKELY( tile->in_cnt > 2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1 or 2", tile->in_cnt ));
130 0 : if( FD_UNLIKELY( tile->out_cnt!=0UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 0", tile->out_cnt ));
131 :
132 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ 0 ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link 0 must be reliable" ));
133 0 : FD_TEST( tile->snapwr.dcache_obj_id!=ULONG_MAX );
134 :
135 0 : uchar const * in_dcache = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snapwr.dcache_obj_id ) );
136 0 : FD_TEST( in_dcache );
137 0 : snapwr->base = in_dcache;
138 0 : snapwr->seq_sync = tile->in_link_fseq[ 0 ];
139 :
140 0 : snapwr->bstream_seq = NULL; /* set to NULL by default, before checking input links. */
141 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
142 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
143 :
144 0 : if( FD_LIKELY( 0==strcmp( in_link->name, "snapwh_wr" ) ) ) {
145 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ i ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link %lu must be reliable", i ));
146 :
147 0 : } else if( FD_LIKELY( 0==strcmp( in_link->name, "snaplv_wr" ) ) ) {
148 0 : if( FD_UNLIKELY( !tile->in_link_reliable[ i ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link %lu must be reliable", i ));
149 0 : snapwr->bstream_seq = fd_mcache_seq_laddr( fd_mcache_join( fd_topo_obj_laddr( topo, in_link->mcache_obj_id ) ) ) + tile->kind_id;
150 :
151 0 : } else {
152 0 : FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
153 0 : }
154 0 : }
155 :
156 0 : snapwr->vinyl_admin = NULL;
157 0 : snapwr->bstream_seq = NULL;
158 0 : snapwr->lthash_disabled = !!tile->snapwr.lthash_disabled;
159 0 : if( !snapwr->lthash_disabled ) {
160 0 : ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
161 0 : FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
162 0 : fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
163 0 : FD_TEST( vinyl_admin );
164 0 : snapwr->vinyl_admin = vinyl_admin;
165 0 : snapwr->bstream_seq = &snapwr->vinyl_admin->wr_seq[ tile->kind_id ];
166 0 : }
167 :
168 0 : snapwr->state = FD_SNAPSHOT_STATE_IDLE;
169 :
170 0 : snapwr->req_seen = 0UL;
171 0 : snapwr->tile_cnt = fd_topo_tile_name_cnt( topo, "snapwr" );
172 0 : snapwr->tile_idx = tile->kind_id;
173 0 : }
174 :
175 : static ulong
176 : populate_allowed_fds( fd_topo_t const * topo,
177 : fd_topo_tile_t const * tile,
178 : ulong out_fds_cnt,
179 0 : int * out_fds ) {
180 0 : if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
181 0 : fd_snapwr_t const * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
182 :
183 0 : ulong out_cnt = 0;
184 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
185 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
186 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
187 0 : }
188 0 : out_fds[ out_cnt++ ] = snapwr->dev_fd;
189 :
190 0 : return out_cnt;
191 0 : }
192 :
193 : static ulong
194 : populate_allowed_seccomp( fd_topo_t const * topo,
195 : fd_topo_tile_t const * tile,
196 : ulong out_cnt,
197 0 : struct sock_filter * out ) {
198 0 : fd_snapwr_t const * snapwr = fd_topo_obj_laddr( topo, tile->tile_obj_id );
199 0 : populate_sock_filter_policy_fd_snapwr_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)snapwr->dev_fd );
200 0 : return sock_filter_policy_fd_snapwr_tile_instr_cnt;
201 0 : }
202 :
203 : static int
204 0 : should_shutdown( fd_snapwr_t const * ctx ) {
205 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
206 0 : }
207 :
208 : static void
209 : before_credit( fd_snapwr_t * ctx,
210 : fd_stem_context_t * stem,
211 0 : int * charge_busy ) {
212 0 : (void)stem;
213 0 : if( ++ctx->idle_cnt >= 1024U ) {
214 0 : fd_log_sleep( (long)1e6 ); /* 1 millisecond */
215 0 : *charge_busy = 0;
216 0 : ctx->idle_cnt = 0U;
217 0 : }
218 0 : }
219 :
220 : static void
221 0 : metrics_write( fd_snapwr_t * ctx ) {
222 0 : FD_MGAUGE_SET( SNAPWR, STATE, ctx->state );
223 0 : FD_MGAUGE_SET( SNAPWR, VINYL_BYTES_WRITTEN, ctx->metrics.last_off );
224 0 : }
225 :
226 : /* handle_control_frag handles an administrative frag from the snapin
227 : tile. */
228 :
229 : static void
230 : handle_control_frag( fd_snapwr_t * ctx,
231 : ulong meta_ctl,
232 0 : ulong meta_sig ) {
233 0 : switch( meta_ctl ) {
234 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
235 0 : ctx->metrics.last_off = 0UL;
236 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
237 0 : break;
238 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
239 0 : ctx->metrics.last_off = meta_sig;
240 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
241 0 : break;
242 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
243 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
244 0 : break;
245 0 : default:
246 0 : FD_LOG_CRIT(( "received unexpected ssctrl msg type %lu", meta_ctl ));
247 0 : break;
248 0 : }
249 0 : }
250 :
251 : static int
252 0 : should_process_wr_request( fd_snapwr_t * ctx ) {
253 0 : return ctx->req_seen%ctx->tile_cnt==ctx->tile_idx;
254 0 : }
255 :
256 : /* handle_data_frag handles a bstream block sz-aligned write request.
257 : Does a synchronous blocking O_DIRECT write. */
258 :
259 : static void
260 : handle_data_frag( fd_snapwr_t * ctx,
261 : ulong chunk, /* compressed input pointer */
262 : ulong dev_off, /* file offset */
263 0 : ulong sz_comp ) { /* compressed input size */
264 0 : ulong src_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
265 0 : void const * src = fd_chunk_to_laddr_const( ctx->base, chunk );
266 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
267 0 : FD_CRIT( fd_ulong_is_aligned( src_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
268 0 : if( FD_UNLIKELY( dev_off+src_sz > ctx->dev_sz ) ) {
269 0 : FD_LOG_CRIT(( "vinyl bstream log is out of space" ));
270 0 : }
271 :
272 0 : if( FD_LIKELY( should_process_wr_request( ctx ) ) ) {
273 : /* Do a synchronous write(2) */
274 0 : ssize_t write_sz = pwrite( ctx->dev_fd, src, src_sz, (off_t)dev_off );
275 0 : if( FD_UNLIKELY( write_sz<0 ) ) {
276 0 : FD_LOG_ERR(( "pwrite(off=%lu,sz=%lu) failed (%i-%s)", dev_off, src_sz, errno, strerror( errno ) ));
277 0 : }
278 0 : }
279 0 : ctx->req_seen++;
280 :
281 0 : if( !!ctx->bstream_seq ) {
282 : /* There is a way to avoid a lock here: every write tile has its
283 : own unique location in vinyl_admin's wr_seq array, based on its
284 : tile index (kind_id). The value is a ulong, and works the same
285 : way as a stem's fseq or an mcache's seq. The only other tile
286 : that can write to that location is snapwm during init full/incr
287 : snapshot, but snapwm gurantees that all write tiles have already
288 : finished processing all pending bstream writes (and updated the
289 : bstream_seq) by the time the wr_seq array is overwritten. */
290 0 : ulong new_seq = (dev_off+src_sz)-ctx->dev_base;
291 0 : fd_vinyl_admin_ulong_update( ctx->bstream_seq, new_seq );
292 0 : }
293 :
294 0 : ctx->metrics.last_off = dev_off+src_sz;
295 0 : }
296 :
297 : static int
298 : during_frag( fd_snapwr_t * ctx,
299 : ulong in_idx,
300 : ulong meta_seq,
301 : ulong meta_sig,
302 : ulong meta_chunk,
303 : ulong meta_sz,
304 : ulong meta_ctl,
305 : ulong meta_tsorig,
306 0 : ulong meta_tspub ) {
307 0 : (void)in_idx; (void)meta_sz; (void)meta_tspub;
308 0 : ctx->idle_cnt = 0U;
309 :
310 0 : if( FD_UNLIKELY( meta_ctl==FD_SNAPSHOT_MSG_DATA ) ) {
311 0 : handle_data_frag( ctx, meta_chunk, meta_sig, meta_tsorig );
312 0 : } else {
313 0 : handle_control_frag( ctx, meta_ctl, meta_sig );
314 0 : }
315 :
316 : /* Because snapwr pacing is so loose and this tile sleeps, fd_stem
317 : will not return flow control credits fast enough.
318 : So, always update fseq (consumer progress) here. */
319 0 : fd_fseq_update( ctx->seq_sync, fd_seq_inc( meta_seq, 1UL ) );
320 :
321 0 : return 0;
322 0 : }
323 :
324 0 : #define STEM_BURST 1UL
325 0 : #define STEM_LAZY ((long)2e6)
326 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapwr_t
327 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapwr_t)
328 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
329 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
330 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
331 0 : #define STEM_CALLBACK_DURING_FRAG1 during_frag
332 :
333 : #include "../../disco/stem/fd_stem.c"
334 :
335 : fd_topo_run_tile_t fd_tile_snapwr = {
336 : .name = NAME,
337 : .populate_allowed_fds = populate_allowed_fds,
338 : .populate_allowed_seccomp = populate_allowed_seccomp,
339 : .scratch_align = scratch_align,
340 : .scratch_footprint = scratch_footprint,
341 : .privileged_init = privileged_init,
342 : .unprivileged_init = unprivileged_init,
343 : .run = stem_run
344 : };
345 :
346 : #undef NAME
|