Line data Source code
1 : #define _GNU_SOURCE
2 : #include "../../disco/tiles.h"
3 : #include "generated/fd_writer_tile_seccomp.h"
4 :
5 : #include "../../util/pod/fd_pod_format.h"
6 :
7 : #include "../../flamenco/runtime/fd_runtime.h"
8 : #include "../../flamenco/runtime/fd_runtime_public.h"
9 : #include "../../flamenco/runtime/fd_executor.h"
10 :
11 : #include "../../funk/fd_funk.h"
12 :
13 : struct fd_writer_tile_in_ctx {
14 : fd_wksp_t * mem;
15 : ulong chunk0;
16 : ulong wmark;
17 : };
18 : typedef struct fd_writer_tile_in_ctx fd_writer_tile_in_ctx_t;
19 :
20 : struct fd_writer_tile_ctx {
21 : fd_wksp_t * wksp;
22 : fd_spad_t * spad;
23 : ulong tile_cnt;
24 : ulong tile_idx;
25 : ulong exec_tile_cnt;
26 :
27 : /* R/W by this tile and the replay tile. */
28 : ulong * fseq;
29 :
30 : /* Local join of Funk. R/W. */
31 : fd_funk_t funk[1];
32 : fd_funk_txn_t * funk_txn;
33 :
34 : /* Link management. */
35 : fd_writer_tile_in_ctx_t exec_writer_in[ FD_PACK_MAX_BANK_TILES ];
36 :
37 : /* Runtime public and local joins of its members. */
38 : fd_wksp_t const * runtime_public_wksp;
39 : fd_runtime_public_t const * runtime_public;
40 : fd_spad_t const * runtime_spad;
41 :
42 : /* Local joins of exec spads. Read-only. */
43 : fd_spad_t * exec_spad[ FD_PACK_MAX_BANK_TILES ];
44 : fd_wksp_t * exec_spad_wksp[ FD_PACK_MAX_BANK_TILES ];
45 :
46 : /* Local joins of exec tile txn ctx. Read-only. */
47 : fd_exec_txn_ctx_t * txn_ctx[ FD_PACK_MAX_BANK_TILES ];
48 :
49 : /* Local join of bank manager. R/W */
50 : fd_banks_t * banks;
51 : fd_bank_t * bank;
52 : };
53 : typedef struct fd_writer_tile_ctx fd_writer_tile_ctx_t;
54 :
55 : FD_FN_CONST static inline ulong
56 0 : scratch_align( void ) {
57 0 : return 128UL;
58 0 : }
59 :
60 : FD_FN_PURE static inline ulong
61 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
62 0 : (void)tile;
63 0 : ulong l = FD_LAYOUT_INIT;
64 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_writer_tile_ctx_t), sizeof(fd_writer_tile_ctx_t) );
65 0 : l = FD_LAYOUT_APPEND( l, fd_spad_align(), fd_spad_footprint( FD_RUNTIME_TRANSACTION_FINALIZATION_FOOTPRINT ) );
66 0 : return FD_LAYOUT_FINI( l, scratch_align() );
67 0 : }
68 :
69 : static void
70 : join_txn_ctx( fd_writer_tile_ctx_t * ctx,
71 : ulong exec_tile_idx,
72 0 : uint txn_ctx_offset ) {
73 :
74 0 : ulong exec_spad_gaddr = fd_wksp_gaddr( ctx->exec_spad_wksp[ exec_tile_idx ], ctx->exec_spad[ exec_tile_idx ] );
75 0 : if( FD_UNLIKELY( !exec_spad_gaddr ) ) {
76 0 : FD_LOG_CRIT(( "Unable to get gaddr of exec_spad %lu", exec_tile_idx ));
77 0 : }
78 :
79 0 : ulong txn_ctx_gaddr = exec_spad_gaddr + txn_ctx_offset;
80 0 : uchar * txn_ctx_laddr = fd_wksp_laddr( ctx->exec_spad_wksp[ exec_tile_idx ], txn_ctx_gaddr );
81 0 : if( FD_UNLIKELY( !txn_ctx_laddr ) ) {
82 0 : FD_LOG_CRIT(( "Unable to get laddr of the txn ctx at gaddr 0x%lx from exec_spad %lu", txn_ctx_gaddr, exec_tile_idx ));
83 0 : }
84 :
85 0 : ctx->txn_ctx[ exec_tile_idx ] = fd_exec_txn_ctx_join( txn_ctx_laddr,
86 0 : ctx->exec_spad[ exec_tile_idx ],
87 0 : ctx->exec_spad_wksp[ exec_tile_idx ] );
88 0 : if( FD_UNLIKELY( !ctx->txn_ctx[ exec_tile_idx ] ) ) {
89 0 : FD_LOG_CRIT(( "Unable to join txn ctx at gaddr 0x%lx laddr 0x%lx from exec_spad %lu", txn_ctx_gaddr, (ulong)txn_ctx_laddr, exec_tile_idx ));
90 0 : }
91 0 : }
92 :
93 : static int
94 : before_frag( fd_writer_tile_ctx_t * ctx,
95 : ulong in_idx,
96 : ulong seq,
97 0 : ulong sig ) {
98 :
99 : /* Round-robin.
100 :
101 : The usual round-robin strategy of returning
102 : (seq % ctx->tile_cnt) != ctx->tile_idx
103 : here suffers somewhat from a sort of convoy effect.
104 : This is because exec tiles do not proceed to the next transaction
105 : until transaction finalization has been done. In other words, exec
106 : tiles block on writer tiles, rather than truly pipelining. As a
107 : result, when all the exec tiles publish to seq 0, the 0th writer
108 : tile becomes busy, and all exec tiles block on it. Then writer tile
109 : 1 becomes busy, while all other writer tiles sit idle. So on and so
110 : forth.
111 :
112 : So we offset by in_idx to try to mitigate this.
113 : */
114 0 : return ((seq+in_idx) % ctx->tile_cnt) != ctx->tile_idx && sig != FD_WRITER_BOOT_SIG; /* The boot message should go through to all writer tiles. */
115 0 : }
116 :
117 : static void
118 : during_frag( fd_writer_tile_ctx_t * ctx,
119 : ulong in_idx,
120 : ulong seq,
121 : ulong sig,
122 : ulong chunk,
123 : ulong sz,
124 0 : ulong ctl ) {
125 :
126 0 : (void)seq;
127 0 : (void)ctl;
128 :
129 : /* exec_writer is a reliable flow controlled link so we are not gonna
130 : bother with copying the incoming frag. */
131 :
132 0 : fd_writer_tile_in_ctx_t * in_ctx = &(ctx->exec_writer_in[ in_idx ]);
133 :
134 0 : if( FD_UNLIKELY( chunk < in_ctx->chunk0 || chunk > in_ctx->wmark ) ) {
135 0 : FD_LOG_CRIT(( "chunk %lu %lu corrupt, not in range [%lu,%lu]",
136 0 : chunk,
137 0 : sz,
138 0 : in_ctx->chunk0,
139 0 : in_ctx->wmark ));
140 0 : }
141 :
142 : /* Process messages from exec tiles. */
143 :
144 0 : if( FD_UNLIKELY( sig == FD_WRITER_BOOT_SIG ) ) {
145 0 : fd_runtime_public_exec_writer_boot_msg_t * msg = fd_type_pun( fd_chunk_to_laddr( in_ctx->mem, chunk ) );
146 0 : join_txn_ctx( ctx, in_idx, msg->txn_ctx_offset );
147 0 : ulong txn_ctx_cnt = 0UL;
148 0 : for( ulong i=0UL; i<ctx->exec_tile_cnt; i++ ) {
149 0 : txn_ctx_cnt += fd_ulong_if( ctx->txn_ctx[ i ]!=NULL, 1UL, 0UL );
150 0 : }
151 0 : if( txn_ctx_cnt==ctx->exec_tile_cnt ) {
152 0 : fd_fseq_update( ctx->fseq, FD_WRITER_STATE_READY );
153 0 : FD_LOG_NOTICE(( "writer tile %lu fully booted", ctx->tile_idx ));
154 0 : }
155 0 : return;
156 0 : }
157 :
158 0 : if( FD_LIKELY( sig == FD_WRITER_TXN_SIG ) ) {
159 0 : fd_runtime_public_exec_writer_txn_msg_t * msg = fd_type_pun( fd_chunk_to_laddr( in_ctx->mem, chunk ) );
160 0 : if( FD_UNLIKELY( msg->exec_tile_id!=in_idx ) ) {
161 0 : FD_LOG_CRIT(( "exec_tile_id %u should be == in_idx %lu", msg->exec_tile_id, in_idx ));
162 0 : }
163 0 : fd_execute_txn_task_info_t info = {0};
164 0 : info.txn_ctx = ctx->txn_ctx[ in_idx ];
165 0 : info.exec_res = info.txn_ctx->exec_err;
166 :
167 0 : if( !ctx->bank || info.txn_ctx->slot != ctx->bank->slot ) {
168 0 : ctx->bank = fd_banks_get_bank( ctx->banks, info.txn_ctx->slot );
169 0 : if( FD_UNLIKELY( !ctx->bank ) ) {
170 0 : FD_LOG_CRIT(( "Could not find bank for slot %lu", info.txn_ctx->slot ));
171 0 : }
172 0 : }
173 :
174 0 : if( !ctx->funk_txn || info.txn_ctx->slot != ctx->funk_txn->xid.ul[0] ) {
175 0 : fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
176 0 : if( FD_UNLIKELY( !txn_map->map ) ) {
177 0 : FD_LOG_CRIT(( "Could not find valid funk transaction map" ));
178 0 : }
179 0 : fd_funk_txn_xid_t xid = { .ul = { ctx->bank->slot, ctx->bank->slot } };
180 0 : fd_funk_txn_start_read( ctx->funk );
181 0 : ctx->funk_txn = fd_funk_txn_query( &xid, txn_map );
182 0 : if( FD_UNLIKELY( !ctx->funk_txn ) ) {
183 0 : FD_LOG_CRIT(( "Could not find valid funk transaction" ));
184 0 : }
185 0 : fd_funk_txn_end_read( ctx->funk );
186 0 : }
187 :
188 0 : if( FD_LIKELY( info.txn_ctx->flags & FD_TXN_P_FLAGS_EXECUTE_SUCCESS ) ) {
189 0 : while( fd_writer_fseq_get_state( fd_fseq_query( ctx->fseq ) )!=FD_WRITER_STATE_READY ) {
190 : /* Spin to wait for the replay tile to ack the previous txn
191 : done. */
192 0 : FD_SPIN_PAUSE();
193 0 : }
194 0 : FD_SPAD_FRAME_BEGIN( ctx->spad ) {
195 0 : if( FD_UNLIKELY( !ctx->bank ) ) {
196 0 : FD_LOG_CRIT(( "No bank for slot %lu", info.txn_ctx->slot ));
197 0 : }
198 :
199 0 : fd_runtime_finalize_txn( ctx->funk, ctx->funk_txn, &info, ctx->spad, ctx->bank );
200 0 : } FD_SPAD_FRAME_END;
201 0 : }
202 : /* Notify the replay tile. */
203 0 : fd_fseq_update( ctx->fseq, fd_writer_fseq_set_txn_done( msg->txn_id, msg->exec_tile_id ) );
204 0 : return;
205 0 : }
206 :
207 0 : FD_LOG_CRIT(( "Unknown sig %lu", sig ));
208 0 : }
209 :
210 : static void
211 : privileged_init( fd_topo_t * topo,
212 0 : fd_topo_tile_t * tile ) {
213 0 : (void)topo;
214 0 : (void)tile;
215 0 : }
216 :
217 : static void
218 : unprivileged_init( fd_topo_t * topo,
219 0 : fd_topo_tile_t * tile ) {
220 :
221 : /********************************************************************/
222 : /* Validate allocations */
223 : /********************************************************************/
224 :
225 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
226 :
227 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
228 0 : fd_writer_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_writer_tile_ctx_t), sizeof(fd_writer_tile_ctx_t) );
229 0 : void * spad_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_spad_align(), fd_spad_footprint( FD_RUNTIME_TRANSACTION_FINALIZATION_FOOTPRINT ) );
230 0 : ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
231 0 : if( FD_UNLIKELY( scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ) ) ) {
232 0 : FD_LOG_CRIT( ( "scratch_alloc_mem did not match scratch_footprint diff: %lu alloc: %lu footprint: %lu",
233 0 : scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ),
234 0 : scratch_alloc_mem,
235 0 : (ulong)scratch + scratch_footprint( tile ) ) );
236 0 : }
237 0 : fd_memset( ctx, 0, sizeof(*ctx) );
238 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
239 0 : ctx->spad = fd_spad_join( fd_spad_new( spad_mem, FD_RUNTIME_TRANSACTION_FINALIZATION_FOOTPRINT ) );
240 :
241 : /********************************************************************/
242 : /* Links */
243 : /********************************************************************/
244 :
245 0 : ctx->tile_cnt = fd_topo_tile_name_cnt( topo, tile->name );
246 0 : ctx->tile_idx = tile->kind_id;
247 :
248 0 : ulong exec_tile_cnt = fd_topo_tile_name_cnt( topo, "exec" );
249 0 : ctx->exec_tile_cnt = exec_tile_cnt;
250 :
251 : /* Find and setup all the exec_writer links. */
252 0 : if( FD_UNLIKELY( exec_tile_cnt!=tile->in_cnt ) ) {
253 0 : FD_LOG_CRIT(( "Expecting one exec_writer link per exec tile but found %lu links and %lu tiles", tile->in_cnt, exec_tile_cnt ));
254 0 : }
255 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
256 0 : ulong exec_writer_idx = fd_topo_find_tile_in_link( topo, tile, "exec_writer", i );
257 0 : if( FD_UNLIKELY( exec_writer_idx==ULONG_MAX ) ) {
258 0 : FD_LOG_CRIT(( "Could not find exec_writer in-link %lu", i ));
259 0 : }
260 0 : fd_topo_link_t * exec_writer_in_link = &topo->links[ tile->in_link_id[ i ] ];
261 0 : if( FD_UNLIKELY( !exec_writer_in_link ) ) {
262 0 : FD_LOG_CRIT(( "Invalid exec_writer in-link %lu", i ));
263 0 : }
264 0 : ctx->exec_writer_in[ i ].mem = topo->workspaces[ topo->objs[ exec_writer_in_link->dcache_obj_id ].wksp_id ].wksp;
265 0 : ctx->exec_writer_in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->exec_writer_in[ i ].mem, exec_writer_in_link->dcache );
266 0 : ctx->exec_writer_in[ i ].wmark = fd_dcache_compact_wmark( ctx->exec_writer_in[ i ].mem,
267 0 : exec_writer_in_link->dcache,
268 0 : exec_writer_in_link->mtu );
269 0 : }
270 :
271 : /********************************************************************/
272 : /* Setup runtime public */
273 : /********************************************************************/
274 :
275 0 : ulong runtime_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "runtime_pub" );
276 0 : if( FD_UNLIKELY( runtime_obj_id==ULONG_MAX ) ) {
277 0 : FD_LOG_ERR(( "Could not find topology object for runtime public" ));
278 0 : }
279 :
280 0 : ctx->runtime_public_wksp = topo->workspaces[ topo->objs[ runtime_obj_id ].wksp_id ].wksp;
281 0 : if( FD_UNLIKELY( !ctx->runtime_public_wksp ) ) {
282 0 : FD_LOG_ERR(( "No runtime_public workspace" ));
283 0 : }
284 :
285 0 : ctx->runtime_public = fd_runtime_public_join( fd_topo_obj_laddr( topo, runtime_obj_id ) );
286 0 : if( FD_UNLIKELY( !ctx->runtime_public ) ) {
287 0 : FD_LOG_ERR(( "Failed to join runtime public" ));
288 0 : }
289 :
290 0 : ctx->runtime_spad = fd_runtime_public_spad( ctx->runtime_public );
291 0 : if( FD_UNLIKELY( !ctx->runtime_spad ) ) {
292 0 : FD_LOG_ERR(( "Failed to get and join runtime spad" ));
293 0 : }
294 :
295 : /********************************************************************/
296 : /* Spad */
297 : /********************************************************************/
298 :
299 : /* Join all of the exec spads. */
300 0 : for( ulong i=0UL; i<exec_tile_cnt; i++ ) {
301 0 : ulong exec_spad_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "exec_spad.%lu", i );
302 0 : if( FD_UNLIKELY( exec_spad_obj_id==ULONG_MAX ) ) {
303 0 : FD_LOG_CRIT(( "Could not find topology object for exec_spad.%lu", i ));
304 0 : }
305 :
306 0 : ctx->exec_spad[ i ] = fd_spad_join( fd_topo_obj_laddr( topo, exec_spad_obj_id ) );
307 0 : if( FD_UNLIKELY( !ctx->exec_spad[ i ] ) ) {
308 0 : FD_LOG_CRIT(( "Failed to join exec_spad.%lu", i ));
309 0 : }
310 0 : ctx->exec_spad_wksp[ i ] = fd_wksp_containing( ctx->exec_spad[ i ] );
311 0 : if( FD_UNLIKELY( !ctx->exec_spad_wksp[ i ] ) ) {
312 0 : FD_LOG_CRIT(( "Failed to find wksp for exec_spad.%lu", i ));
313 0 : }
314 0 : }
315 :
316 : /********************************************************************/
317 : /* Funk */
318 : /********************************************************************/
319 :
320 0 : if( FD_UNLIKELY( !fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->writer.funk_obj_id ) ) ) ) {
321 0 : FD_LOG_ERR(( "Failed to join database cache" ));
322 0 : }
323 :
324 : /********************************************************************/
325 : /* Setup fseq */
326 : /********************************************************************/
327 :
328 0 : ulong writer_fseq_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "writer_fseq.%lu", ctx->tile_idx );
329 0 : ctx->fseq = fd_fseq_join( fd_topo_obj_laddr( topo, writer_fseq_id ) );
330 0 : if( FD_UNLIKELY( !ctx->fseq ) ) {
331 0 : FD_LOG_CRIT(( "writer tile %lu fseq setup failed", ctx->tile_idx ));
332 0 : }
333 0 : fd_fseq_update( ctx->fseq, FD_WRITER_STATE_NOT_BOOTED );
334 :
335 : /********************************************************************/
336 : /* Bank */
337 : /********************************************************************/
338 :
339 0 : ulong banks_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "banks" );
340 0 : if( FD_UNLIKELY( banks_obj_id==ULONG_MAX ) ) {
341 0 : FD_LOG_ERR(( "Could not find topology object for banks" ));
342 0 : }
343 :
344 0 : ctx->banks = fd_banks_join( fd_topo_obj_laddr( topo, banks_obj_id ) );
345 0 : if( FD_UNLIKELY( !ctx->banks ) ) {
346 0 : FD_LOG_ERR(( "Failed to join banks" ));
347 0 : }
348 0 : }
349 :
350 : static ulong
351 : populate_allowed_seccomp( fd_topo_t const * topo,
352 : fd_topo_tile_t const * tile,
353 : ulong out_cnt,
354 0 : struct sock_filter * out ) {
355 0 : (void)topo;
356 0 : (void)tile;
357 :
358 0 : populate_sock_filter_policy_fd_writer_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
359 0 : return sock_filter_policy_fd_writer_tile_instr_cnt;
360 0 : }
361 :
362 : static ulong
363 : populate_allowed_fds( fd_topo_t const * topo,
364 : fd_topo_tile_t const * tile,
365 : ulong out_fds_cnt,
366 0 : int * out_fds ) {
367 0 : (void)topo;
368 0 : (void)tile;
369 :
370 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
371 :
372 0 : ulong out_cnt = 0UL;
373 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
374 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
375 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
376 0 : return out_cnt;
377 0 : }
378 :
379 0 : #define STEM_BURST (1UL)
380 :
381 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_writer_tile_ctx_t
382 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_writer_tile_ctx_t)
383 :
384 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
385 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
386 :
387 : #include "../../disco/stem/fd_stem.c"
388 :
389 : fd_topo_run_tile_t fd_tile_writer = {
390 : .name = "writer",
391 : .loose_footprint = 0UL,
392 : .populate_allowed_seccomp = populate_allowed_seccomp,
393 : .populate_allowed_fds = populate_allowed_fds,
394 : .scratch_align = scratch_align,
395 : .scratch_footprint = scratch_footprint,
396 : .privileged_init = privileged_init,
397 : .unprivileged_init = unprivileged_init,
398 : .run = stem_run,
399 : };
|