Line data Source code
1 : #include <stdio.h> /* for vsnprintf */
2 : #include <stdarg.h> /* for va_list */
3 :
4 : #include "fd_sched.h"
5 : #include "../../util/math/fd_stat.h" /* for sorted search */
6 : #include "../../disco/fd_disco_base.h" /* for FD_MAX_TXN_PER_SLOT */
7 : #include "../../flamenco/accdb/fd_accdb_impl_v1.h"
8 : #include "../../flamenco/runtime/fd_runtime.h" /* for fd_runtime_load_txn_address_lookup_tables */
9 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_hashes.h" /* for ALUTs */
10 :
/* Capacity limits.  The scheduler's transaction pool is a quarter of
   the dispatcher's maximum depth. */
#define FD_SCHED_MAX_DEPTH             (FD_RDISP_MAX_DEPTH>>2)
#define FD_SCHED_MAX_STAGING_LANES_LOG (2)
#define FD_SCHED_MAX_STAGING_LANES     (1UL<<FD_SCHED_MAX_STAGING_LANES_LOG)
#define FD_SCHED_MAX_EXEC_TILE_CNT     (64UL)
#define FD_SCHED_MAX_PRINT_BUF_SZ      (2UL<<20)

/* 64 ticks per slot, and a single gigantic microblock containing min
   size transactions. */
FD_STATIC_ASSERT( FD_MAX_TXN_PER_SLOT_SHRED==((FD_SHRED_DATA_PAYLOAD_MAX_PER_SLOT-65UL*sizeof(fd_microblock_hdr_t))/FD_TXN_MIN_SERIALIZED_SZ), max_txn_per_slot_shred );

/* We size the buffer to be able to hold residual data from the previous
   FEC set that only becomes parseable after the next FEC set is
   ingested, as well as the incoming FEC set.  The largest minimally
   parseable unit of data is a transaction.  So that much data may
   straddle FEC set boundaries.  Other minimally parseable units of data
   include the microblock header and the microblock count within a
   batch. */
#define FD_SCHED_MAX_PAYLOAD_PER_FEC (FD_STORE_DATA_MAX)
#define FD_SCHED_MAX_FEC_BUF_SZ      (FD_SCHED_MAX_PAYLOAD_PER_FEC+FD_TXN_MTU)
FD_STATIC_ASSERT( FD_TXN_MTU>=sizeof(fd_microblock_hdr_t), resize buffer for residual data );
FD_STATIC_ASSERT( FD_TXN_MTU>=sizeof(ulong), resize buffer for residual data );

/* Ceiling division: the most min-size transactions that can be carved
   out of a single FEC set's payload. */
#define FD_SCHED_MAX_TXN_PER_FEC ((FD_SCHED_MAX_PAYLOAD_PER_FEC-1UL)/FD_TXN_MIN_SERIALIZED_SZ+1UL) /* 478 */

#define FD_SCHED_MAGIC (0xace8a79c181f89b6UL) /* echo -n "fd_sched_v0" | sha512sum | head -c 16 */

/* Parser return codes (see fd_sched_parse/fd_sched_parse_txn). */
#define FD_SCHED_PARSER_OK          (0)
#define FD_SCHED_PARSER_AGAIN_LATER (1)
#define FD_SCHED_PARSER_BAD_BLOCK   (2)


/* Structs. */

/* txn_bitset: fixed-size bitset keyed by txn_idx, sized to the
   transaction pool. */
#define SET_NAME txn_bitset
#define SET_MAX  FD_SCHED_MAX_DEPTH
#include "../../util/tmpl/fd_set.c"
47 :
/* fd_sched_block: per-block (per-bank) state.  Holds the fork-tree
   links (left-child/right-sibling encoding), replay progress counters,
   the streaming FEC-set parser state, and the dispatcher staging
   state bits. */
struct fd_sched_block {
  ulong slot;
  ulong parent_slot;
  ulong parent_idx;  /* Index of the parent in the pool. */
  ulong child_idx;   /* Index of the left-child in the pool. */
  ulong sibling_idx; /* Index of the right-sibling in the pool. */

  /* Counters. */
  uint txn_parsed_cnt;
  /* txn_queued_cnt = txn_parsed_cnt-txn_in_flight_cnt-txn_done_cnt */
  uint txn_exec_in_flight_cnt;
  uint txn_exec_done_cnt;
  uint txn_sigverify_in_flight_cnt;
  uint txn_sigverify_done_cnt;
  uint txn_done_cnt;           /* A transaction is considered done when all types of tasks associated with it are done. */
  ulong txn_pool_max_popcnt;   /* Peak transaction pool occupancy during the time this block was replaying. */
  ulong block_pool_max_popcnt; /* Peak block pool occupancy. */
  uint shred_cnt;
  uint fec_cnt;
  ulong            txn_idx[ FD_MAX_TXN_PER_SLOT ];        /* Indexed by parse order. */
  long             txn_disp_ticks[ FD_MAX_TXN_PER_SLOT ]; /* Indexed by parse order. */
  long             txn_done_ticks[ FD_MAX_TXN_PER_SLOT ]; /* Indexed by parse order. */
  fd_ed25519_sig_t txn_sigs[ FD_MAX_TXN_PER_SLOT ];       /* Indexed by parse order. */

  /* Parser state. */
  uchar txn[ FD_TXN_MAX_SZ ] __attribute__((aligned(alignof(fd_txn_t))));
  fd_hash_t poh;               /* Latest PoH hash we've seen from the ingested FEC sets. */
  ulong mblks_rem;             /* Number of microblocks remaining in the current batch. */
  ulong txns_rem;              /* Number of transactions remaining in the current microblock. */
  fd_acct_addr_t aluts[ 256 ]; /* Resolve ALUT accounts into this buffer for more parallelism. */
  uint fec_buf_sz;             /* Size of the fec_buf in bytes. */
  uint fec_buf_soff;           /* Starting offset into fec_buf for unparsed transactions. */
  uint fec_buf_boff;           /* Byte offset into raw block data of the first byte currently in fec_buf */
  uint fec_eob:1;              /* FEC end-of-batch: set if the last FEC set in the batch is being
                                  ingested. */
  uint fec_sob:1;              /* FEC start-of-batch: set if the parser expects to be receiving a new
                                  batch. */

  /* Block state. */
  uint fec_eos:1;              /* FEC end-of-stream: set if the last FEC set in the block has been
                                  ingested. */
  uint rooted:1;               /* Set if the block is rooted. */
  uint dying:1;                /* Set if the block has been abandoned and no transactions should be
                                  scheduled from it. */
  uint in_sched:1;             /* Set if the block is being tracked by the scheduler. */
  uint in_rdisp:1;             /* Set if the block is being tracked by the dispatcher, either as staged
                                  or unstaged. */
  uint block_start_signaled:1; /* Set if the start-of-block sentinel has been dispatched. */
  uint block_end_signaled:1;   /* Set if the end-of-block sentinel has been dispatched. */
  uint block_start_done:1;     /* Set if the start-of-block processing has been completed. */
  uint block_end_done:1;       /* Set if the end-of-block processing has been completed. */
  uint staged:1;               /* Set if the block is in a dispatcher staging lane; a staged block is
                                  tracked by the dispatcher. */
  ulong staging_lane;          /* Ignored if staged==0. */
  ulong luf_depth;             /* Depth of longest unstaged fork starting from this node; only
                                  stageable unstaged descendants are counted. */
  uchar fec_buf[ FD_SCHED_MAX_FEC_BUF_SZ ]; /* The previous FEC set could have some residual data that only becomes
                                               parseable after the next FEC set is ingested. */
  uint shred_blk_offs[ FD_SHRED_BLK_MAX ];  /* The byte offsets into block data of ingested shreds */
};
typedef struct fd_sched_block fd_sched_block_t;

FD_STATIC_ASSERT( sizeof(fd_hash_t)==sizeof(((fd_microblock_hdr_t *)0)->hash), unexpected poh hash size );
111 :
112 :
/* fd_sched_metrics: monotonically increasing diagnostic counters,
   printed by print_metrics.  Counter semantics follow their names;
   *_cnt fields count events, *_tickcount fields accumulate tick
   durations. */
struct fd_sched_metrics {
  /* Block lifecycle. */
  uint block_added_cnt;
  uint block_added_staged_cnt;
  uint block_added_unstaged_cnt;
  uint block_added_dead_ood_cnt;
  uint block_removed_cnt;
  uint block_abandoned_cnt;
  uint block_bad_cnt;
  uint block_promoted_cnt;
  uint block_demoted_cnt;
  /* Active-block deactivations, broken down by reason. */
  uint deactivate_no_child_cnt;
  uint deactivate_no_txn_cnt;
  uint deactivate_pruned_cnt;
  uint deactivate_abandoned_cnt;
  /* Staging lane churn. */
  uint lane_switch_cnt;
  uint lane_promoted_cnt;
  uint lane_demoted_cnt;
  /* Address lookup table resolution. */
  uint alut_success_cnt;
  uint alut_serializing_cnt;
  /* Transactions discarded with their abandoned block. */
  uint txn_abandoned_parsed_cnt;
  uint txn_abandoned_exec_done_cnt;
  uint txn_abandoned_done_cnt;
  /* In-flight transaction occupancy. */
  uint txn_max_in_flight_cnt;
  ulong txn_weighted_in_flight_cnt;
  ulong txn_weighted_in_flight_tickcount;
  ulong txn_none_in_flight_tickcount;
  /* Transaction throughput. */
  ulong txn_parsed_cnt;
  ulong txn_exec_done_cnt;
  ulong txn_sigverify_done_cnt;
  ulong txn_done_cnt;
  /* Ingest volume. */
  ulong bytes_ingested_cnt;
  ulong bytes_ingested_unparsed_cnt;
  ulong bytes_dropped_cnt;
  ulong fec_cnt;
};
typedef struct fd_sched_metrics fd_sched_metrics_t;
149 :
/* fd_sched: top-level scheduler state.  Owns the shared transaction
   pool, the per-exec-tile ready bitsets, the staging lane assignments,
   and (via joined pointers) the dispatcher and the flat block pool. */
struct fd_sched {
  char print_buf[ FD_SCHED_MAX_PRINT_BUF_SZ ]; /* Scratch buffer for diagnostic printing (see fd_sched_printf). */
  ulong print_buf_sz;                          /* Bytes currently accumulated in print_buf (excludes NUL). */
  fd_sched_metrics_t metrics[ 1 ];
  ulong canary;                                /* == FD_SCHED_MAGIC; checked on every public entry point. */
  ulong block_cnt_max;                         /* Immutable. */
  ulong exec_cnt;                              /* Immutable. */
  long txn_in_flight_last_tick;
  ulong root_idx;
  fd_rdisp_t * rdisp;                          /* Joined dispatcher. */
  ulong txn_exec_ready_bitset[ 1 ];            /* Bit i set if exec tile i is free for txn execution. */
  ulong sigverify_ready_bitset[ 1 ];           /* Bit i set if exec tile i is free for sigverify. */
  ulong active_bank_idx;                       /* Index of the actively replayed block, or ULONG_MAX if no block is
                                                  actively replayed; has to have a transaction to dispatch; staged
                                                  blocks that have no transactions to dispatch are not eligible for
                                                  being active. */
  ulong staged_bitset;                         /* Bit i set if staging lane i is occupied. */
  ulong staged_head_bank_idx[ FD_SCHED_MAX_STAGING_LANES ]; /* Head of the linear chain in each staging lane, ignored if bit i is
                                                               not set in the bitset. */
  ulong txn_pool_free_cnt;
  fd_txn_p_t txn_pool[ FD_SCHED_MAX_DEPTH ];
  uint txn_idx_to_parse_idx[ FD_SCHED_MAX_DEPTH ];
  ulong tile_to_bank_idx[ FD_SCHED_MAX_EXEC_TILE_CNT ]; /* Index of the bank that the exec tile is executing against. */
  txn_bitset_t exec_done_set[ txn_bitset_word_cnt ];      /* Indexed by txn_idx. */
  txn_bitset_t sigverify_done_set[ txn_bitset_word_cnt ]; /* Indexed by txn_idx. */
  fd_sched_block_t * block_pool; /* Just a flat array. */
  ulong block_pool_popcnt;
};
typedef struct fd_sched fd_sched_t;
179 :
180 :
/* Internal helpers. */

/* Link a new block into the fork tree under parent_bank_idx. */
static void
add_block( fd_sched_t * sched,
           ulong        bank_idx,
           ulong        parent_bank_idx );

/* Parse as many transactions as possible out of block->fec_buf;
   returns an FD_SCHED_PARSER_* code. */
FD_WARN_UNUSED static int
fd_sched_parse( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx );

/* Parse a single transaction from block->fec_buf; returns an
   FD_SCHED_PARSER_* code. */
FD_WARN_UNUSED static int
fd_sched_parse_txn( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx );

/* Try to select an activatable staged block as the active block. */
static void
try_activate_block( fd_sched_t * sched );

/* Validate the current active block, or select a new one if needed. */
static void
check_or_set_active_block( fd_sched_t * sched );

/* Mark block and all of its descendants dying and release their
   resources. */
static void
subtree_abandon( fd_sched_t * sched, fd_sched_block_t * block );

/* Possibly move the active block to bank_idx. */
static void
maybe_switch_block( fd_sched_t * sched, ulong bank_idx );

/* Locate the longest unstaged fork and stage it on lane_idx. */
FD_FN_UNUSED static ulong
find_and_stage_longest_unstaged_fork( fd_sched_t * sched, int lane_idx );

/* Compute luf_depth for the subtree rooted at bank_idx. */
static ulong
compute_longest_unstaged_fork( fd_sched_t * sched, ulong bank_idx );

