Line data Source code
1 : #include "../../disco/topo/fd_topo.h"
2 : #include "../../disco/topo/fd_pod_format.h"
3 : #include "../../funk/fd_funk.h"
4 : #include "../../funk/fd_funk_filemap.h"
5 : #include "../../flamenco/runtime/fd_hashes.h"
6 : #include "../../flamenco/runtime/fd_txncache.h"
7 : #include "../../flamenco/snapshot/fd_snapshot_create.h"
8 : #include "../../flamenco/runtime/fd_runtime.h"
9 :
10 : #include "generated/fd_batch_tile_seccomp.h"
11 :
12 : #include <errno.h>
13 : #include <unistd.h>
14 :
/* REPLAY_OUT_IDX is the index into the tile's out_link_id array of the
   single output link ("batch_replay") that this tile publishes on. */
#define REPLAY_OUT_IDX     (0UL)

/* EAH_REPLAY_OUT_SIG is the frag signature used when publishing a
   computed epoch account hash on the replay out link. */
#define EAH_REPLAY_OUT_SIG (0UL)

/* MEM_FOOTPRINT is the size in bytes (8 GiB) used for the tile's spad
   bump allocator. */
#define MEM_FOOTPRINT      (8UL<<30)
19 :
20 : struct fd_snapshot_tile_ctx {
21 : /* User defined parameters. */
22 : ulong full_interval;
23 : ulong incremental_interval;
24 : char const * out_dir;
25 : char funk_file[ PATH_MAX ];
26 :
27 : /* Shared data structures. */
28 : fd_txncache_t * status_cache;
29 : ulong * is_constipated;
30 : fd_funk_t * funk;
31 :
32 : /* File descriptors used for snapshot generation. */
33 : int tmp_fd;
34 : int tmp_inc_fd;
35 : int full_snapshot_fd;
36 : int incremental_snapshot_fd;
37 :
38 : /* Thread pool used for account hash calculation. */
39 : uchar tpool_mem[ FD_TPOOL_FOOTPRINT( FD_TILE_MAX ) ] __attribute__( ( aligned( FD_TPOOL_ALIGN ) ) );
40 : fd_tpool_t * tpool;
41 :
42 : /* Only join funk after tiles start spinning. */
43 : int is_funk_active;
44 :
45 : /* Metadata from the full snapshot used for incremental snapshots. */
46 : ulong last_full_snap_slot;
47 : fd_hash_t last_hash;
48 : ulong last_capitalization;
49 :
50 : /* Replay out link fields for epoch account hash. */
51 : fd_wksp_t * replay_out_mem;
52 : ulong replay_out_chunk;
53 :
54 : fd_wksp_t * replay_public_wksp;
55 : fd_runtime_public_t * replay_public;
56 :
57 : /* Bump allocator */
58 : fd_spad_t * spad;
59 : };
60 : typedef struct fd_snapshot_tile_ctx fd_snapshot_tile_ctx_t;
61 :
62 : void
63 0 : tpool_batch_boot( fd_topo_t * topo, ulong total_thread_count ) {
64 0 : ushort tile_to_cpu[ FD_TILE_MAX ] = {0};
65 0 : ulong thread_count = 0UL;
66 0 : ulong main_thread_seen = 0UL;
67 :
68 0 : for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
69 0 : if( strcmp( topo->tiles[i].name, "btpool" ) == 0 ) {
70 0 : tile_to_cpu[ 1+thread_count ] = (ushort)topo->tiles[i].cpu_idx;
71 0 : thread_count++;
72 0 : }
73 0 : if( strcmp( topo->tiles[i].name, "batch" ) == 0 ) {
74 0 : tile_to_cpu[ 0 ] = (ushort)topo->tiles[i].cpu_idx;
75 0 : main_thread_seen = 1;
76 0 : }
77 0 : }
78 :
79 0 : if( main_thread_seen ) {
80 0 : thread_count++;
81 0 : }
82 :
83 0 : if( thread_count != total_thread_count )
84 0 : FD_LOG_ERR(( "thread count mismatch thread_count=%lu total_thread_count=%lu main_thread_seen=%lu",
85 0 : thread_count,
86 0 : total_thread_count,
87 0 : main_thread_seen ));
88 :
89 0 : fd_tile_private_map_boot( tile_to_cpu, thread_count );
90 0 : }
91 :
92 : FD_FN_CONST static inline ulong
93 0 : scratch_align( void ) {
94 0 : return 128UL;
95 0 : }
96 :
97 : FD_FN_PURE static inline ulong
98 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
99 0 : ulong l = FD_LAYOUT_INIT;
100 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapshot_tile_ctx_t), sizeof(fd_snapshot_tile_ctx_t) );
101 0 : l = FD_LAYOUT_APPEND( l, fd_spad_align(), fd_ulong_align_up( MEM_FOOTPRINT, fd_spad_align() ) );
102 0 : return FD_LAYOUT_FINI( l, scratch_align() );
103 0 : }
104 :
105 : static void
106 : privileged_init( fd_topo_t * topo FD_PARAM_UNUSED,
107 0 : fd_topo_tile_t * tile ) {
108 :
109 : /* First open the relevant files here. TODO: We eventually want to extend
110 : this to support multiple files. */
111 :
112 0 : char tmp_dir_buf[ FD_SNAPSHOT_DIR_MAX ];
113 0 : int err = snprintf( tmp_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_ARCHIVE );
114 0 : if( FD_UNLIKELY( err<0 ) ) {
115 0 : FD_LOG_ERR(( "Failed to format directory string" ));
116 0 : }
117 :
118 0 : char tmp_inc_dir_buf[ FD_SNAPSHOT_DIR_MAX ];
119 0 : err = snprintf( tmp_inc_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_INCR_ARCHIVE );
120 0 : if( FD_UNLIKELY( err<0 ) ) {
121 0 : FD_LOG_ERR(( "Failed to format directory string" ));
122 0 : }
123 :
124 0 : char zstd_dir_buf[ FD_SNAPSHOT_DIR_MAX ];
125 0 : err = snprintf( zstd_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_FULL_ARCHIVE_ZSTD );
126 0 : if( FD_UNLIKELY( err<0 ) ) {
127 0 : FD_LOG_ERR(( "Failed to format directory string" ));
128 0 : }
129 :
130 0 : char zstd_inc_dir_buf[ FD_SNAPSHOT_DIR_MAX ];
131 0 : err = snprintf( zstd_inc_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_INCR_ARCHIVE_ZSTD );
132 0 : if( FD_UNLIKELY( err<0 ) ) {
133 0 : FD_LOG_ERR(( "Failed to format directory string" ));
134 0 : }
135 :
136 : /* Create and open the relevant files for snapshots. */
137 :
138 0 : tile->batch.tmp_fd = open( tmp_dir_buf, O_CREAT | O_RDWR | O_TRUNC, 0644 );
139 0 : if( FD_UNLIKELY( tile->batch.tmp_fd==-1 ) ) {
140 0 : FD_LOG_ERR(( "Failed to open and create tarball for file=%s (%i-%s)", tmp_dir_buf, errno, fd_io_strerror( errno ) ));
141 0 : }
142 :
143 0 : tile->batch.tmp_inc_fd = open( tmp_inc_dir_buf, O_CREAT | O_RDWR | O_TRUNC, 0644 );
144 0 : if( FD_UNLIKELY( tile->batch.tmp_inc_fd==-1 ) ) {
145 0 : FD_LOG_ERR(( "Failed to open and create tarball for file=%s (%i-%s)", tmp_inc_dir_buf, errno, fd_io_strerror( errno ) ));
146 0 : }
147 :
148 0 : tile->batch.full_snapshot_fd = open( zstd_dir_buf, O_RDWR | O_CREAT | O_TRUNC, 0644 );
149 0 : if( FD_UNLIKELY( tile->batch.full_snapshot_fd==-1 ) ) {
150 0 : FD_LOG_WARNING(( "Failed to open the snapshot file (%i-%s)", errno, fd_io_strerror( errno ) ));
151 0 : }
152 :
153 0 : tile->batch.incremental_snapshot_fd = open( zstd_inc_dir_buf, O_RDWR | O_CREAT | O_TRUNC, 0644 );
154 0 : if( FD_UNLIKELY( tile->batch.incremental_snapshot_fd==-1 ) ) {
155 0 : FD_LOG_WARNING(( "Failed to open the snapshot file (%i-%s)", errno, fd_io_strerror( errno ) ));
156 0 : }
157 0 : }
158 :
159 : static void
160 : unprivileged_init( fd_topo_t * topo,
161 0 : fd_topo_tile_t * tile ) {
162 :
163 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
164 :
165 0 : if( FD_UNLIKELY( tile->out_cnt!=1UL || strcmp( topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name, "batch_replay" ) ) ) {
166 0 : FD_LOG_ERR(( "batch tile has none or unexpected output links %lu %s",
167 0 : tile->out_cnt, topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name ));
168 0 : }
169 :
170 : /**********************************************************************/
171 : /* scratch (bump)-allocate memory owned by the replay tile */
172 : /**********************************************************************/
173 :
174 : /* Do not modify order! This is join-order in unprivileged_init. */
175 :
176 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
177 0 : fd_snapshot_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapshot_tile_ctx_t), sizeof(fd_snapshot_tile_ctx_t) );
178 0 : memset( ctx, 0, sizeof(fd_snapshot_tile_ctx_t) );
179 0 : void * spad_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_spad_align(), fd_ulong_align_up( MEM_FOOTPRINT, fd_spad_align() ) );
180 0 : ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
181 :
182 0 : if( FD_UNLIKELY( scratch_alloc_mem > (ulong)scratch + scratch_footprint(tile) ) ) {
183 0 : FD_LOG_ERR(( "scratch overflow" ));
184 0 : }
185 :
186 0 : ctx->full_interval = tile->batch.full_interval;
187 0 : ctx->incremental_interval = tile->batch.incremental_interval;
188 0 : ctx->out_dir = tile->batch.out_dir;
189 0 : ctx->tmp_fd = tile->batch.tmp_fd;
190 0 : ctx->tmp_inc_fd = tile->batch.tmp_inc_fd;
191 0 : ctx->full_snapshot_fd = tile->batch.full_snapshot_fd;
192 0 : ctx->incremental_snapshot_fd = tile->batch.incremental_snapshot_fd;
193 :
194 : /**********************************************************************/
195 : /* tpool */
196 : /**********************************************************************/
197 :
198 0 : FD_LOG_NOTICE(( "Number of threads in hash tpool: %lu", tile->batch.hash_tpool_thread_count ));
199 :
200 0 : if( FD_LIKELY( tile->batch.hash_tpool_thread_count>1UL ) ) {
201 0 : tpool_batch_boot( topo, tile->batch.hash_tpool_thread_count );
202 0 : ctx->tpool = fd_tpool_init( ctx->tpool_mem, tile->batch.hash_tpool_thread_count );
203 0 : } else {
204 0 : ctx->tpool = NULL;
205 0 : }
206 :
207 0 : if( FD_LIKELY( tile->batch.hash_tpool_thread_count>1UL ) ) {
208 : /* Start the tpool workers */
209 0 : for( ulong i=1UL; i<tile->batch.hash_tpool_thread_count; i++ ) {
210 0 : if( FD_UNLIKELY( !fd_tpool_worker_push( ctx->tpool, i, NULL, 0UL ) ) ) {
211 0 : FD_LOG_ERR(( "failed to launch worker" ));
212 0 : }
213 0 : }
214 0 : }
215 :
216 : /**********************************************************************/
217 : /* spads */
218 : /**********************************************************************/
219 : /* FIXME: Define a bound for the size of the spad. It likely needs to be
220 : larger than this. */
221 0 : uchar * spad_mem_cur = spad_mem;
222 0 : ctx->spad = fd_spad_join( fd_spad_new( spad_mem_cur, MEM_FOOTPRINT ) );
223 :
224 : /**********************************************************************/
225 : /* funk */
226 : /**********************************************************************/
227 :
228 : /* We only want to join funk after it has been setup and joined in the
229 : replay tile.
230 : TODO: Eventually funk will be joined via a shared topology object. */
231 0 : ctx->is_funk_active = 0;
232 0 : memcpy( ctx->funk_file, tile->replay.funk_file, sizeof(tile->replay.funk_file) );
233 :
234 : /**********************************************************************/
235 : /* status cache */
236 : /**********************************************************************/
237 :
238 0 : ulong status_cache_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "txncache" );
239 0 : if( FD_UNLIKELY( status_cache_obj_id==ULONG_MAX ) ) {
240 0 : FD_LOG_ERR(( "no status cache object id" ));
241 0 : }
242 :
243 0 : ctx->status_cache = fd_txncache_join( fd_topo_obj_laddr( topo, status_cache_obj_id ) );
244 0 : if( FD_UNLIKELY( !ctx->status_cache ) ) {
245 0 : FD_LOG_ERR(( "no status cache" ));
246 0 : }
247 :
248 : /**********************************************************************/
249 : /* constipated fseq */
250 : /**********************************************************************/
251 :
252 0 : ulong constipated_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "constipate" );
253 0 : if( FD_UNLIKELY( constipated_obj_id==ULONG_MAX ) ) {
254 0 : FD_LOG_ERR(( "no constipated object id" ));
255 0 : }
256 0 : ctx->is_constipated = fd_fseq_join( fd_topo_obj_laddr( topo, constipated_obj_id ) );
257 0 : if( FD_UNLIKELY( !ctx->is_constipated ) ) {
258 0 : FD_LOG_ERR(( "replay tile has no constipated fseq" ));
259 0 : }
260 0 : fd_fseq_update( ctx->is_constipated, 0UL );
261 0 : if( FD_UNLIKELY( fd_fseq_query( ctx->is_constipated ) ) ) {
262 0 : FD_LOG_ERR(( "constipated fseq is not zero" ));
263 0 : }
264 :
265 : /**********************************************************************/
266 : /* snapshot */
267 : /**********************************************************************/
268 :
269 : /* Zero init all of the fields used for incremental snapshot generation
270 : that must be persisted across snapshot creation. */
271 :
272 0 : ctx->last_full_snap_slot = 0UL;
273 0 : ctx->last_capitalization = 0UL;
274 0 : fd_memset( &ctx->last_hash, 0, sizeof(fd_hash_t) );
275 :
276 : /****************************************************************************/
277 : /* Replay Tile Link */
278 : /****************************************************************************/
279 :
280 : /* Set up replay output */
281 0 : fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ];
282 0 : ctx->replay_out_mem = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp;
283 0 : ctx->replay_out_chunk = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache );;
284 :
285 : /* replay public setup */
286 0 : ulong replay_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "replay_pub" );
287 0 : FD_TEST( replay_obj_id!=ULONG_MAX );
288 0 : ctx->replay_public_wksp = topo->workspaces[ topo->objs[ replay_obj_id ].wksp_id ].wksp;
289 :
290 0 : if( ctx->replay_public_wksp==NULL ) {
291 0 : FD_LOG_ERR(( "no replay_public workspace" ));
292 0 : }
293 :
294 0 : ctx->replay_public = fd_runtime_public_join( fd_topo_obj_laddr( topo, replay_obj_id ) );
295 0 : FD_TEST( ctx->replay_public!=NULL );
296 0 : }
297 :
298 : static void
299 0 : produce_snapshot( fd_snapshot_tile_ctx_t * ctx, ulong batch_fseq ) {
300 :
301 0 : ulong is_incremental = fd_batch_fseq_is_incremental( batch_fseq );
302 0 : ulong snapshot_slot = fd_batch_fseq_get_slot( batch_fseq );
303 :
304 0 : if( !is_incremental ) {
305 0 : ctx->last_full_snap_slot = snapshot_slot;
306 0 : }
307 :
308 0 : FD_LOG_WARNING(( "Creating snapshot incremental=%lu slot=%lu", is_incremental, snapshot_slot ));
309 :
310 0 : fd_snapshot_ctx_t snapshot_ctx = {
311 0 : .features = &ctx->replay_public->features,
312 0 : .slot = snapshot_slot,
313 0 : .out_dir = ctx->out_dir,
314 0 : .is_incremental = (uchar)is_incremental,
315 0 : .funk = ctx->funk,
316 0 : .status_cache = ctx->status_cache,
317 0 : .tmp_fd = is_incremental ? ctx->tmp_inc_fd : ctx->tmp_fd,
318 0 : .snapshot_fd = is_incremental ? ctx->incremental_snapshot_fd : ctx->full_snapshot_fd,
319 0 : .tpool = ctx->tpool,
320 : /* These parameters are ignored if the snapshot is not incremental. */
321 0 : .last_snap_slot = ctx->last_full_snap_slot,
322 0 : .last_snap_acc_hash = &ctx->last_hash,
323 0 : .last_snap_capitalization = ctx->last_capitalization,
324 0 : .spad = ctx->spad
325 0 : };
326 :
327 : /* If this isn't the first snapshot that this tile is creating, the
328 : permissions should be made to not acessible by users and should be
329 : renamed to the constant file that is expected. */
330 :
331 0 : char proc_filename[ FD_SNAPSHOT_DIR_MAX ];
332 0 : char prev_filename[ FD_SNAPSHOT_DIR_MAX ];
333 0 : char new_filename [ FD_SNAPSHOT_DIR_MAX ];
334 0 : snprintf( proc_filename, FD_SNAPSHOT_DIR_MAX, "/proc/self/fd/%d", is_incremental ? ctx->incremental_snapshot_fd : ctx->full_snapshot_fd );
335 0 : long len = readlink( proc_filename, prev_filename, FD_SNAPSHOT_DIR_MAX );
336 0 : if( FD_UNLIKELY( len<=0L ) ) {
337 0 : FD_LOG_ERR(( "Failed to readlink the snapshot file" ));
338 0 : }
339 0 : prev_filename[ len ] = '\0';
340 :
341 0 : int err = snprintf( new_filename, FD_SNAPSHOT_DIR_MAX, "%s/%s", ctx->out_dir, is_incremental ? FD_SNAPSHOT_TMP_INCR_ARCHIVE_ZSTD : FD_SNAPSHOT_TMP_FULL_ARCHIVE_ZSTD );
342 0 : if( FD_UNLIKELY( err<0 ) ) {
343 0 : FD_LOG_ERR(( "Can't format filename" ));
344 0 : return;
345 0 : }
346 :
347 0 : err = rename( prev_filename, new_filename );
348 0 : if( FD_UNLIKELY( err ) ) {
349 0 : FD_LOG_ERR(( "Failed to rename file from %s to %s", prev_filename, new_filename ));
350 0 : }
351 0 : FD_LOG_NOTICE(( "Renaming file from %s to %s", prev_filename, new_filename ));
352 :
353 0 : err = ftruncate( snapshot_ctx.tmp_fd, 0UL );
354 0 : if( FD_UNLIKELY( err==-1 ) ) {
355 0 : FD_LOG_ERR(( "Failed to truncate the temporary file (%i-%s)", errno, fd_io_strerror( errno ) ));
356 0 : }
357 :
358 0 : err = ftruncate( snapshot_ctx.snapshot_fd, 0UL );
359 0 : if( FD_UNLIKELY( err==-1 ) ) {
360 0 : FD_LOG_ERR(( "Failed to truncate the snapshot file (%i-%s)", errno, fd_io_strerror( errno ) ));
361 0 : }
362 :
363 0 : long seek = lseek( snapshot_ctx.tmp_fd, 0UL, SEEK_SET );
364 0 : if( FD_UNLIKELY( seek!=0L ) ) {
365 0 : FD_LOG_ERR(( "Failed to seek to the beginning of the file" ));
366 0 : }
367 :
368 : /* Now that the files are in an expected state, create the snapshot. */
369 0 : FD_SPAD_FRAME_BEGIN( snapshot_ctx.spad ) {
370 0 : fd_snapshot_create_new_snapshot( &snapshot_ctx, &ctx->last_hash, &ctx->last_capitalization );
371 0 : } FD_SPAD_FRAME_END;
372 :
373 0 : if( is_incremental ) {
374 0 : FD_LOG_NOTICE(( "Done creating a snapshot in %s", snapshot_ctx.out_dir ));
375 0 : FD_LOG_ERR(("Successful exit" ));
376 0 : }
377 :
378 0 : FD_LOG_NOTICE(( "Done creating a snapshot in %s", snapshot_ctx.out_dir ));
379 :
380 : /* At this point the snapshot has been succesfully created, so we can
381 : unconstipate funk and any related data structures in the replay tile. */
382 :
383 0 : fd_fseq_update( ctx->is_constipated, 0UL );
384 :
385 0 : }
386 :
387 : static fd_funk_txn_t*
388 0 : get_eah_txn( fd_funk_t * funk, ulong slot ) {
389 :
390 0 : fd_funk_txn_t * txn_map = fd_funk_txn_map( funk, fd_funk_wksp( funk ) );
391 0 : for( fd_funk_txn_map_iter_t iter = fd_funk_txn_map_iter_init( txn_map );
392 0 : !fd_funk_txn_map_iter_done( txn_map, iter );
393 0 : iter = fd_funk_txn_map_iter_next( txn_map, iter ) ) {
394 0 : fd_funk_txn_t * txn = fd_funk_txn_map_iter_ele( txn_map, iter );
395 0 : if( txn->xid.ul[0]==slot ) {
396 0 : FD_LOG_NOTICE(( "Found transaction for eah" ));
397 0 : return txn;
398 0 : }
399 0 : }
400 0 : FD_LOG_NOTICE(( "Calculating eah from root" ));
401 0 : return NULL;
402 0 : }
403 :
404 : static void
405 0 : produce_eah( fd_snapshot_tile_ctx_t * ctx, fd_stem_context_t * stem, ulong batch_fseq ) {
406 0 : ulong eah_slot = fd_batch_fseq_get_slot( batch_fseq );
407 :
408 0 : if( FD_FEATURE_ACTIVE_( eah_slot, ctx->replay_public->features, accounts_lt_hash ) )
409 0 : return;
410 :
411 0 : ulong tsorig = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
412 :
413 0 : FD_LOG_WARNING(( "Begining to produce epoch account hash in background for slot=%lu", eah_slot ));
414 :
415 : /* TODO: Perhaps it makes sense to factor this out into a function in the
416 : runtime as this could technically be considered a layering violation. */
417 :
418 : /* First, we must retrieve the corresponding slot_bank. We have the guarantee
419 : that the root record is frozen from the replay tile. */
420 :
421 0 : fd_funk_t * funk = ctx->funk;
422 0 : fd_funk_txn_t * eah_txn = get_eah_txn( funk, eah_slot );
423 0 : fd_funk_rec_key_t slot_id = fd_runtime_slot_bank_key();
424 0 : fd_funk_rec_t const * slot_rec = fd_funk_rec_query( funk, eah_txn, &slot_id );
425 0 : if( FD_UNLIKELY( !slot_rec ) ) {
426 0 : FD_LOG_ERR(( "Failed to read slot bank record: missing record" ));
427 0 : }
428 0 : void * slot_val = fd_funk_val( slot_rec, fd_funk_wksp( funk ) );
429 :
430 0 : if( FD_UNLIKELY( fd_funk_val_sz( slot_rec )<sizeof(uint) ) ) {
431 0 : FD_LOG_ERR(( "Failed to read slot bank record: empty record" ));
432 0 : }
433 :
434 0 : uint slot_magic = *(uint*)slot_val;
435 0 : FD_SPAD_FRAME_BEGIN( ctx->spad ) {
436 0 : fd_bincode_decode_ctx_t slot_decode_ctx = {
437 0 : .data = (uchar*)slot_val + sizeof(uint),
438 0 : .dataend = (uchar*)slot_val + fd_funk_val_sz( slot_rec )
439 0 : };
440 :
441 0 : if( FD_UNLIKELY( slot_magic!=FD_RUNTIME_ENC_BINCODE ) ) {
442 0 : FD_LOG_ERR(( "Slot bank record has wrong magic" ));
443 0 : }
444 :
445 0 : ulong total_sz = 0UL;
446 0 : int err = fd_slot_bank_decode_footprint( &slot_decode_ctx, &total_sz );
447 0 : if( FD_UNLIKELY( err ) ) {
448 0 : FD_LOG_ERR(( "Failed to read slot bank record: invalid decode" ));
449 0 : }
450 :
451 0 : uchar * mem = fd_spad_alloc( ctx->spad, fd_slot_bank_align(), total_sz );
452 0 : if( FD_UNLIKELY( !mem ) ) {
453 0 : FD_LOG_ERR(( "Failed to read slot bank record: unable to allocate memory" ));
454 0 : }
455 :
456 0 : fd_slot_bank_t * slot_bank = fd_slot_bank_decode( mem, &slot_decode_ctx );
457 :
458 : /* At this point, calculate the epoch account hash. */
459 :
460 0 : fd_hash_t epoch_account_hash = {0};
461 :
462 0 : fd_accounts_hash( funk, slot_bank, ctx->tpool, &epoch_account_hash, ctx->spad, 0, &ctx->replay_public->features );
463 :
464 0 : FD_LOG_NOTICE(( "Done computing epoch account hash (%s)", FD_BASE58_ENC_32_ALLOCA( &epoch_account_hash ) ));
465 :
466 : /* Once the hash is calculated, we are ready to push the computed hash
467 : onto the out link to replay. We don't need to add any other information
468 : as this is the only type of message that is transmitted. */
469 :
470 0 : uchar * out_buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
471 0 : fd_memcpy( out_buf, epoch_account_hash.uc, sizeof(fd_hash_t) );
472 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
473 0 : fd_stem_publish( stem, 0UL, EAH_REPLAY_OUT_SIG, ctx->replay_out_chunk, sizeof(fd_hash_t), 0UL, tsorig, tspub );
474 :
475 : /* Reset the fseq allowing for the un-constipation of funk and allow for
476 : snapshots to be created again. */
477 :
478 0 : fd_fseq_update( ctx->is_constipated, 0UL );
479 0 : } FD_SPAD_FRAME_END;
480 0 : }
481 :
482 : static void
483 : after_credit( fd_snapshot_tile_ctx_t * ctx,
484 : fd_stem_context_t * stem,
485 : int * opt_poll_in FD_PARAM_UNUSED,
486 0 : int * charge_busy FD_PARAM_UNUSED ) {
487 :
488 0 : ulong batch_fseq = fd_fseq_query( ctx->is_constipated );
489 :
490 : /* If batch_fseq == 0, this means that we don't want to calculate/produce
491 : anything. Keep this tile spinning. */
492 0 : if( !batch_fseq ) {
493 0 : return;
494 0 : }
495 :
496 0 : if( FD_UNLIKELY( !ctx->is_funk_active ) ) {
497 : /* Setting these parameters are not required because we are joining the
498 : funk that was setup in the replay tile. */
499 0 : ctx->funk = fd_funk_open_file( ctx->funk_file,
500 0 : 1UL,
501 0 : 0UL,
502 0 : 0UL,
503 0 : 0UL,
504 0 : 0UL,
505 0 : FD_FUNK_READ_WRITE,
506 0 : NULL );
507 0 : if( FD_UNLIKELY( !ctx->funk ) ) {
508 0 : FD_LOG_ERR(( "failed to join a funky" ));
509 0 : }
510 0 : ctx->is_funk_active = 1;
511 :
512 0 : FD_LOG_WARNING(( "Just joined funk at file=%s", ctx->funk_file ));
513 0 : }
514 :
515 0 : if( fd_batch_fseq_is_snapshot( batch_fseq ) ) {
516 0 : produce_snapshot( ctx, batch_fseq );
517 0 : } else {
518 : // We need features to disable this...
519 0 : produce_eah( ctx, stem, batch_fseq );
520 0 : }
521 0 : }
522 :
523 : static ulong
524 : populate_allowed_seccomp( fd_topo_t const * topo,
525 : fd_topo_tile_t const * tile,
526 : ulong out_cnt,
527 0 : struct sock_filter * out ) {
528 0 : (void)topo;
529 :
530 0 : populate_sock_filter_policy_fd_batch_tile( out_cnt,
531 0 : out,
532 0 : (uint)fd_log_private_logfile_fd(),
533 0 : (uint)tile->batch.tmp_fd,
534 0 : (uint)tile->batch.tmp_inc_fd,
535 0 : (uint)tile->batch.full_snapshot_fd,
536 0 : (uint)tile->batch.incremental_snapshot_fd );
537 0 : return sock_filter_policy_fd_batch_tile_instr_cnt;
538 0 : }
539 :
540 : static ulong
541 : populate_allowed_fds( fd_topo_t const * topo,
542 : fd_topo_tile_t const * tile,
543 : ulong out_fds_cnt,
544 0 : int * out_fds ) {
545 0 : (void)topo;
546 :
547 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) {
548 0 : FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
549 0 : }
550 :
551 0 : ulong out_cnt = 0UL;
552 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
553 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
554 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
555 :
556 0 : out_fds[ out_cnt++ ] = tile->batch.tmp_fd;
557 0 : out_fds[ out_cnt++ ] = tile->batch.tmp_inc_fd;
558 0 : out_fds[ out_cnt++ ] = tile->batch.full_snapshot_fd;
559 0 : out_fds[ out_cnt++ ] = tile->batch.incremental_snapshot_fd;
560 0 : return out_cnt;
561 0 : }
562 :
/* Configuration for the stem run loop template included below.  The
   tile's only callback is after_credit, operating on the tile context
   type; STEM_BURST is set to 1. */

#define STEM_BURST (1UL)

#define STEM_CALLBACK_CONTEXT_TYPE  fd_snapshot_tile_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapshot_tile_ctx_t)

#define STEM_CALLBACK_AFTER_CREDIT  after_credit
569 :
570 : #include "../../disco/stem/fd_stem.c"
571 :
572 : fd_topo_run_tile_t fd_tile_batch = {
573 : .name = "batch",
574 : .populate_allowed_seccomp = populate_allowed_seccomp,
575 : .populate_allowed_fds = populate_allowed_fds,
576 : .scratch_align = scratch_align,
577 : .scratch_footprint = scratch_footprint,
578 : .privileged_init = privileged_init,
579 : .unprivileged_init = unprivileged_init,
580 : .run = stem_run,
581 : };
|