/* Stage the longest unstaged fork starting at bank_idx onto lane_idx. */
static ulong
stage_longest_unstaged_fork( fd_sched_t * sched, ulong bank_idx, int lane_idx );
214 :
215 : static inline fd_sched_block_t *
216 0 : block_pool_ele( fd_sched_t * sched, ulong idx ) {
217 0 : FD_TEST( idx<sched->block_cnt_max || idx==ULONG_MAX );
218 0 : return idx==ULONG_MAX ? NULL : sched->block_pool+idx;
219 0 : }
220 :
221 : FD_FN_UNUSED static inline int
222 0 : block_is_void( fd_sched_block_t * block ) {
223 0 : /* We've seen everything in the block and no transaction got parsed
224 0 : out. */
225 0 : return block->fec_eos && block->txn_parsed_cnt==0;
226 0 : }
227 :
228 : static inline int
229 0 : block_should_signal_end( fd_sched_block_t * block ) {
230 0 : return block->fec_eos && block->txn_parsed_cnt==block->txn_done_cnt && block->block_start_done && !block->block_end_signaled;
231 0 : }
232 :
233 : static inline int
234 0 : block_will_signal_end( fd_sched_block_t * block ) {
235 0 : return block->fec_eos && !block->block_end_signaled;
236 0 : }
237 :
238 : /* Is there something known to be dispatchable in the block? This is an
239 : important liveness property. A block that doesn't contain any known
240 : dispatchable tasks will be deactivated or demoted. */
241 : static inline int
242 0 : block_is_dispatchable( fd_sched_block_t * block ) {
243 0 : ulong exec_queued_cnt = block->txn_parsed_cnt-block->txn_exec_in_flight_cnt-block->txn_exec_done_cnt;
244 0 : ulong sigverify_queued_cnt = block->txn_parsed_cnt-block->txn_sigverify_in_flight_cnt-block->txn_sigverify_done_cnt;
245 0 : return exec_queued_cnt>0UL ||
246 0 : sigverify_queued_cnt>0UL ||
247 0 : !block->block_start_signaled ||
248 0 : block_will_signal_end( block );
249 0 : }
250 :
251 : static inline int
252 0 : block_is_in_flight( fd_sched_block_t * block ) {
253 0 : return block->txn_exec_in_flight_cnt || block->txn_sigverify_in_flight_cnt || (block->block_end_signaled && !block->block_end_done);
254 0 : }
255 :
256 : static inline int
257 0 : block_is_done( fd_sched_block_t * block ) {
258 0 : return block->fec_eos && block->txn_parsed_cnt==block->txn_done_cnt && block->block_start_done && block->block_end_done;
259 0 : }
260 :
261 : static inline int
262 0 : block_is_stageable( fd_sched_block_t * block ) {
263 0 : int rv = !block_is_done( block ) && !block->dying;
264 0 : if( FD_UNLIKELY( rv && !block->in_rdisp ) ) {
265 : /* Invariant: stageable blocks may be currently staged or unstaged,
266 : but must be in the dispatcher either way. When a block
267 : transitions to DONE, it will be immediately removed from the
268 : dispatcher. When a block transitions to DYING, it will be
269 : eventually abandoned from the dispatcher. */
270 0 : FD_LOG_CRIT(( "invariant violation: stageable block->in_rdisp==0, txn_parsed_cnt %u, txn_done_cnt %u, fec_eos %u,, slot %lu, parent slot %lu",
271 0 : block->txn_parsed_cnt, block->txn_done_cnt, (uint)block->fec_eos, block->slot, block->parent_slot ));
272 0 : }
273 0 : return rv;
274 0 : }
275 :
276 : static inline int
277 0 : block_is_promotable( fd_sched_block_t * block ) {
278 0 : return block_is_stageable( block ) && block_is_dispatchable( block ) && !block->staged;
279 0 : }
280 :
281 : static inline int
282 0 : block_is_activatable( fd_sched_block_t * block ) {
283 0 : return block_is_stageable( block ) && block_is_dispatchable( block ) && block->staged;
284 0 : }
285 :
286 : static inline int
287 0 : block_should_deactivate( fd_sched_block_t * block ) {
288 : /* We allow a grace period, during which a block has nothing to
289 : dispatch, but has something in-flight. The block is allowed to
290 : stay activated and ingest FEC sets during this time. The block
291 : will be deactivated if there's still nothing to dispatch by the
292 : time all in-flight tasks are completed. */
293 0 : return !block_is_activatable( block ) && !block_is_in_flight( block );
294 0 : }
295 :
296 : static inline ulong
297 0 : block_to_idx( fd_sched_t * sched, fd_sched_block_t * block ) { return (ulong)(block-sched->block_pool); }
298 :
/* Append a formatted string to sched's print buffer without ever
   overflowing it.  Output accumulates across calls; callers reset
   sched->print_buf_sz to 0 to start a fresh message.  On encoding
   error (vsnprintf returning <0) nothing is appended.  Note: by
   induction print_buf_sz stays <= FD_SCHED_MAX_PRINT_BUF_SZ-1 (len is
   clamped below), so the size passed to vsnprintf is always >=1 and
   the -1UL in the clamp never underflows. */
__attribute__((format(printf,2,3)))
static void
fd_sched_printf( fd_sched_t * sched,
                 char const * fmt,
                 ... ) {
  va_list ap;
  ulong len;
  va_start( ap, fmt );
  int ret = vsnprintf( sched->print_buf+sched->print_buf_sz,
                       FD_SCHED_MAX_PRINT_BUF_SZ-sched->print_buf_sz,
                       fmt, ap );
  va_end( ap );
  /* ret is the would-be length ignoring truncation; clamp to the space
     actually available, always leaving room for the terminating NUL. */
  len = fd_ulong_if( ret<0, 0UL, fd_ulong_min( (ulong)ret, FD_SCHED_MAX_PRINT_BUF_SZ-sched->print_buf_sz-1UL ) );
  sched->print_buf[ sched->print_buf_sz+len ] = '\0';
  sched->print_buf_sz += len;
}
315 :
/* Emit one debug log line per parsed transaction in the block: base58
   signature plus dispatch/done tickcounts ("None" where a stage was
   never reached).  Clobbers sched's print buffer. */
FD_FN_UNUSED static void
log_block_txns( fd_sched_t * sched, fd_sched_block_t * block ) {
  for( ulong i=0UL; i<block->txn_parsed_cnt; i++ ) {
    sched->print_buf_sz = 0UL; /* one buffered line per transaction */
    FD_BASE58_ENCODE_64_BYTES( block->txn_sigs[ i ], sig_str );
    long disp_tick = block->txn_disp_ticks[ i ];
    long done_tick = block->txn_done_ticks[ i ];
    /* LONG_MAX marks "not yet dispatched" / "not yet done". */
    if( FD_LIKELY( disp_tick!=LONG_MAX && done_tick!=LONG_MAX ) ) fd_sched_printf( sched, "['%s',%ld,%ld],", sig_str, disp_tick, done_tick );
    else if( FD_LIKELY( disp_tick!=LONG_MAX ) ) fd_sched_printf( sched, "['%s',%ld,None],", sig_str, disp_tick );
    else fd_sched_printf( sched, "['%s',None,None],", sig_str );
    FD_LOG_DEBUG(( "%s", sched->print_buf ));
  }
}
329 :
/* Append a one-line summary of a block's progress counters and parser
   state to sched's print buffer (does not reset the buffer). */
FD_FN_UNUSED static void
print_block_metrics( fd_sched_t * sched, fd_sched_block_t * block ) {
  fd_sched_printf( sched, "block idx %lu, block slot %lu, parent_slot %lu, fec_eos %d, rooted %d, txn_parsed_cnt %u, txn_exec_done_cnt %u, txn_sigverify_done_cnt %u, txn_done_cnt %u, shred_cnt %u, fec_cnt %u, txn_pool_max_popcnt %lu/%lu, block_pool_max_popcnt %lu/%lu, mblks_rem %lu, txns_rem %lu, fec_buf_sz %u, fec_buf_boff %u, fec_buf_soff %u, fec_eob %d, fec_sob %d\n",
                   block_to_idx( sched, block ), block->slot, block->parent_slot, block->fec_eos, block->rooted, block->txn_parsed_cnt, block->txn_exec_done_cnt, block->txn_sigverify_done_cnt, block->txn_done_cnt, block->shred_cnt, block->fec_cnt, block->txn_pool_max_popcnt, FD_SCHED_MAX_DEPTH, block->block_pool_max_popcnt, sched->block_cnt_max, block->mblks_rem, block->txns_rem, block->fec_buf_sz, block->fec_buf_boff, block->fec_buf_soff, block->fec_eob, block->fec_sob );
}
335 :
/* Append a full debug dump of a block (all state bits, counters, and
   parser state) to sched's print buffer (does not reset the buffer). */
FD_FN_UNUSED static void
print_block_debug( fd_sched_t * sched, fd_sched_block_t * block ) {
  fd_sched_printf( sched, "block idx %lu, block slot %lu, parent_slot %lu, staged %d (lane %lu), dying %d, in_rdisp %d, fec_eos %d, rooted %d, block_start_signaled %d, block_end_signaled %d, block_start_done %d, block_end_done %d, txn_parsed_cnt %u, txn_exec_in_flight_cnt %u, txn_exec_done_cnt %u, txn_sigverify_in_flight_cnt %u, txn_sigverify_done_cnt %u, txn_done_cnt %u, shred_cnt %u, fec_cnt %u, txn_pool_max_popcnt %lu/%lu, block_pool_max_popcnt %lu/%lu, mblks_rem %lu, txns_rem %lu, fec_buf_sz %u, fec_buf_boff %u, fec_buf_soff %u, fec_eob %d, fec_sob %d\n",
                   block_to_idx( sched, block ), block->slot, block->parent_slot, block->staged, block->staging_lane, block->dying, block->in_rdisp, block->fec_eos, block->rooted, block->block_start_signaled, block->block_end_signaled, block->block_start_done, block->block_end_done, block->txn_parsed_cnt, block->txn_exec_in_flight_cnt, block->txn_exec_done_cnt, block->txn_sigverify_in_flight_cnt, block->txn_sigverify_done_cnt, block->txn_done_cnt, block->shred_cnt, block->fec_cnt, block->txn_pool_max_popcnt, FD_SCHED_MAX_DEPTH, block->block_pool_max_popcnt, sched->block_cnt_max, block->mblks_rem, block->txns_rem, block->fec_buf_sz, block->fec_buf_boff, block->fec_buf_soff, block->fec_eob, block->fec_sob );
}
341 :
342 : FD_FN_UNUSED static void
343 0 : print_block_and_parent( fd_sched_t * sched, fd_sched_block_t * block ) {
344 0 : print_block_debug( sched, block );
345 0 : fd_sched_block_t * parent = block_pool_ele( sched, block->parent_idx );
346 0 : if( FD_LIKELY( parent ) ) print_block_debug( sched, parent );
347 0 : }
348 :
/* Append all scheduler-wide metrics counters on one line to sched's
   print buffer (does not reset the buffer). */
FD_FN_UNUSED static void
print_metrics( fd_sched_t * sched ) {
  fd_sched_printf( sched, "metrics: block_added_cnt %u, block_added_staged_cnt %u, block_added_unstaged_cnt %u, block_added_dead_ood_cnt %u, block_removed_cnt %u, block_abandoned_cnt %u, block_bad_cnt %u, block_promoted_cnt %u, block_demoted_cnt %u, deactivate_no_child_cnt %u, deactivate_no_txn_cnt %u, deactivate_pruned_cnt %u, deactivate_abandoned_cnt %u, lane_switch_cnt %u, lane_promoted_cnt %u, lane_demoted_cnt %u, alut_success_cnt %u, alut_serializing_cnt %u, txn_abandoned_parsed_cnt %u, txn_abandoned_exec_done_cnt %u, txn_abandoned_done_cnt %u, txn_max_in_flight_cnt %u, txn_weighted_in_flight_cnt %lu, txn_weighted_in_flight_tickcount %lu, txn_none_in_flight_tickcount %lu, txn_parsed_cnt %lu, txn_exec_done_cnt %lu, txn_sigverify_done_cnt %lu, txn_done_cnt %lu, bytes_ingested_cnt %lu, bytes_ingested_unparsed_cnt %lu, bytes_dropped_cnt %lu, fec_cnt %lu\n",
                   sched->metrics->block_added_cnt, sched->metrics->block_added_staged_cnt, sched->metrics->block_added_unstaged_cnt, sched->metrics->block_added_dead_ood_cnt, sched->metrics->block_removed_cnt, sched->metrics->block_abandoned_cnt, sched->metrics->block_bad_cnt, sched->metrics->block_promoted_cnt, sched->metrics->block_demoted_cnt, sched->metrics->deactivate_no_child_cnt, sched->metrics->deactivate_no_txn_cnt, sched->metrics->deactivate_pruned_cnt, sched->metrics->deactivate_abandoned_cnt, sched->metrics->lane_switch_cnt, sched->metrics->lane_promoted_cnt, sched->metrics->lane_demoted_cnt, sched->metrics->alut_success_cnt, sched->metrics->alut_serializing_cnt, sched->metrics->txn_abandoned_parsed_cnt, sched->metrics->txn_abandoned_exec_done_cnt, sched->metrics->txn_abandoned_done_cnt, sched->metrics->txn_max_in_flight_cnt, sched->metrics->txn_weighted_in_flight_cnt, sched->metrics->txn_weighted_in_flight_tickcount, sched->metrics->txn_none_in_flight_tickcount, sched->metrics->txn_parsed_cnt, sched->metrics->txn_exec_done_cnt, sched->metrics->txn_sigverify_done_cnt, sched->metrics->txn_done_cnt, sched->metrics->bytes_ingested_cnt, sched->metrics->bytes_ingested_unparsed_cnt, sched->metrics->bytes_dropped_cnt, sched->metrics->fec_cnt );

}
355 :
/* Append top-level scheduler state, then the active block and the head
   block of every occupied staging lane, to sched's print buffer. */
FD_FN_UNUSED static void
print_sched( fd_sched_t * sched ) {
  fd_sched_printf( sched, "sched canary 0x%lx, exec_cnt %lu, root_idx %lu, txn_exec_ready_bitset[ 0 ] 0x%lx, sigverify_ready_bitset[ 0 ] 0x%lx, active_idx %lu, staged_bitset %lu, staged_head_idx[0] %lu, staged_head_idx[1] %lu, staged_head_idx[2] %lu, staged_head_idx[3] %lu, txn_pool_free_cnt %lu/%lu, block_pool_popcnt %lu/%lu\n",
                   sched->canary, sched->exec_cnt, sched->root_idx, sched->txn_exec_ready_bitset[ 0 ], sched->sigverify_ready_bitset[ 0 ], sched->active_bank_idx, sched->staged_bitset, sched->staged_head_bank_idx[ 0 ], sched->staged_head_bank_idx[ 1 ], sched->staged_head_bank_idx[ 2 ], sched->staged_head_bank_idx[ 3 ], sched->txn_pool_free_cnt, FD_SCHED_MAX_DEPTH, sched->block_pool_popcnt, sched->block_cnt_max );
  fd_sched_block_t * active_block = block_pool_ele( sched, sched->active_bank_idx );
  if( active_block ) print_block_debug( sched, active_block );
  for( int l=0; l<(int)FD_SCHED_MAX_STAGING_LANES; l++ ) {
    if( fd_ulong_extract_bit( sched->staged_bitset, l ) ) {
      /* Lane occupied: dump its head block. */
      fd_sched_block_t * block = block_pool_ele( sched, sched->staged_head_bank_idx[ l ] );
      print_block_debug( sched, block );
    }
  }
}
369 :
370 : FD_FN_UNUSED static void
371 0 : print_all( fd_sched_t * sched, fd_sched_block_t * block ) {
372 0 : print_metrics( sched );
373 0 : print_sched( sched );
374 0 : print_block_and_parent( sched, block );
375 0 : }
376 :
377 :
378 : /* Public functions. */
379 :
380 0 : ulong fd_sched_align( void ) {
381 0 : return fd_ulong_max( alignof(fd_sched_t),
382 0 : fd_ulong_max( fd_rdisp_align(),
383 0 : fd_ulong_max( alignof(fd_sched_block_t), 64UL ))); /* Minimally cache line aligned. */
384 0 : }
385 :
/* Memory footprint in bytes for a scheduler tracking up to
   block_cnt_max blocks.  The layout here must match the
   FD_SCRATCH_ALLOC sequence in fd_sched_new and fd_sched_join. */
ulong
fd_sched_footprint( ulong block_cnt_max ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, fd_sched_align(), sizeof(fd_sched_t) );
  l = FD_LAYOUT_APPEND( l, fd_rdisp_align(), fd_rdisp_footprint( FD_SCHED_MAX_DEPTH, block_cnt_max ) ); /* dispatcher */
  l = FD_LAYOUT_APPEND( l, alignof(fd_sched_block_t), block_cnt_max*sizeof(fd_sched_block_t) );         /* block pool */
  return FD_LAYOUT_FINI( l, fd_sched_align() );
}
394 :
/* Format a memory region (of fd_sched_footprint( block_cnt_max ) bytes,
   fd_sched_align() aligned) as a new scheduler supporting exec_cnt exec
   tiles.  Returns mem on success; aborts via FD_TEST on bad exec_cnt.
   The layout must match fd_sched_footprint and fd_sched_join. */
void *
fd_sched_new( void * mem, ulong block_cnt_max, ulong exec_cnt ) {
  FD_TEST( exec_cnt && exec_cnt<=FD_SCHED_MAX_EXEC_TILE_CNT );

  FD_SCRATCH_ALLOC_INIT( l, mem );
  fd_sched_t * sched  = FD_SCRATCH_ALLOC_APPEND( l, fd_sched_align(), sizeof(fd_sched_t) );
  void *       _rdisp = FD_SCRATCH_ALLOC_APPEND( l, fd_rdisp_align(), fd_rdisp_footprint( FD_SCHED_MAX_DEPTH, block_cnt_max ) );
  void *       _bpool = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sched_block_t), block_cnt_max*sizeof(fd_sched_block_t) );
  FD_SCRATCH_ALLOC_FINI( l, fd_sched_align() );

  /* Seed the dispatcher from the tick counter (no strong randomness
     needed here). */
  ulong seed = ((ulong)fd_tickcount()) ^ FD_SCHED_MAGIC;
  fd_rdisp_new( _rdisp, FD_SCHED_MAX_DEPTH, block_cnt_max, seed );

  /* Only in_sched needs clearing up front; the rest of a block's state
     is initialized when the block is added. */
  fd_sched_block_t * bpool = (fd_sched_block_t *)_bpool;
  for( ulong i=0; i<block_cnt_max; i++ ) {
    bpool[ i ].in_sched = 0;
  }

  fd_memset( sched->metrics, 0, sizeof(fd_sched_metrics_t) );
  sched->txn_in_flight_last_tick = LONG_MAX;

  sched->canary          = FD_SCHED_MAGIC;
  sched->block_cnt_max   = block_cnt_max;
  sched->exec_cnt        = exec_cnt;
  sched->root_idx        = ULONG_MAX; /* no root yet */
  sched->active_bank_idx = ULONG_MAX; /* no active block yet */
  sched->staged_bitset   = 0UL;       /* all staging lanes free */

  /* All exec tiles start out ready for both task types. */
  sched->txn_exec_ready_bitset[ 0 ]  = fd_ulong_mask_lsb( (int)exec_cnt );
  sched->sigverify_ready_bitset[ 0 ] = fd_ulong_mask_lsb( (int)exec_cnt );

  sched->txn_pool_free_cnt = FD_SCHED_MAX_DEPTH-1UL; /* -1 because index 0 is unusable as a sentinel reserved by the dispatcher */

  txn_bitset_new( sched->exec_done_set );
  txn_bitset_new( sched->sigverify_done_set );

  sched->block_pool_popcnt = 0UL;

  return sched;
}
435 :
/* Join a scheduler previously formatted with fd_sched_new.  Recomputes
   the interior pointers (dispatcher, block pool) from the same layout
   used by fd_sched_new; block_cnt_max must match the value used at
   creation (checked via FD_TEST, along with the canary). */
fd_sched_t *
fd_sched_join( void * mem, ulong block_cnt_max ) {
  fd_sched_t * sched = (fd_sched_t *)mem;

  FD_TEST( sched->canary==FD_SCHED_MAGIC );
  FD_TEST( sched->block_cnt_max==block_cnt_max );

  FD_SCRATCH_ALLOC_INIT( l, mem );
  /*          */ FD_SCRATCH_ALLOC_APPEND( l, fd_sched_align(), sizeof(fd_sched_t) );
  void * _rdisp = FD_SCRATCH_ALLOC_APPEND( l, fd_rdisp_align(), fd_rdisp_footprint( FD_SCHED_MAX_DEPTH, block_cnt_max ) );
  void * _bpool = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sched_block_t), block_cnt_max*sizeof(fd_sched_block_t) );
  FD_SCRATCH_ALLOC_FINI( l, fd_sched_align() );

  sched->rdisp      = fd_rdisp_join( _rdisp );
  sched->block_pool = _bpool;

  txn_bitset_join( sched->exec_done_set );
  txn_bitset_join( sched->sigverify_done_set );

  return sched;
}
457 :
/* Returns non-zero if the transaction pool has enough free entries to
   accept the given FEC set under a worst-case (all min-size txns)
   estimate.  Crashes on an oversized FEC set or invalid bank indices. */
int
fd_sched_fec_can_ingest( fd_sched_t * sched, fd_sched_fec_t * fec ) {
  FD_TEST( sched->canary==FD_SCHED_MAGIC );
  FD_TEST( fec->bank_idx<sched->block_cnt_max );
  FD_TEST( fec->parent_bank_idx<sched->block_cnt_max );

  if( FD_UNLIKELY( fec->fec->data_sz>FD_SCHED_MAX_PAYLOAD_PER_FEC ) ) {
    sched->print_buf_sz = 0UL;
    print_metrics( sched );
    print_sched( sched );
    FD_LOG_NOTICE(( "%s", sched->print_buf ));
    FD_LOG_CRIT(( "invalid FEC set: fec->data_sz %lu, slot %lu, parent slot %lu", fec->fec->data_sz, fec->slot, fec->parent_slot ));
  }

  ulong fec_buf_sz = 0UL;
  fd_sched_block_t * block = block_pool_ele( sched, fec->bank_idx );
  if( FD_LIKELY( !fec->is_first_in_block ) ) {
    /* Account for residual unparsed bytes left over from the previous
       FEC set of this block. */
    fec_buf_sz += block->fec_buf_sz-block->fec_buf_soff;
  } else {
    /* No residual data as this is a fresh new block. */
  }
  /* Addition is safe and won't overflow because we checked the FEC set
     size above. */
  fec_buf_sz += fec->fec->data_sz;
  /* Assuming every transaction is min size, do we have enough free
     entries in the txn pool?  For a more precise txn count, we would
     have to do some parsing. */
  return sched->txn_pool_free_cnt>=fec_buf_sz/FD_TXN_MIN_SERIALIZED_SZ;
}
487 :
/* Returns non-zero if the transaction pool can absorb fec_cnt more FEC
   sets under the worst-case per-FEC transaction bound
   (FD_SCHED_MAX_TXN_PER_FEC, a ceiling division that already covers a
   transaction straddling a FEC boundary). */
int
fd_sched_can_ingest( fd_sched_t * sched,
                     ulong        fec_cnt ) {
  FD_TEST( sched->canary==FD_SCHED_MAGIC );
  /* Worst case, we need one byte from the incoming data to extract a
     transaction out of the residual data, and the rest of the incoming
     data contributes toward min sized transactions. */
  return sched->txn_pool_free_cnt>=(FD_SCHED_MAX_TXN_PER_FEC*fec_cnt);
}
497 :
498 : FD_WARN_UNUSED int
499 0 : fd_sched_fec_ingest( fd_sched_t * sched, fd_sched_fec_t * fec ) {
500 0 : FD_TEST( sched->canary==FD_SCHED_MAGIC );
501 0 : FD_TEST( fec->bank_idx<sched->block_cnt_max );
502 0 : FD_TEST( fec->parent_bank_idx<sched->block_cnt_max );
503 :
504 0 : fd_sched_block_t * block = block_pool_ele( sched, fec->bank_idx );
505 :
506 0 : if( FD_UNLIKELY( fec->fec->data_sz>FD_SCHED_MAX_PAYLOAD_PER_FEC ) ) {
507 0 : sched->print_buf_sz = 0UL;
508 0 : print_all( sched, block );
509 0 : FD_LOG_NOTICE(( "%s", sched->print_buf ));
510 0 : FD_LOG_CRIT(( "invalid FEC set: fec->data_sz %lu, slot %lu, parent slot %lu", fec->fec->data_sz, fec->slot, fec->parent_slot ));
511 0 : }
512 :
513 0 : if( FD_UNLIKELY( fec->is_first_in_block ) ) {
514 : /* This is a new block. */
515 0 : add_block( sched, fec->bank_idx, fec->parent_bank_idx );
516 0 : block->slot = fec->slot;
517 0 : block->parent_slot = fec->parent_slot;
518 :
519 0 : if( FD_UNLIKELY( block->dying ) ) {
520 : /* The child of a dead block is also dead. We added it to our
521 : fork tree just so we could track an entire lineage of dead
522 : children and propagate the dead property to the entire lineage,
523 : in case there were frags for more than one dead children
524 : in-flight at the time the parent was abandoned. That being
525 : said, we shouldn't need to add the dead child to the
526 : dispatcher. */
527 0 : sched->metrics->block_added_dead_ood_cnt++;
528 :
529 : /* Ignore the FEC set for a dead block. */
530 0 : sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
531 0 : return 1;
532 0 : }
533 :
534 : /* Try to find a staging lane for this block. */
535 0 : int alloc_lane = 0;
536 0 : fd_sched_block_t * parent_block = block_pool_ele( sched, fec->parent_bank_idx );
537 0 : if( FD_LIKELY( parent_block->staged ) ) {
538 : /* Parent is staged. So see if we can continue down the same
539 : staging lane. */
540 0 : ulong staging_lane = parent_block->staging_lane;
541 0 : ulong child_idx = parent_block->child_idx;
542 0 : while( child_idx!=ULONG_MAX ) {
543 0 : fd_sched_block_t * child = block_pool_ele( sched, child_idx );
544 0 : if( child->staged && child->staging_lane==staging_lane ) {
545 : /* Found a child on the same lane. So we're done. */
546 0 : staging_lane = FD_RDISP_UNSTAGED;
547 0 : break;
548 0 : }
549 0 : child_idx = child->sibling_idx;
550 0 : }
551 : /* No child is staged on the same lane as the parent. So stage
552 : this block. This is the common case. */
553 0 : if( FD_LIKELY( staging_lane!=FD_RDISP_UNSTAGED ) ) {
554 0 : block->in_rdisp = 1;
555 0 : block->staged = 1;
556 0 : block->staging_lane = staging_lane;
557 0 : fd_rdisp_add_block( sched->rdisp, fec->bank_idx, staging_lane );
558 0 : sched->metrics->block_added_cnt++;
559 0 : sched->metrics->block_added_staged_cnt++;
560 0 : } else {
561 0 : alloc_lane = 1;
562 0 : }
563 0 : } else {
564 0 : if( block_is_stageable( parent_block ) ) {
565 : /* Parent is unstaged but stageable. So let's be unstaged too.
566 : This is a policy decision to be lazy and not promote parent
567 : at the moment. */
568 0 : block->in_rdisp = 1;
569 0 : block->staged = 0;
570 0 : fd_rdisp_add_block( sched->rdisp, fec->bank_idx, FD_RDISP_UNSTAGED );
571 0 : sched->metrics->block_added_cnt++;
572 0 : sched->metrics->block_added_unstaged_cnt++;
573 0 : } else {
574 0 : alloc_lane = 1;
575 0 : }
576 0 : }
577 0 : if( FD_UNLIKELY( alloc_lane ) ) {
578 : /* We weren't able to inherit the parent's staging lane. So try
579 : to find a new staging lane. */
580 0 : if( FD_LIKELY( sched->staged_bitset!=fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) ) ) { /* Optimize for lane available. */
581 0 : int lane_idx = fd_ulong_find_lsb( ~sched->staged_bitset );
582 0 : if( FD_UNLIKELY( lane_idx>=(int)FD_SCHED_MAX_STAGING_LANES ) ) {
583 0 : FD_LOG_CRIT(( "invariant violation: lane_idx %d, sched->staged_bitset %lx",
584 0 : lane_idx, sched->staged_bitset ));
585 0 : }
586 0 : sched->staged_bitset = fd_ulong_set_bit( sched->staged_bitset, lane_idx );
587 0 : sched->staged_head_bank_idx[ lane_idx ] = fec->bank_idx;
588 0 : block->in_rdisp = 1;
589 0 : block->staged = 1;
590 0 : block->staging_lane = (ulong)lane_idx;
591 0 : fd_rdisp_add_block( sched->rdisp, fec->bank_idx, block->staging_lane );
592 0 : sched->metrics->block_added_cnt++;
593 0 : sched->metrics->block_added_staged_cnt++;
594 0 : } else {
595 : /* No lanes available. */
596 0 : block->in_rdisp = 1;
597 0 : block->staged = 0;
598 0 : fd_rdisp_add_block( sched->rdisp, fec->bank_idx, FD_RDISP_UNSTAGED );
599 0 : sched->metrics->block_added_cnt++;
600 0 : sched->metrics->block_added_unstaged_cnt++;
601 0 : }
602 0 : }
603 0 : }
604 :
605 0 : block->txn_pool_max_popcnt = fd_ulong_max( block->txn_pool_max_popcnt, FD_SCHED_MAX_DEPTH-sched->txn_pool_free_cnt );
606 0 : block->block_pool_max_popcnt = fd_ulong_max( block->block_pool_max_popcnt, sched->block_pool_popcnt );
607 :
608 0 : if( FD_UNLIKELY( block->dying ) ) {
609 : /* Ignore the FEC set for a dead block. */
610 0 : sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
611 0 : return 1;
612 0 : }
613 :
614 0 : if( FD_UNLIKELY( !block->in_rdisp ) ) {
615 : /* Invariant: block must be in the dispatcher at this point. */
616 0 : sched->print_buf_sz = 0UL;
617 0 : print_all( sched, block );
618 0 : FD_LOG_NOTICE(( "%s", sched->print_buf ));
619 0 : FD_LOG_CRIT(( "invariant violation: block->in_rdisp==0, slot %lu, parent slot %lu",
620 0 : block->slot, block->parent_slot ));
621 0 : }
622 :
623 0 : if( FD_UNLIKELY( block->fec_eos ) ) {
624 : /* This means something is wrong upstream. We're getting more FEC
625 : sets for a block that has already ended, or so we were told. */
626 0 : sched->print_buf_sz = 0UL;
627 0 : print_all( sched, block );
628 0 : FD_LOG_NOTICE(( "%s", sched->print_buf ));
629 0 : FD_LOG_CRIT(( "invariant violation: block->fec_eos set but getting more FEC sets, slot %lu, parent slot %lu", fec->slot, fec->parent_slot ));
630 0 : }
631 0 : if( FD_UNLIKELY( block->fec_eob && fec->is_last_in_batch ) ) {
632 : /* If the previous FEC set ingestion and parse was successful,
633 : block->fec_eob should be cleared. The fact that fec_eob is set
634 : means that the previous batch didn't parse properly. So this is
635 : a bad block. We should refuse to replay down the fork. */
636 0 : FD_LOG_INFO(( "bad block: failed to parse, slot %lu, parent slot %lu", fec->slot, fec->parent_slot ));
637 0 : sched->print_buf_sz = 0UL;
638 0 : print_all( sched, block );
639 0 : FD_LOG_DEBUG(( "%s", sched->print_buf ));
640 0 : subtree_abandon( sched, block );
641 0 : sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
642 0 : sched->metrics->block_bad_cnt++;
643 0 : check_or_set_active_block( sched );
644 0 : return 0;
645 0 : }
646 0 : if( FD_UNLIKELY( block->child_idx!=ULONG_MAX ) ) {
647 : /* This means something is wrong upstream. FEC sets are not being
648 : delivered in replay order. We got a child block FEC set before
649 : this block was completely delivered. */
650 0 : sched->print_buf_sz = 0UL;
651 0 : print_all( sched, block );
652 0 : fd_sched_block_t * child_block = block_pool_ele( sched, block->child_idx );
653 0 : print_block_debug( sched, child_block );
654 0 : FD_LOG_NOTICE(( "%s", sched->print_buf ));
655 0 : FD_LOG_CRIT(( "invariant violation: block->child_idx %lu, slot %lu, parent slot %lu", block->child_idx, fec->slot, fec->parent_slot ));
656 0 : }
657 :
658 0 : FD_TEST( block->fec_buf_sz>=block->fec_buf_soff );
659 0 : if( FD_LIKELY( block->fec_buf_sz>block->fec_buf_soff ) ) {
660 : /* If there is residual data from the previous FEC set within the
661 : same batch, we move it to the beginning of the buffer and append
662 : the new FEC set. */
663 0 : memmove( block->fec_buf, block->fec_buf+block->fec_buf_soff, block->fec_buf_sz-block->fec_buf_soff );
664 0 : }
665 0 : block->fec_buf_boff += block->fec_buf_soff;
666 0 : block->fec_buf_sz -= block->fec_buf_soff;
667 0 : block->fec_buf_soff = 0;
668 : /* Addition is safe and won't overflow because we checked the FEC
669 : set size above. */
670 0 : if( FD_UNLIKELY( block->fec_buf_sz+fec->fec->data_sz>FD_SCHED_MAX_FEC_BUF_SZ ) ) {
671 : /* In a conformant block, there shouldn't be more than a
672 : transaction's worth of residual data left over from the previous
673 : FEC set within the same batch. So if this condition doesn't
674 : hold, it's a bad block. Instead of crashing, we should refuse to
675 : replay down the fork. */
676 0 : FD_LOG_INFO(( "bad block: fec_buf_sz %u, fec->data_sz %lu, slot %lu, parent slot %lu", block->fec_buf_sz, fec->fec->data_sz, fec->slot, fec->parent_slot ));
677 0 : sched->print_buf_sz = 0UL;
678 0 : print_all( sched, block );
679 0 : FD_LOG_DEBUG(( "%s", sched->print_buf ));
680 0 : subtree_abandon( sched, block );
681 0 : sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
682 0 : sched->metrics->block_bad_cnt++;
683 0 : check_or_set_active_block( sched );
684 0 : return 0;
685 0 : }
686 :
687 : /* Append the new FEC set to the end of the buffer. */
688 0 : fd_memcpy( block->fec_buf+block->fec_buf_sz, fec->fec->data, fec->fec->data_sz );
689 0 : block->fec_buf_sz += (uint)fec->fec->data_sz;
690 0 : sched->metrics->bytes_ingested_cnt += fec->fec->data_sz;
691 :
692 0 : block->fec_eob = fec->is_last_in_batch;
693 0 : block->fec_eos = fec->is_last_in_block;
694 :
695 0 : ulong block_sz = block->shred_cnt>0 ? block->shred_blk_offs[ block->shred_cnt-1 ] : 0UL;
696 0 : for( ulong i=0; i<fec->shred_cnt; i++ ) {
697 0 : if( FD_LIKELY( i<32UL ) ) {
698 0 : block->shred_blk_offs[ block->shred_cnt++ ] = (uint)block_sz + fec->fec->block_offs[ i ];
699 0 : } else if( FD_UNLIKELY( i!=fec->shred_cnt-1UL ) ) {
700 : /* We don't track shred boundaries after 32 shreds, assume they're
701 : sized uniformly */
702 0 : ulong num_overflow_shreds = fec->shred_cnt-32UL;
703 0 : ulong overflow_idx = i-32UL;
704 0 : ulong overflow_data_sz = fec->fec->data_sz-fec->fec->block_offs[ 31 ];
705 0 : block->shred_blk_offs[ block->shred_cnt++ ] = (uint)block_sz + fec->fec->block_offs[ 31 ] + (uint)(overflow_data_sz / num_overflow_shreds * (overflow_idx + 1UL));
706 0 : } else {
707 0 : block->shred_blk_offs[ block->shred_cnt++ ] = (uint)block_sz + (uint)fec->fec->data_sz;
708 0 : }
709 0 : }
710 :
711 0 : int err = fd_sched_parse( sched, block, fec->alut_ctx );
712 :
713 0 : block->fec_cnt++;
714 0 : sched->metrics->fec_cnt++;
715 :
716 0 : if( FD_UNLIKELY( err==FD_SCHED_PARSER_BAD_BLOCK ) ) {
717 0 : FD_LOG_INFO(( "bad block: slot %lu, parent slot %lu", block->slot, block->parent_slot ));
718 0 : sched->print_buf_sz = 0UL;
719 0 : print_all( sched, block );
720 0 : FD_LOG_DEBUG(( "%s", sched->print_buf ));
721 0 : subtree_abandon( sched, block );
722 0 : sched->metrics->bytes_dropped_cnt += block->fec_buf_sz-block->fec_buf_soff;
723 0 : sched->metrics->block_bad_cnt++;
724 0 : check_or_set_active_block( sched );
725 0 : return 0;
726 0 : }
727 :
728 : /* Check if we need to set the active block. */
729 0 : check_or_set_active_block( sched );
730 :
731 0 : return 1;
732 0 : }
733 :
/* fd_sched_task_next_ready selects the next unit of work for an exec
   tile on the currently active block and populates *out with it.
   Returns 1UL if *out describes a task to dispatch, 0UL if nothing can
   be dispatched right now.  Priority order, as implemented below:
   block-start signal, then transaction execution, then sigverify, then
   block-end signal.  A tile is considered fully idle only when its bit
   is set in both the exec-ready and sigverify-ready bitsets. */
ulong
fd_sched_task_next_ready( fd_sched_t * sched, fd_sched_task_t * out ) {
  FD_TEST( sched->canary==FD_SCHED_MAGIC );

  /* Intersection of the two ready bitsets == tiles with neither an
     exec nor a sigverify task in flight. */
  ulong exec_ready_bitset0 = sched->txn_exec_ready_bitset[ 0 ];
  ulong exec_fully_ready_bitset = sched->sigverify_ready_bitset[ 0 ] & exec_ready_bitset0;
  if( FD_UNLIKELY( !exec_fully_ready_bitset ) ) {
    /* Early exit if no exec tiles available. */
    return 0UL;
  }

  if( FD_UNLIKELY( sched->active_bank_idx==ULONG_MAX ) ) {
    /* No need to try activating a block.  If we're in this state,
       there's truly nothing to execute.  We will activate something
       when we ingest a FEC set with transactions. */
    return 0UL;
  }

  out->task_type = FD_SCHED_TT_NULL;

  /* We could in theory reevaluate staging lane allocation here and do
     promotion/demotion as needed.  It's a policy decision to minimize
     fork churn for now and just execute down the same active fork. */

  ulong bank_idx = sched->active_bank_idx;
  fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
  if( FD_UNLIKELY( block_should_deactivate( block ) ) ) {
    /* Dump full scheduler state before crashing on the invariant. */
    sched->print_buf_sz = 0UL;
    print_all( sched, block );
    FD_LOG_NOTICE(( "%s", sched->print_buf ));
    FD_LOG_CRIT(( "invariant violation: active_bank_idx %lu is not activatable nor has anything in-flight", sched->active_bank_idx ));
  }

  /* High-water-mark bookkeeping for pool occupancy metrics. */
  block->txn_pool_max_popcnt = fd_ulong_max( block->txn_pool_max_popcnt, FD_SCHED_MAX_DEPTH-sched->txn_pool_free_cnt );
  block->block_pool_max_popcnt = fd_ulong_max( block->block_pool_max_popcnt, sched->block_pool_popcnt );

  /* Emit the block-start task exactly once per block, before any
     transaction from the block is dispatched. */
  if( FD_UNLIKELY( !block->block_start_signaled ) ) {
    out->task_type = FD_SCHED_TT_BLOCK_START;
    out->block_start->bank_idx = bank_idx;
    out->block_start->parent_bank_idx = block->parent_idx;
    out->block_start->slot = block->slot;
    block->block_start_signaled = 1;
    return 1UL;
  }

  /* exec_tile_idx0: lowest-indexed fully idle tile (guaranteed valid
     here since exec_fully_ready_bitset!=0 was checked above).
     exec_queued_cnt: parsed transactions not yet dispatched for exec. */
  ulong exec_tile_idx0 = fd_ulong_if( !!exec_fully_ready_bitset, (ulong)fd_ulong_find_lsb( exec_fully_ready_bitset ), ULONG_MAX );
  ulong exec_queued_cnt = block->txn_parsed_cnt-block->txn_exec_in_flight_cnt-block->txn_exec_done_cnt;
  if( FD_LIKELY( exec_queued_cnt>0UL && fd_ulong_popcnt( exec_fully_ready_bitset ) ) ) { /* Optimize for no fork switching. */
    /* Transaction execution has the highest priority.  Current mainnet
       block times are very much dominated by critical path transaction
       execution.  To achieve the fastest block replay speed, we can't
       afford to make any mistake in critical path dispatching.  Any
       deviation from perfect critical path dispatching is basically
       irrecoverable.  As such, we try to keep all the exec tiles busy
       with transaction execution, but we allow at most one transaction
       to be in-flight per exec tile.  This is to ensure that whenever a
       critical path transaction completes, we have at least one exec
       tile, e.g. the one that just completed said transaction, readily
       available to continue executing down the critical path. */
    out->txn_exec->txn_idx = fd_rdisp_get_next_ready( sched->rdisp, bank_idx );
    if( FD_UNLIKELY( out->txn_exec->txn_idx==0UL ) ) {
      /* There are transactions queued but none ready for execution.
         This implies that there must be in-flight transactions on whose
         completion the queued transactions depend.  So we return and
         wait for those in-flight transactions to retire.  This is a
         policy decision to execute as much as we can down the current
         fork. */
      if( FD_UNLIKELY( !block->txn_exec_in_flight_cnt ) ) {
        sched->print_buf_sz = 0UL;
        print_all( sched, block );
        FD_LOG_NOTICE(( "%s", sched->print_buf ));
        FD_LOG_CRIT(( "invariant violation: no ready transaction found but block->txn_exec_in_flight_cnt==0" ));
      }

      /* Dispatch more sigverify tasks only if at least one exec tile is
         executing transactions or completely idle.  Allow at most one
         sigverify task in-flight per tile, and only dispatch to
         completely idle tiles. */
      ulong sigverify_ready_bitset = exec_fully_ready_bitset;
      ulong sigverify_queued_cnt = block->txn_parsed_cnt-block->txn_sigverify_in_flight_cnt-block->txn_sigverify_done_cnt;
      if( FD_LIKELY( sigverify_queued_cnt>0UL && fd_ulong_popcnt( sigverify_ready_bitset )>fd_int_if( block->txn_exec_in_flight_cnt>0U, 0, 1 ) ) ) {
        /* Dispatch transactions for sigverify in parse order. */
        int exec_tile_idx_sigverify = fd_ulong_find_lsb( sigverify_ready_bitset );
        out->task_type = FD_SCHED_TT_TXN_SIGVERIFY;
        out->txn_sigverify->bank_idx = bank_idx;
        out->txn_sigverify->txn_idx = block->txn_idx[ block->txn_sigverify_done_cnt+block->txn_sigverify_in_flight_cnt ];
        out->txn_sigverify->exec_idx = (ulong)exec_tile_idx_sigverify;
        sched->sigverify_ready_bitset[ 0 ] = fd_ulong_clear_bit( sched->sigverify_ready_bitset[ 0 ], exec_tile_idx_sigverify );
        sched->tile_to_bank_idx[ exec_tile_idx_sigverify ] = bank_idx;
        block->txn_sigverify_in_flight_cnt++;
        /* Invariant: no tile may be busy with exec and sigverify at the
           same time (its bit cleared in both ready bitsets). */
        if( FD_UNLIKELY( (~sched->txn_exec_ready_bitset[ 0 ])&(~sched->sigverify_ready_bitset[ 0 ])&fd_ulong_mask_lsb( (int)sched->exec_cnt ) ) ) FD_LOG_CRIT(( "invariant violation: txn_exec_ready_bitset 0x%lx sigverify_ready_bitset 0x%lx", sched->txn_exec_ready_bitset[ 0 ], sched->sigverify_ready_bitset[ 0 ] ));
        return 1UL;
      }
      return 0UL;
    }
    out->task_type = FD_SCHED_TT_TXN_EXEC;
    out->txn_exec->bank_idx = bank_idx;
    out->txn_exec->slot = block->slot;
    out->txn_exec->exec_idx = exec_tile_idx0;
    FD_TEST( out->txn_exec->exec_idx!=ULONG_MAX );

    /* Time-weighted in-flight accounting: attribute the interval since
       the last dispatch/retire event to the busy-tile count that was in
       effect during that interval.  txn_in_flight_last_tick==LONG_MAX
       appears to be the "never dispatched yet" sentinel -- see the
       none-in-flight guard below. */
    long now = fd_tickcount();
    ulong delta = (ulong)(now-sched->txn_in_flight_last_tick);
    ulong txn_exec_busy_cnt = sched->exec_cnt-(ulong)fd_ulong_popcnt( exec_ready_bitset0 );
    sched->metrics->txn_none_in_flight_tickcount += fd_ulong_if( txn_exec_busy_cnt==0UL && sched->txn_in_flight_last_tick!=LONG_MAX, delta, 0UL );
    sched->metrics->txn_weighted_in_flight_tickcount += fd_ulong_if( txn_exec_busy_cnt!=0UL, delta, 0UL );
    sched->metrics->txn_weighted_in_flight_cnt += delta*txn_exec_busy_cnt;
    sched->txn_in_flight_last_tick = now;

    /* Record dispatch timestamp, indexed by parse order. */
    block->txn_disp_ticks[ sched->txn_idx_to_parse_idx[ out->txn_exec->txn_idx ] ] = now;

    /* Mark the chosen tile busy with exec and remember which block it
       is working on for the task_done() callback. */
    sched->txn_exec_ready_bitset[ 0 ] = fd_ulong_clear_bit( exec_ready_bitset0, (int)exec_tile_idx0);
    sched->tile_to_bank_idx[ exec_tile_idx0 ] = bank_idx;

    block->txn_exec_in_flight_cnt++;
    sched->metrics->txn_max_in_flight_cnt = fd_uint_max( sched->metrics->txn_max_in_flight_cnt, block->txn_exec_in_flight_cnt );

    /* Cross-check: busy tiles (per bitsets) must equal in-flight task
       counts (per block counters), modulo dying blocks handled below. */
    ulong total_exec_busy_cnt = sched->exec_cnt-(ulong)fd_ulong_popcnt( sched->txn_exec_ready_bitset[ 0 ]&sched->sigverify_ready_bitset[ 0 ] );
    if( FD_UNLIKELY( (~sched->txn_exec_ready_bitset[ 0 ])&(~sched->sigverify_ready_bitset[ 0 ])&fd_ulong_mask_lsb( (int)sched->exec_cnt ) ) ) FD_LOG_CRIT(( "invariant violation: txn_exec_ready_bitset 0x%lx sigverify_ready_bitset 0x%lx", sched->txn_exec_ready_bitset[ 0 ], sched->sigverify_ready_bitset[ 0 ] ));
    if( FD_UNLIKELY( block->txn_exec_in_flight_cnt+block->txn_sigverify_in_flight_cnt!=total_exec_busy_cnt ) ) {
      /* Ideally we'd simply assert that the two sides of the equation
         are equal.  But abandoned blocks throw a wrench into this.  We
         allow abandoned blocks to have in-flight transactions that are
         naturally drained while we try to dispatch from another block.
         In such cases, the total number of in-flight transactions
         should include the abandoned blocks too.  The contract is that
         blocks with in-flight transactions cannot be abandoned or
         demoted from rdisp.  So a dying block has to be the head of one
         of the staging lanes. */
      ulong total_in_flight = 0UL;
      for( int l=0; l<(int)FD_SCHED_MAX_STAGING_LANES; l++ ) {
        if( fd_ulong_extract_bit( sched->staged_bitset, l ) ) {
          fd_sched_block_t * staged_block = block_pool_ele( sched, sched->staged_head_bank_idx[ l ] );
          if( FD_UNLIKELY( block_is_in_flight( staged_block )&&!(staged_block==block||staged_block->dying) ) ) {
            sched->print_buf_sz = 0UL;
            print_all( sched, staged_block );
            FD_LOG_NOTICE(( "%s", sched->print_buf ));
            FD_LOG_CRIT(( "invariant violation: in-flight block is neither active nor dying" ));
          }
          total_in_flight += staged_block->txn_exec_in_flight_cnt;
          total_in_flight += staged_block->txn_sigverify_in_flight_cnt;
        }
      }
      if( FD_UNLIKELY( total_in_flight!=total_exec_busy_cnt ) ) {
        sched->print_buf_sz = 0UL;
        print_all( sched, block );
        FD_LOG_NOTICE(( "%s", sched->print_buf ));
        FD_LOG_CRIT(( "invariant violation: total_in_flight %lu != total_exec_busy_cnt %lu", total_in_flight, total_exec_busy_cnt ));
      }
      FD_LOG_DEBUG(( "exec_busy_cnt %lu checks out", total_exec_busy_cnt ));
    }
    return 1UL;
  }

  /* At this point txn_queued_cnt==0 */

  /* Try to dispatch a sigverify task, but leave one exec tile idle for
     critical path execution, unless there's not going to be any more
     real transactions for the critical path.  In the degenerate case of
     only one exec tile, keep it busy. */
  ulong sigverify_ready_bitset = exec_fully_ready_bitset;
  ulong sigverify_queued_cnt = block->txn_parsed_cnt-block->txn_sigverify_in_flight_cnt-block->txn_sigverify_done_cnt;
  if( FD_LIKELY( sigverify_queued_cnt>0UL && fd_ulong_popcnt( sigverify_ready_bitset )>fd_int_if( block->fec_eos||block->txn_exec_in_flight_cnt>0U||sched->exec_cnt==1UL, 0, 1 ) ) ) {
    /* Dispatch transactions for sigverify in parse order. */
    int exec_tile_idx_sigverify = fd_ulong_find_lsb( sigverify_ready_bitset );
    out->task_type = FD_SCHED_TT_TXN_SIGVERIFY;
    out->txn_sigverify->txn_idx = block->txn_idx[ block->txn_sigverify_done_cnt+block->txn_sigverify_in_flight_cnt ];
    out->txn_sigverify->bank_idx = bank_idx;
    out->txn_sigverify->exec_idx = (ulong)exec_tile_idx_sigverify;
    sched->sigverify_ready_bitset[ 0 ] = fd_ulong_clear_bit( sched->sigverify_ready_bitset[ 0 ], exec_tile_idx_sigverify );
    sched->tile_to_bank_idx[ exec_tile_idx_sigverify ] = bank_idx;
    block->txn_sigverify_in_flight_cnt++;
    if( FD_UNLIKELY( (~sched->txn_exec_ready_bitset[ 0 ])&(~sched->sigverify_ready_bitset[ 0 ])&fd_ulong_mask_lsb( (int)sched->exec_cnt ) ) ) FD_LOG_CRIT(( "invariant violation: txn_exec_ready_bitset 0x%lx sigverify_ready_bitset 0x%lx", sched->txn_exec_ready_bitset[ 0 ], sched->sigverify_ready_bitset[ 0 ] ));
    return 1UL;
  }

  /* Nothing left to execute or sigverify; emit block-end once the block
     qualifies (block_should_signal_end() decides). */
  if( FD_UNLIKELY( block_should_signal_end( block ) ) ) {
    FD_TEST( block->block_start_signaled );
    out->task_type = FD_SCHED_TT_BLOCK_END;
    out->block_end->bank_idx = bank_idx;
    block->block_end_signaled = 1;
    return 1UL;
  }

  /* Nothing queued for the active block.  If we haven't received all
     the FEC sets for it, then return and wait for more FEC sets, while
     there are in-flight transactions.  This is a policy decision to
     minimize fork churn and allow for executing down the current fork
     as much as we can.  If we have received all the FEC sets for it,
     then we'd still like to return and wait for the in-flight
     transactions to retire, before switching to a different block.

     Either way, there should be in-flight transactions.  We deactivate
     the active block the moment we exhausted transactions from it. */
  if( FD_UNLIKELY( !block_is_in_flight( block ) ) ) {
    sched->print_buf_sz = 0UL;
    print_all( sched, block );
    FD_LOG_NOTICE(( "%s", sched->print_buf ));
    FD_LOG_CRIT(( "invariant violation: expected in-flight transactions but none" ));
  }

  return 0UL;
}
937 :
/* fd_sched_task_done retires a task previously returned by
   fd_sched_task_next_ready.  task_type identifies which kind of task
   completed; txn_idx is only meaningful for TXN_EXEC/TXN_SIGVERIFY;
   exec_idx is the tile that ran it.  Updates per-block counters, the
   tile ready bitsets, and completion bitsets, then either drains a
   dying block or considers switching the active block. */
void
fd_sched_task_done( fd_sched_t * sched, ulong task_type, ulong txn_idx, ulong exec_idx ) {
  FD_TEST( sched->canary==FD_SCHED_MAGIC );

  /* Map the completed task back to the block it belongs to.  Block
     start/end tasks are only ever dispatched for the active block;
     transaction tasks are resolved through the tile->block mapping
     recorded at dispatch time. */
  ulong bank_idx = ULONG_MAX;
  switch( task_type ) {
    case FD_SCHED_TT_BLOCK_START:
    case FD_SCHED_TT_BLOCK_END: {
      (void)txn_idx;
      bank_idx = sched->active_bank_idx;
      break;
    }
    case FD_SCHED_TT_TXN_EXEC:
    case FD_SCHED_TT_TXN_SIGVERIFY: {
      FD_TEST( txn_idx<FD_SCHED_MAX_DEPTH );
      bank_idx = sched->tile_to_bank_idx[ exec_idx ];
      break;
    }
    default: FD_LOG_CRIT(( "unsupported task_type %lu", task_type ));
  }
  fd_sched_block_t * block = block_pool_ele( sched, bank_idx );

  if( FD_UNLIKELY( !block->staged ) ) {
    /* Invariant: only staged blocks can have in-flight transactions. */
    FD_LOG_CRIT(( "invariant violation: block->staged==0, slot %lu, parent slot %lu",
                  block->slot, block->parent_slot ));
  }
  if( FD_UNLIKELY( !block->in_rdisp ) ) {
    /* Invariant: staged blocks must be in the dispatcher. */
    FD_LOG_CRIT(( "invariant violation: block->in_rdisp==0, slot %lu, parent slot %lu",
                  block->slot, block->parent_slot ));
  }

  /* High-water-mark bookkeeping for pool occupancy metrics. */
  block->txn_pool_max_popcnt = fd_ulong_max( block->txn_pool_max_popcnt, FD_SCHED_MAX_DEPTH-sched->txn_pool_free_cnt );
  block->block_pool_max_popcnt = fd_ulong_max( block->block_pool_max_popcnt, sched->block_pool_popcnt );

  int exec_tile_idx = (int)exec_idx;

  switch( task_type ) {
    case FD_SCHED_TT_BLOCK_START: {
      FD_TEST( !block->block_start_done );
      block->block_start_done = 1;
      break;
    }
    case FD_SCHED_TT_BLOCK_END: {
      /* It may seem redundant to be invoking task_done() on these
         somewhat fake tasks.  But these are necessary to drive state
         transition for empty blocks or slow blocks. */
      FD_TEST( !block->block_end_done );
      block->block_end_done = 1;
      break;
    }
    case FD_SCHED_TT_TXN_EXEC: {
      /* Close out the time-weighted in-flight accounting interval that
         ends with this retire event. */
      long now = fd_tickcount();
      ulong delta = (ulong)(now-sched->txn_in_flight_last_tick);
      ulong txn_exec_busy_cnt = sched->exec_cnt-(ulong)fd_ulong_popcnt( sched->txn_exec_ready_bitset[ 0 ] );
      sched->metrics->txn_weighted_in_flight_tickcount += delta;
      sched->metrics->txn_weighted_in_flight_cnt += delta*txn_exec_busy_cnt;
      sched->txn_in_flight_last_tick = now;

      /* Record completion timestamp, indexed by parse order. */
      block->txn_done_ticks[ sched->txn_idx_to_parse_idx[ txn_idx ] ] = now;

      block->txn_exec_done_cnt++;
      block->txn_exec_in_flight_cnt--;
      sched->metrics->txn_exec_done_cnt++;
      txn_bitset_insert( sched->exec_done_set, txn_idx );
      if( txn_bitset_test( sched->exec_done_set, txn_idx ) && txn_bitset_test( sched->sigverify_done_set, txn_idx ) ) {
        /* Release txn_idx if both exec and sigverify are done.  This is
           guaranteed to only happen once per transaction because
           whichever one completed first would not release. */
        fd_rdisp_complete_txn( sched->rdisp, txn_idx, 1 );
        sched->txn_pool_free_cnt++;
        block->txn_done_cnt++;
        sched->metrics->txn_done_cnt++;
      } else {
        /* Exec finished first; tell the dispatcher without releasing
           the transaction index yet. */
        fd_rdisp_complete_txn( sched->rdisp, txn_idx, 0 );
      }

      /* Return the tile to the exec-ready pool (it must have been
         marked busy at dispatch time). */
      FD_TEST( !fd_ulong_extract_bit( sched->txn_exec_ready_bitset[ 0 ], exec_tile_idx ) );
      sched->txn_exec_ready_bitset[ 0 ] = fd_ulong_set_bit( sched->txn_exec_ready_bitset[ 0 ], exec_tile_idx );
      break;
    }
    case FD_SCHED_TT_TXN_SIGVERIFY: {
      block->txn_sigverify_done_cnt++;
      block->txn_sigverify_in_flight_cnt--;
      sched->metrics->txn_sigverify_done_cnt++;
      txn_bitset_insert( sched->sigverify_done_set, txn_idx );
      if( txn_bitset_test( sched->exec_done_set, txn_idx ) && txn_bitset_test( sched->sigverify_done_set, txn_idx ) ) {
        /* Release txn_idx if both exec and sigverify are done.  This is
           guaranteed to only happen once per transaction because
           whichever one completed first would not release. */
        fd_rdisp_complete_txn( sched->rdisp, txn_idx, 1 );
        sched->txn_pool_free_cnt++;
        block->txn_done_cnt++;
        sched->metrics->txn_done_cnt++;
      }
      /* NOTE(review): unlike the TXN_EXEC case there is no
         fd_rdisp_complete_txn( ..., 0 ) call when only sigverify is
         done -- presumably the dispatcher only tracks exec completion;
         confirm against fd_rdisp's contract. */

      /* Return the tile to the sigverify-ready pool. */
      FD_TEST( !fd_ulong_extract_bit( sched->sigverify_ready_bitset[ 0 ], exec_tile_idx ) );
      sched->sigverify_ready_bitset[ 0 ] = fd_ulong_set_bit( sched->sigverify_ready_bitset[ 0 ], exec_tile_idx );
      break;
    }
  }

  /* A dying block whose last in-flight task just drained can now be
     fully abandoned. */
  if( FD_UNLIKELY( block->dying && !block_is_in_flight( block ) ) ) {
    if( FD_UNLIKELY( sched->active_bank_idx==bank_idx ) ) {
      FD_LOG_CRIT(( "invariant violation: active block shouldn't be dying, bank_idx %lu, slot %lu, parent slot %lu",
                    bank_idx, block->slot, block->parent_slot ));
    }
    FD_LOG_DEBUG(( "dying block %lu drained", block->slot ));
    subtree_abandon( sched, block );
    return;
  }

  if( FD_UNLIKELY( !block->dying && sched->active_bank_idx!=bank_idx ) ) {
    /* Block is not dead.  So we should be actively replaying it. */
    fd_sched_block_t * active_block = block_pool_ele( sched, sched->active_bank_idx );
    FD_LOG_CRIT(( "invariant violation: sched->active_bank_idx %lu, slot %lu, parent slot %lu, bank_idx %lu, slot %lu, parent slot %lu",
                  sched->active_bank_idx, active_block->slot, active_block->parent_slot,
                  bank_idx, block->slot, block->parent_slot ));
  }

  maybe_switch_block( sched, bank_idx );
}
1061 :
/* fd_sched_block_abandon abandons the currently active block (and,
   via subtree_abandon, everything under it), then clears the active
   block and tries to activate another one.  Callers may only abandon
   the active block; anything else is an invariant violation. */
void
fd_sched_block_abandon( fd_sched_t * sched, ulong bank_idx ) {
  FD_TEST( sched->canary==FD_SCHED_MAGIC );
  FD_TEST( bank_idx<sched->block_cnt_max );

  fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
  if( FD_UNLIKELY( bank_idx!=sched->active_bank_idx ) ) {
    /* Invariant: abandoning should only be performed on actively
       replayed blocks.  We impose this requirement on the caller
       because the dispatcher expects blocks to be abandoned in the same
       order that they were added, and having this requirement makes it
       easier to please the dispatcher. */
    sched->print_buf_sz = 0UL;
    print_all( sched, block );
    FD_LOG_NOTICE(( "%s", sched->print_buf ));
    FD_LOG_CRIT(( "invariant violation: active_bank_idx %lu, bank_idx %lu, slot %lu, parent slot %lu",
                  sched->active_bank_idx, bank_idx, block->slot, block->parent_slot ));
  }

  /* Log the scheduler/block state for postmortem debugging before the
     abandon mutates it. */
  FD_LOG_INFO(( "abandoning block %lu", block->slot ));
  sched->print_buf_sz = 0UL;
  print_all( sched, block );
  FD_LOG_DEBUG(( "%s", sched->print_buf ));
  log_block_txns( sched, block );

  subtree_abandon( sched, block );

  /* Reset the active block. */
  FD_LOG_DEBUG(( "reset active_bank_idx %lu", sched->active_bank_idx ));
  sched->active_bank_idx = ULONG_MAX;
  sched->metrics->deactivate_abandoned_cnt++;
  try_activate_block( sched );
}
1095 :
1096 : void
1097 0 : fd_sched_block_add_done( fd_sched_t * sched, ulong bank_idx, ulong parent_bank_idx, ulong slot ) {
1098 0 : FD_TEST( sched->canary==FD_SCHED_MAGIC );
1099 0 : FD_TEST( bank_idx<sched->block_cnt_max );
1100 :
1101 0 : fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
1102 0 : add_block( sched, bank_idx, parent_bank_idx );
1103 0 : block->slot = slot;
1104 0 : block->txn_parsed_cnt = UINT_MAX;
1105 0 : block->txn_exec_done_cnt = UINT_MAX;
1106 0 : block->txn_sigverify_done_cnt = UINT_MAX;
1107 0 : block->txn_done_cnt = UINT_MAX;
1108 0 : block->fec_eos = 1;
1109 0 : block->block_start_signaled = 1;
1110 0 : block->block_end_signaled = 1;
1111 0 : block->block_start_done = 1;
1112 0 : block->block_end_done = 1;
1113 0 : if( FD_LIKELY( parent_bank_idx!=ULONG_MAX ) ) {
1114 0 : fd_sched_block_t * parent_block = block_pool_ele( sched, parent_bank_idx );
1115 0 : block->parent_slot = parent_block->slot;
1116 0 : }
1117 0 : if( FD_UNLIKELY( parent_bank_idx==ULONG_MAX ) ) {
1118 : /* Assumes that a NULL parent implies the snapshot slot. */
1119 0 : block->parent_slot = ULONG_MAX;
1120 0 : block->rooted = 1;
1121 0 : sched->root_idx = bank_idx;
1122 0 : }
1123 0 : }
1124 :
/* fd_sched_advance_root makes root_idx the new root and releases every
   block in the old root's subtree that is not in the new root's
   subtree.  The walk is breadth-first; the parent_idx field of visited
   blocks is repurposed as the intrusive "next" link of the traversal
   queue, so those fields are destroyed as blocks are pruned.  Pruned
   blocks must already be done (out of the dispatcher, no in-flight
   transactions) -- violations crash. */
void
fd_sched_advance_root( fd_sched_t * sched, ulong root_idx ) {
  FD_TEST( sched->canary==FD_SCHED_MAGIC );
  FD_TEST( root_idx<sched->block_cnt_max );
  FD_TEST( sched->root_idx<sched->block_cnt_max );

  fd_sched_block_t * new_root = block_pool_ele( sched, root_idx );
  fd_sched_block_t * old_root = block_pool_ele( sched, sched->root_idx );
  if( FD_UNLIKELY( !old_root->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root is not rooted, slot %lu, parent slot %lu",
                  old_root->slot, old_root->parent_slot ));
  }

  /* Early exit if the new root is the same as the old root. */
  if( FD_UNLIKELY( root_idx==sched->root_idx ) ) {
    FD_LOG_INFO(( "new root is the same as the old root, slot %lu, parent slot %lu",
                  new_root->slot, new_root->parent_slot ));
    return;
  }

  /* Initialize the BFS queue with the old root as its only element.
     head/tail are the queue endpoints; ULONG_MAX terminates the list. */
  fd_sched_block_t * head = old_root;
  head->parent_idx = ULONG_MAX;
  fd_sched_block_t * tail = head;

  while( head ) {
    FD_TEST( head->in_sched );
    head->in_sched = 0;

    /* Emit per-block metrics before the block is released. */
    sched->print_buf_sz = 0UL;
    print_block_metrics( sched, head );
    FD_LOG_DEBUG(( "%s", sched->print_buf ));

    ulong child_idx = head->child_idx;
    while( child_idx!=ULONG_MAX ) {
      fd_sched_block_t * child = block_pool_ele( sched, child_idx );
      /* Add children to be visited.  We abuse the parent_idx field to
         link up the next block to visit.  The new root's subtree is
         excluded and therefore survives. */
      if( child!=new_root ) {
        tail->parent_idx = child_idx;
        tail = child;
        tail->parent_idx = ULONG_MAX;
      }
      child_idx = child->sibling_idx;
    }

    /* Prune the current block.  We will never publish halfway into a
       staging lane, because anything on the rooted fork should have
       finished replaying gracefully and be out of the dispatcher.  In
       fact, anything that we are publishing away should be out of the
       dispatcher at this point.  And there should be no more in-flight
       transactions. */
    if( FD_UNLIKELY( block_is_in_flight( head ) ) ) {
      FD_LOG_CRIT(( "invariant violation: block has transactions in flight (%u exec %u sigverify), slot %lu, parent slot %lu",
                    head->txn_exec_in_flight_cnt, head->txn_sigverify_in_flight_cnt, head->slot, head->parent_slot ));
    }
    if( FD_UNLIKELY( head->in_rdisp ) ) {
      /* We should have removed it from the dispatcher when we were
         notified of the new root, or when in-flight transactions were
         drained. */
      FD_LOG_CRIT(( "invariant violation: block is in the dispatcher, slot %lu, parent slot %lu",
                    head->slot, head->parent_slot ));
    }
    sched->block_pool_popcnt--;
    /* parent_idx was overwritten above to be the next queue entry. */
    fd_sched_block_t * next = block_pool_ele( sched, head->parent_idx );
    head = next;
  }

  new_root->parent_idx = ULONG_MAX;
  sched->root_idx = root_idx;
}
1195 :
/* fd_sched_root_notify marks every block on the path from root_idx up
   to the current root as rooted, abandons all minority forks hanging
   off that path, and reactivates a block if the active one was
   abandoned in the process.  Unlike fd_sched_advance_root, this does
   not release blocks from the pool.  Crashes if root_idx is not a done,
   non-dying, unstaged descendant of the current root. */
void
fd_sched_root_notify( fd_sched_t * sched, ulong root_idx ) {
  FD_TEST( sched->canary==FD_SCHED_MAGIC );
  FD_TEST( root_idx<sched->block_cnt_max );
  FD_TEST( sched->root_idx<sched->block_cnt_max );

  fd_sched_block_t * block = block_pool_ele( sched, root_idx );
  fd_sched_block_t * old_root = block_pool_ele( sched, sched->root_idx );
  if( FD_UNLIKELY( !old_root->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root is not rooted, slot %lu, parent slot %lu",
                  old_root->slot, old_root->parent_slot ));
  }

  /* Early exit if the new root is the same as the old root. */
  if( FD_UNLIKELY( root_idx==sched->root_idx ) ) {
    FD_LOG_INFO(( "new root is the same as the old root, slot %lu, parent slot %lu",
                  block->slot, block->parent_slot ));
    return;
  }

  /* Mark every node from the new root up through its parents to the
     old root as being rooted.  Each node on the path must be fully
     done, alive, and already retired from staging/dispatch. */
  fd_sched_block_t * curr = block;
  fd_sched_block_t * prev = NULL;
  while( curr ) {
    if( FD_UNLIKELY( !block_is_done( curr ) ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is not done, slot %lu, parent slot %lu",
                    curr->slot, curr->parent_slot ));
    }
    if( FD_UNLIKELY( curr->dying ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is dying, slot %lu, parent slot %lu",
                    curr->slot, curr->parent_slot ));
    }
    if( FD_UNLIKELY( curr->staged ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is staged, slot %lu, parent slot %lu",
                    curr->slot, curr->parent_slot ));
    }
    if( FD_UNLIKELY( curr->in_rdisp ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is in the dispatcher, slot %lu, parent slot %lu",
                    curr->slot, curr->parent_slot ));
    }
    curr->rooted = 1;
    prev = curr;
    curr = block_pool_ele( sched, curr->parent_idx );
  }

  /* If we didn't reach the old root, the new root is not a descendant. */
  if( FD_UNLIKELY( prev!=old_root ) ) {
    FD_LOG_CRIT(( "invariant violation: new root is not a descendant of old root, new root slot %lu, parent slot %lu, old root slot %lu, parent slot %lu",
                  block->slot, block->parent_slot, old_root->slot, old_root->parent_slot ));
  }

  /* Remember the active block so we can tell below whether the
     minority-fork abandonment deactivated it. */
  ulong old_active_bank_idx = sched->active_bank_idx;

  /* Now traverse from old root towards new root, and abandon all
     minority forks. */
  curr = old_root;
  while( curr && curr->rooted && curr!=block ) { /* curr!=block to avoid abandoning good forks. */
    fd_sched_block_t * rooted_child_block = NULL;
    ulong child_idx = curr->child_idx;
    while( child_idx!=ULONG_MAX ) {
      fd_sched_block_t * child = block_pool_ele( sched, child_idx );
      if( child->rooted ) {
        /* This child is on the rooted path; descend through it next. */
        rooted_child_block = child;
      } else {
        /* This is a minority fork. */
        FD_LOG_DEBUG(( "abandoning minority fork on block %lu", child->slot ));
        subtree_abandon( sched, child );
      }
      child_idx = child->sibling_idx;
    }
    curr = rooted_child_block;
  }

  /* If the active block got abandoned, we need to reset it. */
  if( sched->active_bank_idx==ULONG_MAX ) {
    sched->metrics->deactivate_pruned_cnt += fd_uint_if( old_active_bank_idx!=ULONG_MAX, 1U, 0U );
    try_activate_block( sched );
  }
}
1276 :
1277 : fd_txn_p_t *
1278 0 : fd_sched_get_txn( fd_sched_t * sched, ulong txn_idx ) {
1279 0 : FD_TEST( sched->canary==FD_SCHED_MAGIC );
1280 0 : if( FD_UNLIKELY( txn_idx>=FD_SCHED_MAX_DEPTH ) ) {
1281 0 : return NULL;
1282 0 : }
1283 0 : return sched->txn_pool+txn_idx;
1284 0 : }
1285 :
1286 : fd_hash_t *
1287 0 : fd_sched_get_poh( fd_sched_t * sched, ulong bank_idx ) {
1288 0 : FD_TEST( sched->canary==FD_SCHED_MAGIC );
1289 0 : FD_TEST( bank_idx<sched->block_cnt_max );
1290 0 : fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
1291 0 : return &block->poh;
1292 0 : }
1293 :
1294 : uint
1295 0 : fd_sched_get_shred_cnt( fd_sched_t * sched, ulong bank_idx ) {
1296 0 : FD_TEST( sched->canary==FD_SCHED_MAGIC );
1297 0 : FD_TEST( bank_idx<sched->block_cnt_max );
1298 0 : fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
1299 0 : return block->shred_cnt;
1300 0 : }
1301 :
/* fd_sched_get_state_cstr renders the scheduler's metrics followed by
   its fork-tree state into the internal print buffer and returns that
   buffer as a cstr.  The buffer is owned by sched and is rewound and
   overwritten on every call. */
char *
fd_sched_get_state_cstr( fd_sched_t * sched ) {
  sched->print_buf_sz = 0UL; /* rewind the print buffer */
  print_metrics( sched );
  print_sched( sched );
  return sched->print_buf;
}
1309 :
/* Trivial leave/delete: fd_sched holds no resources beyond its memory
   region, so both simply hand back the pointer they were given. */
void * fd_sched_leave ( fd_sched_t * sched ) { return sched; }
void * fd_sched_delete( void * mem ) { return mem; }
1312 :
1313 :
1314 : /* Internal helpers. */
1315 :
/* add_block resets the block at bank_idx to a freshly-added state and
   links it into the fork tree as a new leaf child of parent_bank_idx
   (pass ULONG_MAX for a parentless node).  A child born to a dying
   parent inherits the dying flag. */
static void
add_block( fd_sched_t * sched,
           ulong bank_idx,
           ulong parent_bank_idx ) {
  fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
  FD_TEST( !block->in_sched ); /* block slot must not already be in use */
  sched->block_pool_popcnt++;

  /* Reset per-block transaction progress counters. */
  block->txn_parsed_cnt = 0U;
  block->txn_exec_in_flight_cnt = 0U;
  block->txn_exec_done_cnt = 0U;
  block->txn_sigverify_in_flight_cnt = 0U;
  block->txn_sigverify_done_cnt = 0U;
  block->txn_done_cnt = 0U;
  /* Snapshot pool occupancy at block-add time (presumably for
     high-water-mark style reporting, per the field names — TODO
     confirm where these are consumed). */
  block->txn_pool_max_popcnt = FD_SCHED_MAX_DEPTH-sched->txn_pool_free_cnt;
  block->block_pool_max_popcnt = sched->block_pool_popcnt;
  block->shred_cnt = 0U;
  block->fec_cnt = 0U;

  /* Reset FEC/batch parser state: no microblocks or transactions
     pending, empty buffer, positioned at the start of a batch. */
  block->mblks_rem = 0UL;
  block->txns_rem = 0UL;
  block->fec_buf_sz = 0U;
  block->fec_buf_boff = 0U;
  block->fec_buf_soff = 0U;
  block->fec_eob = 0;
  block->fec_sob = 1;

  /* Reset lifecycle flags. */
  block->fec_eos = 0;
  block->rooted = 0;
  block->dying = 0;
  block->in_sched = 1;
  block->in_rdisp = 0;
  block->block_start_signaled = 0;
  block->block_end_signaled = 0;
  block->block_start_done = 0;
  block->block_end_done = 0;
  block->staged = 0;

  block->luf_depth = 0UL; /* longest-unstaged-fork depth, recomputed on demand */

  /* New leaf node, no child, no sibling. */
  block->child_idx = ULONG_MAX;
  block->sibling_idx = ULONG_MAX;
  block->parent_idx = ULONG_MAX;

  if( FD_UNLIKELY( parent_bank_idx==ULONG_MAX ) ) {
    return; /* parentless node: nothing to link */
  }

  /* node->parent link */
  fd_sched_block_t * parent_block = block_pool_ele( sched, parent_bank_idx );
  block->parent_idx = parent_bank_idx;

  /* parent->node and sibling->node links: append this block to the end
     of the parent's singly-linked child list. */
  ulong child_idx = bank_idx;
  if( FD_LIKELY( parent_block->child_idx==ULONG_MAX ) ) { /* Optimize for no forking. */
    parent_block->child_idx = child_idx;
  } else {
    fd_sched_block_t * curr_block = block_pool_ele( sched, parent_block->child_idx );
    while( curr_block->sibling_idx!=ULONG_MAX ) {
      curr_block = block_pool_ele( sched, curr_block->sibling_idx );
    }
    curr_block->sibling_idx = child_idx;
  }

  /* A block descending from a dying block is itself dying. */
  if( FD_UNLIKELY( parent_block->dying ) ) {
    block->dying = 1;
  }
}
1385 :
/* CHECK bails out of the calling parser function with
   FD_SCHED_PARSER_AGAIN_LATER when cond doesn't hold, i.e. when the
   buffered data is insufficient to continue parsing right now. */
#define CHECK( cond ) do {                 \
    if( FD_UNLIKELY( !(cond) ) ) {         \
      return FD_SCHED_PARSER_AGAIN_LATER;  \
    }                                      \
  } while( 0 )

/* CHECK that it is safe to read at least n more bytes. */
#define CHECK_LEFT( n ) CHECK( (n)<=(block->fec_buf_sz-block->fec_buf_soff) )
1394 :
1395 : /* Consume as much as possible from the buffer. By the end of this
1396 : function, we will either have residual data that is unparseable only
1397 : because it is a batch that straddles FEC set boundaries, or we will
1398 : have reached the end of a batch. In the former case, any remaining
1399 : bytes should be concatenated with the next FEC set for further
1400 : parsing. In the latter case, any remaining bytes should be thrown
1401 : away. */
FD_WARN_UNUSED static int
fd_sched_parse( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx ) {
  while( 1 ) {
    /* Drain all remaining transactions in the current microblock. */
    while( block->txns_rem>0UL ) {
      int err;
      if( FD_UNLIKELY( (err=fd_sched_parse_txn( sched, block, alut_ctx ))!=FD_SCHED_PARSER_OK ) ) {
        return err; /* AGAIN_LATER (partial txn in buffer) or BAD_BLOCK */
      }
    }
    /* Microblocks remain in this batch: parse the next microblock
       header (PoH hash + transaction count). */
    if( block->txns_rem==0UL && block->mblks_rem>0UL ) {
      CHECK_LEFT( sizeof(fd_microblock_hdr_t) );
      fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)fd_type_pun( block->fec_buf+block->fec_buf_soff );
      block->fec_buf_soff += (uint)sizeof(fd_microblock_hdr_t);

      memcpy( block->poh.hash, hdr->hash, sizeof(block->poh.hash) );
      block->txns_rem = hdr->txn_cnt;
      block->mblks_rem--;
      continue;
    }
    /* At the start of a batch: the first 8 bytes are the batch's
       microblock count. */
    if( block->txns_rem==0UL && block->mblks_rem==0UL && block->fec_sob ) {
      CHECK_LEFT( sizeof(ulong) );
      FD_TEST( block->fec_buf_soff==0U );
      block->mblks_rem = FD_LOAD( ulong, block->fec_buf );
      block->fec_buf_soff += (uint)sizeof(ulong);
      /* FIXME what happens if someone sends us mblks_rem==0UL here? */

      block->fec_sob = 0;
      continue;
    }
    /* Batch fully parsed: nothing more to consume until the batch
       boundary is handled below. */
    if( block->txns_rem==0UL && block->mblks_rem==0UL ) {
      break;
    }
  }
  if( block->fec_eob ) {
    /* Ignore trailing bytes at the end of a batch and rewind the
       buffer so the next FEC set starts a fresh batch. */
    sched->metrics->bytes_ingested_unparsed_cnt += block->fec_buf_sz-block->fec_buf_soff;
    block->fec_buf_boff += block->fec_buf_sz;
    block->fec_buf_soff = 0U;
    block->fec_buf_sz = 0U;
    block->fec_sob = 1;
    block->fec_eob = 0;
  }
  return FD_SCHED_PARSER_OK;
}
1446 :
/* fd_sched_parse_txn parses one transaction from the block's FEC
   buffer at the current parse offset, expands address lookup tables
   (ALUTs) when present, inserts the transaction into the dispatcher,
   and records per-transaction bookkeeping (signature, shred index
   range, timing slots).  Returns FD_SCHED_PARSER_OK on success,
   FD_SCHED_PARSER_AGAIN_LATER when a complete transaction is not yet
   available in the buffer, or FD_SCHED_PARSER_BAD_BLOCK when the block
   exceeds the per-slot transaction limit. */
FD_WARN_UNUSED static int
fd_sched_parse_txn( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx ) {
  fd_txn_t * txn = fd_type_pun( block->txn );

  /* FIXME: For the replay pipeline, we allow up to 128 instructions per
     transaction.  Note that we are not concomitantly bumping the size
     of fd_txn_t.  We allow this because transactions like that do get
     packed by other validators, so we have to replay them.  Those
     transactions will eventually fail in the runtime, which imposes a
     limit of 64 instructions, but unfortunately they are not tossed out
     at parse time and they land on chain.  static_instruction_limit is
     going to enforce this limit at parse time, and transactions like
     that would not land on chain.  Then this short term change should
     be rolled back. */
  ulong pay_sz = 0UL;
  ulong txn_sz = fd_txn_parse_core( block->fec_buf+block->fec_buf_soff,
                                    fd_ulong_min( FD_TXN_MTU, block->fec_buf_sz-block->fec_buf_soff ),
                                    txn,
                                    NULL,
                                    &pay_sz,
                                    FD_TXN_INSTR_MAX*2UL );

  if( FD_UNLIKELY( !pay_sz || !txn_sz ) ) {
    /* Can't parse out a full transaction. */
    return FD_SCHED_PARSER_AGAIN_LATER;
  }

  if( FD_UNLIKELY( block->txn_parsed_cnt>=FD_MAX_TXN_PER_SLOT ) ) {
    /* The block contains more transactions than a valid block would.
       Mark the block dead instead of keep processing it. */
    return FD_SCHED_PARSER_BAD_BLOCK;
  }

  /* Try to expand ALUTs.  If expansion fails (or the slot hashes
     sysvar can't be read), fall back to inserting the transaction in
     serializing mode. */
  int has_aluts = txn->transaction_version==FD_TXN_V0 && txn->addr_table_adtl_cnt>0;
  int serializing = 0;
  if( has_aluts ) {
    fd_funk_t * funk = fd_accdb_user_v1_funk( alut_ctx->accdb );
    uchar __attribute__((aligned(FD_SLOT_HASHES_GLOBAL_ALIGN))) slot_hashes_mem[ FD_SYSVAR_SLOT_HASHES_FOOTPRINT ];
    fd_slot_hashes_global_t const * slot_hashes_global = fd_sysvar_slot_hashes_read( funk, alut_ctx->xid, slot_hashes_mem );
    if( FD_LIKELY( slot_hashes_global ) ) {
      fd_slot_hash_t * slot_hash = deq_fd_slot_hash_t_join( (uchar *)slot_hashes_global + slot_hashes_global->hashes_offset );
      serializing = !!fd_runtime_load_txn_address_lookup_tables( txn, block->fec_buf+block->fec_buf_soff, funk, alut_ctx->xid, alut_ctx->els, slot_hash, block->aluts );
      sched->metrics->alut_success_cnt += (uint)!serializing;
    } else {
      serializing = 1;
    }
  }

  /* Insert into the dispatcher and claim a txn pool slot (rdisp and
     the txn pool share the same index space). */
  ulong bank_idx = (ulong)(block-sched->block_pool);
  ulong txn_idx = fd_rdisp_add_txn( sched->rdisp, bank_idx, txn, block->fec_buf+block->fec_buf_soff, serializing ? NULL : block->aluts, serializing );
  FD_TEST( txn_idx!=0UL ); /* 0 is not a valid txn_idx */
  sched->metrics->txn_parsed_cnt++;
  sched->metrics->alut_serializing_cnt += (uint)serializing;
  sched->txn_pool_free_cnt--;
  fd_txn_p_t * txn_p = sched->txn_pool + txn_idx;
  txn_p->payload_sz = pay_sz;

  /* Map the transaction's byte range within the block to a shred index
     range via sorted search over the per-shred byte offsets. */
  txn_p->start_shred_idx = (ushort)fd_sort_up_uint_split( block->shred_blk_offs, block->shred_cnt, block->fec_buf_boff+block->fec_buf_soff );
  txn_p->start_shred_idx = fd_ushort_if( txn_p->start_shred_idx>0U, (ushort)(txn_p->start_shred_idx-1U), txn_p->start_shred_idx );
  txn_p->end_shred_idx = (ushort)fd_sort_up_uint_split( block->shred_blk_offs, block->shred_cnt, block->fec_buf_boff+block->fec_buf_soff+(uint)pay_sz );

  /* Stash payload and parsed form, then reset completion tracking and
     per-transaction bookkeeping for this txn_idx. */
  fd_memcpy( txn_p->payload, block->fec_buf+block->fec_buf_soff, pay_sz );
  fd_memcpy( TXN(txn_p), txn, txn_sz );
  txn_bitset_remove( sched->exec_done_set, txn_idx );
  txn_bitset_remove( sched->sigverify_done_set, txn_idx );
  sched->txn_idx_to_parse_idx[ txn_idx ] = block->txn_parsed_cnt;
  memcpy( block->txn_sigs[ block->txn_parsed_cnt ], fd_txn_get_signatures( TXN(txn_p), txn_p->payload ), FD_TXN_SIGNATURE_SZ );
  block->txn_idx[ block->txn_parsed_cnt ] = txn_idx;
  block->txn_disp_ticks[ block->txn_parsed_cnt ] = LONG_MAX;
  block->txn_done_ticks[ block->txn_parsed_cnt ] = LONG_MAX;
  block->fec_buf_soff += (uint)pay_sz;
  block->txn_parsed_cnt++;
#if FD_SCHED_SKIP_SIGVERIFY
  /* Compile-time escape hatch: pretend sigverify already passed. */
  txn_bitset_insert( sched->sigverify_done_set, txn_idx );
  block->txn_sigverify_done_cnt++;
#endif
  block->txns_rem--;
  return FD_SCHED_PARSER_OK;
}
1527 :
1528 : #undef CHECK
1529 : #undef CHECK_LEFT
1530 :
/* try_activate_block attempts to select a new active block for
   scheduling: first by switching to the head of an already-staged lane
   whose parent block is done, and failing that by promoting the
   longest stageable unstaged fork onto a free staging lane.  If
   neither succeeds, no block is activated. */
static void
try_activate_block( fd_sched_t * sched ) {

  /* See if there are any allocated staging lanes that we can activate
     for scheduling ... */
  ulong staged_bitset = sched->staged_bitset;
  while( staged_bitset ) {
    int lane_idx = fd_ulong_find_lsb( staged_bitset );
    staged_bitset = fd_ulong_pop_lsb( staged_bitset );

    ulong head_idx = sched->staged_head_bank_idx[ lane_idx ];
    fd_sched_block_t * head_block = block_pool_ele( sched, head_idx );
    fd_sched_block_t * parent_block = block_pool_ele( sched, head_block->parent_idx );
    if( FD_UNLIKELY( parent_block->dying ) ) {
      /* Invariant: no child of a dying block should be staged. */
      FD_LOG_CRIT(( "invariant violation: staged_head_bank_idx %lu, slot %lu, parent slot %lu on lane %d has parent_block->dying set, slot %lu, parent slot %lu",
                    head_idx, head_block->slot, head_block->parent_slot, lane_idx, parent_block->slot, parent_block->parent_slot ));
    }
    //FIXME: restore this invariant check when we have immediate demotion of dying blocks
    // if( FD_UNLIKELY( head_block->dying ) ) {
    //   /* Invariant: no dying block should be staged. */
    //   FD_LOG_CRIT(( "invariant violation: staged_head_bank_idx %lu, slot %lu, prime %lu on lane %u has head_block->dying set",
    //                 head_idx, (ulong)head_block->block_id.slot, (ulong)head_block->block_id.prime, lane_idx ));
    // }
    if( block_is_done( parent_block ) && block_is_activatable( head_block ) ) {
      /* ... Yes, on this staging lane the parent block is done.  So we
         can switch to the staged child. */
      sched->active_bank_idx = head_idx;
      sched->metrics->lane_switch_cnt++;
      return;
    }
  }

  /* ... No, promote unstaged blocks. */
  ulong root_idx = sched->root_idx;
  if( FD_UNLIKELY( root_idx==ULONG_MAX ) ) {
    FD_LOG_CRIT(( "invariant violation: root_idx==ULONG_MAX indicating fd_sched is unintialized" ));
  }
  /* Find and stage the longest stageable unstaged fork.  This is a
     policy decision. */
  ulong depth = compute_longest_unstaged_fork( sched, root_idx );
  if( FD_LIKELY( depth>0UL ) ) {
    if( FD_UNLIKELY( sched->staged_bitset==fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) ) ) {
      /* No more staging lanes available.  All of them are occupied by
         slow squatters.  Demote one of them. */
      //FIXME implement this, note that only empty blocks can be
      //demoted, and so blocks with in-flight transactions, including
      //dying in-flight blocks, shouldn't be demoted
      FD_LOG_CRIT(( "unimplemented" ));
      sched->metrics->lane_demoted_cnt++;
      // sched->metrics->block_demoted_cnt++; for every demoted block
    }
    FD_TEST( sched->staged_bitset!=fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) );
    /* Take the lowest-numbered free lane. */
    int lane_idx = fd_ulong_find_lsb( ~sched->staged_bitset );
    if( FD_UNLIKELY( lane_idx>=(int)FD_SCHED_MAX_STAGING_LANES ) ) {
      FD_LOG_CRIT(( "invariant violation: lane_idx %d, sched->staged_bitset %lx",
                    lane_idx, sched->staged_bitset ));
    }
    ulong head_bank_idx = stage_longest_unstaged_fork( sched, root_idx, lane_idx );
    if( FD_UNLIKELY( head_bank_idx==ULONG_MAX ) ) {
      /* We found a promotable fork depth>0.  This should not happen. */
      FD_LOG_CRIT(( "invariant violation: head_bank_idx==ULONG_MAX" ));
    }
    /* We don't bother with promotion unless the block is immediately
       dispatchable.  So it's okay to set the active block here. */
    sched->active_bank_idx = head_bank_idx;
    return;
  }
  /* No unstaged blocks to promote.  So we're done.  Yay. */
}
1601 :
1602 : static void
1603 0 : check_or_set_active_block( fd_sched_t * sched ) {
1604 0 : if( FD_UNLIKELY( sched->active_bank_idx==ULONG_MAX ) ) {
1605 0 : try_activate_block( sched );
1606 0 : } else {
1607 0 : fd_sched_block_t * active_block = block_pool_ele( sched, sched->active_bank_idx );
1608 0 : if( FD_UNLIKELY( block_should_deactivate( active_block ) ) ) {
1609 0 : sched->print_buf_sz = 0UL;
1610 0 : print_all( sched, active_block );
1611 0 : FD_LOG_NOTICE(( "%s", sched->print_buf ));
1612 0 : FD_LOG_CRIT(( "invariant violation: should have been deactivated" ));
1613 0 : }
1614 0 : }
1615 0 : }
1616 :
/* subtree_abandon marks block and every descendant as dying and, where
   it is safe to do so (in-order w.r.t. the dispatcher's per-lane
   ordering and with no in-flight transactions), abandons them in the
   dispatcher, reclaiming txn pool entries and releasing staging lanes.
   It's safe to call this function more than once on the same block. */
static void
subtree_abandon( fd_sched_t * sched, fd_sched_block_t * block ) {
  if( FD_UNLIKELY( block->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: rooted block should not be abandoned, slot %lu, parent slot %lu",
                  block->slot, block->parent_slot ));
  }
  /* All minority fork nodes pass through this function eventually.  So
     this is a good point to check per-node invariants for minority
     forks. */
  if( FD_UNLIKELY( block->staged && !block->in_rdisp ) ) {
    FD_LOG_CRIT(( "invariant violation: staged block is not in the dispatcher, slot %lu, parent slot %lu",
                  block->slot, block->parent_slot ));
  }

  /* Setting the flag is non-optional and can happen more than once. */
  block->dying = 1;

  /* Removal from dispatcher should only happen once. */
  if( block->in_rdisp ) {
    fd_sched_block_t * parent = block_pool_ele( sched, block->parent_idx );
    if( FD_UNLIKELY( !parent ) ) {
      /* Only the root has no parent.  Abandon should never be called on
         the root.  So any block we are trying to abandon should have a
         parent. */
      FD_LOG_CRIT(( "invariant violation: parent not found slot %lu, parent slot %lu",
                    block->slot, block->parent_slot ));
    }

    /* The dispatcher expects blocks to be abandoned in the same order
       that they were added on each lane.  There are no requirements on
       the order of abandoning if two blocks are not on the same lane,
       or if a block is unstaged.  This means that in general we
       shouldn't abandon a child block if the parent hasn't been
       abandoned yet, if and only if they are on the same lane.  So wait
       until we can abandon the parent, and then descend down the fork
       tree to ensure orderly abandoning. */
    int in_order = !parent->in_rdisp || /* parent is not in the dispatcher */
                   !parent->staged || /* parent is in the dispatcher but not staged */
                   !block->staged || /* parent is in the dispatcher and staged but this block is unstaged */
                   block->staging_lane!=parent->staging_lane; /* this block is on a different staging lane than its parent */

    /* If this block is both the active block and the head of its lane,
       deactivate it before abandoning. */
    if( FD_UNLIKELY( in_order && block->staged && sched->active_bank_idx==sched->staged_head_bank_idx[ block->staging_lane ] && sched->active_bank_idx!=ULONG_MAX ) ) {
      FD_TEST( block_pool_ele( sched, sched->active_bank_idx )==block );
      FD_LOG_DEBUG(( "reset active_bank_idx %lu", sched->active_bank_idx ));
      sched->active_bank_idx = ULONG_MAX;
    }

    /* We inform the dispatcher of an abandon only when there are no
       more in-flight transactions.  Otherwise, if the dispatcher
       recycles the same txn_id that was just abandoned, and we receive
       completion of an in-flight transaction whose txn_id was just
       recycled. */
    // FIXME The recycling might be fine now that we no longer use
    // txn_id to index into anything.  We might be able to just drop
    // txn_id on abandoned blocks.
    int abandon = in_order && block->txn_exec_in_flight_cnt==0 && block->txn_sigverify_in_flight_cnt==0;

    if( abandon ) {
      block->in_rdisp = 0;
      fd_rdisp_abandon_block( sched->rdisp, (ulong)(block-sched->block_pool) );
      sched->txn_pool_free_cnt += block->txn_parsed_cnt-block->txn_done_cnt; /* in_flight_cnt==0 */
      sched->metrics->block_abandoned_cnt++;
      sched->metrics->txn_abandoned_parsed_cnt += block->txn_parsed_cnt;
      sched->metrics->txn_abandoned_exec_done_cnt += block->txn_exec_done_cnt;
      sched->metrics->txn_abandoned_done_cnt += block->txn_done_cnt;

      /* Now release the staging lane. */
      //FIXME when demote supports non-empty blocks, we should demote
      //the block from the lane unconditionally and immediately,
      //regardless of whether it's safe to abandon or not.  So a block
      //would go immediately from staged to unstaged and eventually to
      //abandoned.
      if( FD_LIKELY( block->staged ) ) {
        block->staged = 0;
        sched->staged_bitset = fd_ulong_clear_bit( sched->staged_bitset, (int)block->staging_lane );
        sched->staged_head_bank_idx[ block->staging_lane ] = ULONG_MAX;
      }
    }
  }

  /* Abandon the entire fork chaining off of this block. */
  ulong child_idx = block->child_idx;
  while( child_idx!=ULONG_MAX ) {
    fd_sched_block_t * child = block_pool_ele( sched, child_idx );
    subtree_abandon( sched, child );
    child_idx = child->sibling_idx;
  }
}
1706 :
/* maybe_switch_block handles the block at bank_idx possibly being
   exhausted or completed.  If the block is fully done, it is removed
   from the dispatcher and, as a churn-minimizing policy, the active
   block switches to a staged child on the same staging lane when one
   is immediately dispatchable; otherwise the lane is released and a
   fresh activation is attempted.  If the block is merely out of
   dispatchable transactions (not yet done), the active block is reset
   and a fresh activation is attempted. */
static void
maybe_switch_block( fd_sched_t * sched, ulong bank_idx ) {
  fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
  if( FD_UNLIKELY( block_is_done( block ) ) ) {
    block->in_rdisp = 0;
    block->staged = 0;
    fd_rdisp_remove_block( sched->rdisp, bank_idx );
    sched->metrics->block_removed_cnt++;

    /* See if there is a child block down the same staging lane.  This
       is a policy decision to minimize fork churn.  We could in theory
       reevaluate staging lane allocation here and do promotion/demotion
       as needed. */
    ulong child_idx = block->child_idx;
    while( child_idx!=ULONG_MAX ) {
      fd_sched_block_t * child = block_pool_ele( sched, child_idx );
      if( FD_LIKELY( child->staged && child->staging_lane==block->staging_lane ) ) {
        /* There is a child block down the same staging lane ... */
        if( FD_LIKELY( !child->dying ) ) {
          /* ... and the child isn't dead */
          if( FD_UNLIKELY( !block_is_activatable( child ) ) ) {
            /* ... but the child is not activatable, likely because
               there are no transactions available yet. */
            FD_LOG_DEBUG(( "reset active_bank_idx %lu", sched->active_bank_idx ));
            sched->active_bank_idx = ULONG_MAX;
            sched->metrics->deactivate_no_txn_cnt++;
            try_activate_block( sched );
            return;
          }
          /* ... and it's immediately dispatchable, so switch the active
             block to it, and have the child inherit the head status of
             the lane.  This is the common case. */
          sched->active_bank_idx = child_idx;
          sched->staged_head_bank_idx[ block->staging_lane ] = child_idx;
          if( FD_UNLIKELY( !fd_ulong_extract_bit( sched->staged_bitset, (int)block->staging_lane ) ) ) {
            FD_LOG_CRIT(( "invariant violation: staged_bitset 0x%lx bit %lu is not set, slot %lu, parent slot %lu, child slot %lu, parent slot %lu",
                          sched->staged_bitset, block->staging_lane, block->slot, block->parent_slot, child->slot, child->parent_slot ));
          }
          return;
        } else {
          /* ... but the child block is considered dead, likely because
             the parser considers it invalid. */
          FD_LOG_INFO(( "child block %lu is already dead", child->slot ));
          subtree_abandon( sched, child );
          break;
        }
      }
      child_idx = child->sibling_idx;
    }
    /* There isn't a child block down the same staging lane.  This is
       the last block in the staging lane.  Release the staging lane. */
    sched->staged_bitset = fd_ulong_clear_bit( sched->staged_bitset, (int)block->staging_lane );
    sched->staged_head_bank_idx[ block->staging_lane ] = ULONG_MAX;

    /* Reset the active block. */
    FD_LOG_DEBUG(( "reset active_bank_idx %lu", sched->active_bank_idx ));
    sched->active_bank_idx = ULONG_MAX;
    sched->metrics->deactivate_no_child_cnt++;
    try_activate_block( sched );
  } else if( block_should_deactivate( block ) ) {
    /* We exhausted the active block, but it's not fully done yet.  We
       are just not getting FEC sets for it fast enough.  This could
       happen when the network path is congested, or when the leader
       simply went down.  Reset the active block. */
    sched->active_bank_idx = ULONG_MAX;
    sched->metrics->deactivate_no_txn_cnt++;
    try_activate_block( sched );
  }
}
1776 :
1777 : FD_FN_UNUSED static ulong
1778 0 : find_and_stage_longest_unstaged_fork( fd_sched_t * sched, int lane_idx ) {
1779 0 : ulong root_idx = sched->root_idx;
1780 0 :
1781 0 : if( FD_UNLIKELY( root_idx==ULONG_MAX ) ) {
1782 0 : FD_LOG_CRIT(( "invariant violation: root_idx==ULONG_MAX indicating fd_sched is unintialized" ));
1783 0 : }
1784 0 :
1785 0 : /* First pass: compute the longest unstaged fork depth for each node
1786 0 : in the fork tree. */
1787 0 : ulong depth = compute_longest_unstaged_fork( sched, root_idx );
1788 0 :
1789 0 : /* Second pass: stage blocks on the longest unstaged fork. */
1790 0 : ulong head_bank_idx = stage_longest_unstaged_fork( sched, root_idx, lane_idx );
1791 0 :
1792 0 : if( FD_UNLIKELY( (depth>0UL && head_bank_idx==ULONG_MAX) || (depth==0UL && head_bank_idx!=ULONG_MAX) ) ) {
1793 0 : FD_LOG_CRIT(( "invariant violation: depth %lu, head_bank_idx %lu",
1794 0 : depth, head_bank_idx ));
1795 0 : }
1796 0 :
1797 0 : return head_bank_idx;
1798 0 : }
1799 :
1800 : /* Returns length of the longest stageable unstaged fork, if there is
1801 : one, and 0 otherwise. */
1802 : static ulong
1803 0 : compute_longest_unstaged_fork( fd_sched_t * sched, ulong bank_idx ) {
1804 0 : if( FD_UNLIKELY( bank_idx==ULONG_MAX ) ) {
1805 0 : FD_LOG_CRIT(( "invariant violation: bank_idx==ULONG_MAX" ));
1806 0 : }
1807 :
1808 0 : fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
1809 :
1810 0 : ulong max_child_depth = 0UL;
1811 0 : ulong child_idx = block->child_idx;
1812 0 : while( child_idx!=ULONG_MAX ) {
1813 0 : ulong child_depth = compute_longest_unstaged_fork( sched, child_idx );
1814 0 : if( child_depth > max_child_depth ) {
1815 0 : max_child_depth = child_depth;
1816 0 : }
1817 0 : fd_sched_block_t * child = block_pool_ele( sched, child_idx );
1818 0 : child_idx = child->sibling_idx;
1819 0 : }
1820 :
1821 0 : block->luf_depth = max_child_depth + fd_ulong_if( block_is_promotable( block ), 1UL, 0UL );
1822 0 : return block->luf_depth;
1823 0 : }
1824 :
1825 : static ulong
1826 0 : stage_longest_unstaged_fork_helper( fd_sched_t * sched, ulong bank_idx, int lane_idx ) {
1827 0 : if( FD_UNLIKELY( bank_idx==ULONG_MAX ) ) {
1828 0 : FD_LOG_CRIT(( "invariant violation: bank_idx==ULONG_MAX" ));
1829 0 : }
1830 :
1831 0 : fd_sched_block_t * block = block_pool_ele( sched, bank_idx );
1832 :
1833 0 : int stage_it = fd_int_if( block_is_promotable( block ), 1, 0 );
1834 0 : ulong rv = fd_ulong_if( stage_it, bank_idx, ULONG_MAX );
1835 0 : if( FD_LIKELY( stage_it ) ) {
1836 0 : block->staged = 1;
1837 0 : block->staging_lane = (ulong)lane_idx;
1838 0 : fd_rdisp_promote_block( sched->rdisp, bank_idx, block->staging_lane );
1839 0 : sched->metrics->block_promoted_cnt++;
1840 0 : }
1841 :
1842 : /* Base case: leaf node. */
1843 0 : if( block->child_idx==ULONG_MAX ) return rv;
1844 :
1845 0 : ulong max_depth = 0UL;
1846 0 : ulong best_child_idx = ULONG_MAX;
1847 0 : ulong child_idx = block->child_idx;
1848 0 : while( child_idx!=ULONG_MAX ) {
1849 0 : fd_sched_block_t * child = block_pool_ele( sched, child_idx );
1850 0 : if( child->luf_depth>max_depth ) {
1851 0 : max_depth = child->luf_depth;
1852 0 : best_child_idx = child_idx;
1853 0 : }
1854 0 : child_idx = child->sibling_idx;
1855 0 : }
1856 :
1857 : /* Recursively stage descendants. */
1858 0 : if( best_child_idx!=ULONG_MAX ) {
1859 0 : ulong head_bank_idx = stage_longest_unstaged_fork_helper( sched, best_child_idx, lane_idx );
1860 0 : rv = fd_ulong_if( rv!=ULONG_MAX, rv, head_bank_idx );
1861 0 : }
1862 :
1863 0 : return rv;
1864 0 : }
1865 :
1866 : /* Returns idx of head block of staged lane on success, idx_null
1867 : otherwise. */
1868 : static ulong
1869 0 : stage_longest_unstaged_fork( fd_sched_t * sched, ulong bank_idx, int lane_idx ) {
1870 0 : ulong head_bank_idx = stage_longest_unstaged_fork_helper( sched, bank_idx, lane_idx );
1871 0 : if( FD_LIKELY( head_bank_idx!=ULONG_MAX ) ) {
1872 0 : sched->metrics->lane_promoted_cnt++;
1873 0 : sched->staged_bitset = fd_ulong_set_bit( sched->staged_bitset, lane_idx );
1874 0 : sched->staged_head_bank_idx[ lane_idx ] = head_bank_idx;
1875 0 : }
1876 0 : return head_bank_idx;
1877 0 : }
|