Line data Source code
1 : #include "fd_sched.h"
2 : #include "../../flamenco/runtime/fd_runtime.h" /* for fd_runtime_load_txn_address_lookup_tables */
3 :
4 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_hashes.h" /* for ALUTs */
5 :
6 :
7 : // TODO can the bounds be tighter?
8 : #define FD_SCHED_MAX_TXN_PER_BLOCK (FD_TXN_MAX_PER_SLOT)
9 0 : #define FD_SCHED_MAX_BLOCK_DEPTH (1024UL)
10 : #define FD_SCHED_MAX_NON_EMPTY_BLOCK_DEPTH (32UL)
11 0 : #define FD_SCHED_MAX_DEPTH (FD_RDISP_MAX_DEPTH>>2)
12 : #define FD_SCHED_MAX_STAGING_LANES_LOG (2)
13 : #define FD_SCHED_MAX_STAGING_LANES (1UL<<FD_SCHED_MAX_STAGING_LANES_LOG)
14 :
15 : /* We size the buffer to be able to hold residual data from the previous
16 : FEC set that only becomes parseable after the next FEC set is
17 : ingested, as well as the incoming FEC set. The largest minimally
18 : parseable unit of data is a transaction. So that much data may
19 : straddle FEC set boundaries. Other minimally parseable units of data
20 : include the microblock header and the microblock count within a
21 : batch. */
22 0 : #define FD_SCHED_MAX_PAYLOAD_PER_FEC (FD_STORE_DATA_MAX)
23 : #define FD_SCHED_MAX_FEC_BUF_SZ (FD_SCHED_MAX_PAYLOAD_PER_FEC+FD_TXN_MTU)
24 : FD_STATIC_ASSERT( FD_TXN_MTU>=sizeof(fd_microblock_hdr_t), resize buffer for residual data );
25 : FD_STATIC_ASSERT( FD_TXN_MTU>=sizeof(ulong), resize buffer for residual data );
26 :
27 0 : #define FD_SCHED_MAGIC (0xace8a79c181f89b6UL) /* echo -n "fd_sched_v0" | sha512sum | head -c 16 */
28 :
29 :
30 : /* Structs. */
31 :
/* fd_sched_block: per-block state in the fork tree.  Elements are
   allocated from an fd_pool, linked into a left-child/right-sibling
   tree via pool indices, and looked up by block_id through an
   fd_map_chain. */
struct fd_sched_block {
  fd_sched_block_id_t block_id;
  ulong next;        /* reserved for internal use by fd_pool, fd_map_chain */
  ulong parent_idx;  /* index of the parent in the pool */
  ulong child_idx;   /* index of the left-child in the pool */
  ulong sibling_idx; /* index of the right-sibling in the pool */

  /* Counters. */
  uint txn_parsed_cnt;    /* transactions parsed out of ingested FEC data so far */
  /* txn_queued_cnt = txn_parsed_cnt-txn_in_flight_cnt-txn_done_cnt */
  uint txn_in_flight_cnt; /* transactions dispatched and awaiting retirement */
  uint txn_done_cnt;      /* transactions that have completed execution */
  uint shred_cnt;         /* shreds ingested for this block (accumulated per FEC set) */

  /* Parser state. */
  uchar txn[ FD_TXN_MAX_SZ ] __attribute__((aligned(alignof(fd_txn_t)))); /* scratch for the transaction currently being parsed -- TODO confirm against fd_sched_parse_txn */
  fd_hash_t poh;    /* latest PoH hash we've seen from the ingested FEC sets */
  ulong mblks_rem;  /* number of microblocks remaining in the current batch */
  ulong txns_rem;   /* number of transactions remaining in the current microblock */
  fd_acct_addr_t aluts[ 256 ]; /* resolve ALUT accounts into this buffer for more parallelism */
  uint fec_buf_sz;   /* size of the fec_buf in bytes */
  uint fec_buf_soff; /* starting offset into fec_buf for unparsed transactions */
  uint fec_eob:1;    /* FEC end-of-batch: set if the last FEC set in the batch is being
                        ingested */
  uint fec_sob:1;    /* FEC start-of-batch: set if the parser expects to be receiving a new
                        batch */

  /* Block state. */
  uint fec_eos:1;    /* FEC end-of-stream: set if the last FEC set in the block has been
                        ingested */
  uint rooted:1;     /* set if the block is rooted */
  uint dying:1;      /* set if the block has been abandoned and no transactions should be
                        scheduled from it */
  uint in_rdisp:1;   /* set if the block is being tracked by the dispatcher, either as staged
                        or unstaged */
  uint block_start_signaled:1; /* set if the start-of-block sentinel has been dispatched */
  uint block_end_signaled:1;   /* set if the end-of-block sentinel has been dispatched */
  uint staged:1;     /* set if the block is in a dispatcher staging lane; a staged block is
                        tracked by the dispatcher */
  ulong staging_lane; /* ignored if staged==0 */
  ulong luf_depth;    /* depth of longest unstaged fork starting from this node; only
                         stageable unstaged descendants are counted */
  uchar fec_buf[ FD_SCHED_MAX_FEC_BUF_SZ ]; /* the previous FEC set could have some residual data that only becomes
                                               parseable after the next FEC set is ingested */
};
typedef struct fd_sched_block fd_sched_block_t;
78 :
79 : FD_STATIC_ASSERT( sizeof(fd_hash_t)==sizeof(((fd_microblock_hdr_t *)0)->hash), unexpected poh hash size );
80 :
81 : #define POOL_NAME block_pool
82 0 : #define POOL_T fd_sched_block_t
83 0 : #define POOL_NEXT next
84 : #include "../../util/tmpl/fd_pool.c"
85 :
86 : #define MAP_NAME block_map
87 : #define MAP_ELE_T fd_sched_block_t
88 : #define MAP_KEY_T fd_sched_block_id_t
89 0 : #define MAP_KEY block_id
90 0 : #define MAP_KEY_EQ(k0,k1) (!memcmp((k0),(k1), sizeof(fd_sched_block_id_t)))
91 0 : #define MAP_KEY_HASH(key,seed) (fd_ulong_hash((key)->id^seed))
92 0 : #define MAP_NEXT next
93 : #include "../../util/tmpl/fd_map_chain.c"
94 :
/* fd_sched_metrics: cumulative observability counters, zeroed at
   fd_sched_new and only ever incremented (txn_max_in_flight_cnt is a
   high watermark). */
struct fd_sched_metrics {
  /* Block lifecycle. */
  uint block_added_cnt;           /* blocks handed to the dispatcher */
  uint block_added_staged_cnt;    /* subset of added blocks placed directly in a staging lane */
  uint block_added_unstaged_cnt;  /* subset of added blocks tracked unstaged */
  uint block_added_dead_ood_cnt;  /* children of dead blocks, tracked only for lineage propagation */
  uint block_removed_cnt;
  uint block_abandoned_cnt;
  uint block_promoted_cnt;        /* unstaged -> staged transitions -- presumably; increment site not in this chunk */
  uint block_demoted_cnt;         /* staged -> unstaged transitions -- presumably; increment site not in this chunk */
  /* Why the active block was deactivated. */
  uint deactivate_no_child_cnt;
  uint deactivate_no_txn_cnt;
  uint deactivate_pruned_cnt;
  uint deactivate_abandoned_cnt;
  /* Staging lane churn. */
  uint lane_switch_cnt;
  uint lane_promoted_cnt;
  uint lane_demoted_cnt;
  /* Address lookup table resolution outcomes. */
  uint alut_success_cnt;
  uint alut_serializing_cnt;
  /* Transaction accounting. */
  uint txn_abandoned_parsed_cnt;  /* parsed txns discarded with their abandoned block -- TODO confirm */
  uint txn_abandoned_done_cnt;    /* completed txns belonging to abandoned blocks -- TODO confirm */
  uint txn_max_in_flight_cnt;     /* high watermark of concurrently in-flight txns */
  ulong txn_weighted_in_flight_cnt;       /* sum of (in-flight count x elapsed ticks) over time */
  ulong txn_weighted_in_flight_tickcount; /* ticks elapsed while at least one txn was in flight */
  ulong txn_none_in_flight_tickcount;     /* ticks elapsed while no txn was in flight */
  ulong txn_parsed_cnt;
  ulong txn_done_cnt;
  /* Ingest accounting. */
  ulong bytes_ingested_cnt;          /* FEC payload bytes accepted into a block's parse buffer */
  ulong bytes_ingested_unparsed_cnt;
  ulong bytes_dropped_cnt;           /* FEC payload bytes discarded (dead/bad blocks) */
  ulong fec_cnt;                     /* FEC sets successfully ingested */
};
typedef struct fd_sched_metrics fd_sched_metrics_t;
127 :
/* fd_sched: top-level scheduler state.  Owns the fork tree of blocks
   (pool + map chain), a transaction pool shared with the dispatcher,
   and bookkeeping for which block/staging lanes are currently in
   play. */
struct fd_sched {
  fd_sched_metrics_t metrics[ 1 ];
  long txn_in_flight_last_tick; /* tick of the last in-flight accounting update; LONG_MAX until the first dispatch */
  ulong root_idx;               /* pool index of the root block */
  fd_rdisp_t * rdisp;           /* transaction dispatcher */
  ulong active_block_idx;       /* index of the actively replayed block, or null_idx if no block is
                                   actively replayed; has to have a transaction to dispatch; staged
                                   blocks that have no transactions to dispatch are not eligible for
                                   being active. */
  ulong staged_bitset;          /* bit i set if staging lane i is occupied */
  ulong staged_head_block_idx[ FD_SCHED_MAX_STAGING_LANES ]; /* head of the linear chain in each staging lane, ignored if bit i is
                                                                not set in the bitset */
  ulong txn_pool_free_cnt;      /* free entries remaining in txn_pool (index 0 is a dispatcher sentinel) */
  fd_txn_p_t txn_pool[ FD_SCHED_MAX_DEPTH ];
  ulong txn_to_block_idx[ FD_SCHED_MAX_DEPTH ]; /* index of the block that the txn belongs to */
  fd_sched_block_t * block_pool; /* fd_pool of max_block_depth elements */
  block_map_t * block_map;       /* map_chain */
};
typedef struct fd_sched fd_sched_t;
147 :
148 :
/* Internal helpers. */

/* Create the block keyed by block_id, link it under parent_block_id in
   the fork tree, and return both via the out params.  (Definition not
   in this chunk; semantics inferred from the call site in
   fd_sched_fec_ingest.) */
static void
add_block( fd_sched_t *          sched,
           fd_sched_block_id_t * block_id,
           fd_sched_block_id_t * parent_block_id,
           fd_sched_block_t * *  out_block,
           fd_sched_block_t * *  out_parent_block );

/* Parse as many transactions as possible out of block->fec_buf;
   alut_ctx presumably supplies state for ALUT resolution -- confirm
   against the definition (outside this view). */
static void
fd_sched_parse( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx );

/* Parse a single transaction from block->fec_buf; int return semantics
   defined where implemented (outside this view). */
static int
fd_sched_parse_txn( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx );

/* Pick a new active block if any is activatable (called when
   active_block_idx is null). */
static void
try_activate_block( fd_sched_t * sched );

/* Abandon block and, presumably, its whole descendant subtree --
   definition not visible here. */
static void
subtree_abandon( fd_sched_t * sched, fd_sched_block_t * block );

/* Longest-unstaged-fork staging heuristic (see luf_depth in
   fd_sched_block_t); definitions not shown in this chunk. */
FD_FN_UNUSED static ulong
find_and_stage_longest_unstaged_fork( fd_sched_t * sched, int lane_idx );

static ulong
compute_longest_unstaged_fork( fd_sched_t * sched, ulong block_idx );

static ulong
stage_longest_unstaged_fork( fd_sched_t * sched, ulong block_idx, int lane_idx );
178 :
179 : FD_FN_UNUSED static inline int
180 0 : block_is_void( fd_sched_block_t * block ) {
181 0 : /* We've seen everything in the block and no transaction got parsed
182 0 : out. */
183 0 : return block->fec_eos && block->txn_parsed_cnt==0;
184 0 : }
185 :
186 : static inline int
187 0 : block_should_signal_end( fd_sched_block_t * block ) {
188 0 : ulong txn_queued_cnt = block->txn_parsed_cnt-block->txn_in_flight_cnt-block->txn_done_cnt;
189 0 : return block->fec_eos && txn_queued_cnt==0UL && !block->block_end_signaled;
190 0 : }
191 :
/* A block is dispatchable when calling fd_sched_txn_next_ready on it
   could still produce something: an unexecuted transaction, the
   start-of-block sentinel, or the end-of-block sentinel. */
static inline int
block_is_dispatchable( fd_sched_block_t * block ) {
  return block->txn_parsed_cnt>block->txn_done_cnt ||
         !block->block_start_signaled ||
         block_should_signal_end( block );
}
198 :
/* A block is done when all of its FEC sets have arrived and there is
   nothing left to dispatch from it (including sentinels). */
static inline int
block_is_done( fd_sched_block_t * block ) {
  return block->fec_eos && !block_is_dispatchable( block );
}
203 :
/* A block may occupy (or be promoted into) a staging lane iff it is
   neither done nor dying.  Crashes on an invariant violation: any
   stageable block must already be tracked by the dispatcher. */
static inline int
block_is_stageable( fd_sched_block_t * block ) {
  int rv = !block_is_done( block ) && !block->dying;
  if( FD_UNLIKELY( rv && !block->in_rdisp ) ) {
    /* Invariant: stageable blocks may be currently staged or unstaged,
       but must be in the dispatcher either way.  When a block
       transitions to DONE, it will be immediately removed from the
       dispatcher.  When a block transitions to DYING, it will be
       eventually abandoned from the dispatcher. */
    FD_LOG_CRIT(( "invariant violation: stageable block->in_rdisp==0, slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }
  return rv;
}
218 :
219 : static inline int
220 0 : block_is_promotable( fd_sched_block_t * block ) {
221 0 : return block_is_stageable( block ) && !block->staged;
222 0 : }
223 :
224 : static inline int
225 0 : block_is_activatable( fd_sched_block_t * block ) {
226 0 : return block_is_stageable( block ) && block_is_dispatchable( block ) && block->staged;
227 0 : }
228 :
/* Debug aid: dump one block's flags and counters to the info log. */
FD_FN_UNUSED static void
debug_print_block( fd_sched_block_t * block ) {
  FD_LOG_INFO(( "block slot %lu, prime %lu, staged %d (lane %lu), dying %d, in_rdisp %d, fec_eos %d, rooted %d, block_start_signaled %d, block_end_signaled %d, txn_parsed_cnt %u, txn_in_flight_cnt %u, txn_done_cnt %u, shred_cnt %u",
                (ulong)block->block_id.slot, (ulong)block->block_id.prime, block->staged, block->staging_lane, block->dying, block->in_rdisp, block->fec_eos, block->rooted, block->block_start_signaled, block->block_end_signaled, block->txn_parsed_cnt, block->txn_in_flight_cnt, block->txn_done_cnt, block->shred_cnt ));
}
234 :
/* Debug aid: dump every scheduler metric counter to the info log. */
FD_FN_UNUSED static void
debug_print_metrics( fd_sched_t * sched ) {
  FD_LOG_INFO(( "metrics: block_added_cnt %u, block_added_staged_cnt %u, block_added_unstaged_cnt %u, block_added_dead_ood_cnt %u, block_removed_cnt %u, block_abandoned_cnt %u, block_promoted_cnt %u, block_demoted_cnt %u, deactivate_no_child_cnt %u, deactivate_no_txn_cnt %u, deactivate_pruned_cnt %u, deactivate_abandoned_cnt %u, lane_switch_cnt %u, lane_promoted_cnt %u, lane_demoted_cnt %u, alut_success_cnt %u, alut_serializing_cnt %u, txn_abandoned_parsed_cnt %u, txn_abandoned_done_cnt %u, txn_max_in_flight_cnt %u, txn_weighted_in_flight_cnt %lu, txn_weighted_in_flight_tickcount %lu, txn_none_in_flight_tickcount %lu, txn_parsed_cnt %lu, txn_done_cnt %lu, bytes_ingested_cnt %lu, bytes_ingested_unparsed_cnt %lu, bytes_dropped_cnt %lu, fec_cnt %lu",
                sched->metrics->block_added_cnt, sched->metrics->block_added_staged_cnt, sched->metrics->block_added_unstaged_cnt, sched->metrics->block_added_dead_ood_cnt, sched->metrics->block_removed_cnt, sched->metrics->block_abandoned_cnt, sched->metrics->block_promoted_cnt, sched->metrics->block_demoted_cnt, sched->metrics->deactivate_no_child_cnt, sched->metrics->deactivate_no_txn_cnt, sched->metrics->deactivate_pruned_cnt, sched->metrics->deactivate_abandoned_cnt, sched->metrics->lane_switch_cnt, sched->metrics->lane_promoted_cnt, sched->metrics->lane_demoted_cnt, sched->metrics->alut_success_cnt, sched->metrics->alut_serializing_cnt, sched->metrics->txn_abandoned_parsed_cnt, sched->metrics->txn_abandoned_done_cnt, sched->metrics->txn_max_in_flight_cnt, sched->metrics->txn_weighted_in_flight_cnt, sched->metrics->txn_weighted_in_flight_tickcount, sched->metrics->txn_none_in_flight_tickcount, sched->metrics->txn_parsed_cnt, sched->metrics->txn_done_cnt, sched->metrics->bytes_ingested_cnt, sched->metrics->bytes_ingested_unparsed_cnt, sched->metrics->bytes_dropped_cnt, sched->metrics->fec_cnt ));
}
240 :
241 : /* Public functions. */
242 :
243 0 : ulong fd_sched_align( void ) {
244 0 : return fd_ulong_max( alignof(fd_sched_t),
245 0 : fd_ulong_max( fd_rdisp_align(),
246 0 : fd_ulong_max( block_map_align(),
247 0 : fd_ulong_max( block_pool_align(), 64UL )))); /* Minimally cache line aligned. */
248 0 : }
249 :
/* Byte footprint of one fd_sched.  The layout here must match
   fd_sched_new / fd_sched_join exactly: header struct, dispatcher,
   block map, block pool, in that order. */
ulong
fd_sched_footprint( void ) {
  ulong chain_cnt = block_map_chain_cnt_est( FD_SCHED_MAX_BLOCK_DEPTH );

  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, fd_sched_align(),   sizeof(fd_sched_t) );
  l = FD_LAYOUT_APPEND( l, fd_rdisp_align(),   fd_rdisp_footprint ( FD_SCHED_MAX_DEPTH, FD_SCHED_MAX_BLOCK_DEPTH ) ); /* dispatcher */
  l = FD_LAYOUT_APPEND( l, block_map_align(),  block_map_footprint ( chain_cnt ) );                                   /* block map */
  l = FD_LAYOUT_APPEND( l, block_pool_align(), block_pool_footprint( FD_SCHED_MAX_BLOCK_DEPTH ) );                    /* block pool */
  return FD_LAYOUT_FINI( l, fd_sched_align() );
}
261 :
/* Format mem (assumed fd_sched_align()-aligned with
   fd_sched_footprint() bytes) as a new scheduler.  Carves out the
   dispatcher, block map, and block pool in the same order as
   fd_sched_footprint, seeds them from the tick counter, and resets all
   mutable state.  Returns mem. */
void *
fd_sched_new( void * mem ) {
  ulong chain_cnt = block_map_chain_cnt_est( FD_SCHED_MAX_BLOCK_DEPTH );

  FD_SCRATCH_ALLOC_INIT( l, mem );
  fd_sched_t * sched  = FD_SCRATCH_ALLOC_APPEND( l, fd_sched_align(),   sizeof(fd_sched_t) );
  void *       _rdisp = FD_SCRATCH_ALLOC_APPEND( l, fd_rdisp_align(),   fd_rdisp_footprint ( FD_SCHED_MAX_DEPTH, FD_SCHED_MAX_BLOCK_DEPTH ) );
  void *       _bmap  = FD_SCRATCH_ALLOC_APPEND( l, block_map_align(),  block_map_footprint ( chain_cnt ) );
  void *       _bpool = FD_SCRATCH_ALLOC_APPEND( l, block_pool_align(), block_pool_footprint( FD_SCHED_MAX_BLOCK_DEPTH ) );
  FD_SCRATCH_ALLOC_FINI( l, fd_sched_align() );

  /* Seed the dispatcher and map hash with a tick-derived value so
     independent instances hash differently. */
  ulong seed = ((ulong)fd_tickcount()) ^ FD_SCHED_MAGIC;
  fd_rdisp_new  ( _rdisp, FD_SCHED_MAX_DEPTH, FD_SCHED_MAX_BLOCK_DEPTH, seed );
  block_map_new ( _bmap, chain_cnt, seed+1UL );
  block_pool_new( _bpool, FD_SCHED_MAX_BLOCK_DEPTH );

  /* Temporary join just to learn the pool's null index sentinel. */
  fd_sched_block_t * _bpool_join = block_pool_join( _bpool );
  ulong null_idx = block_pool_idx_null( _bpool_join );
  block_pool_leave( _bpool_join );

  fd_memset( sched->metrics, 0, sizeof(fd_sched_metrics_t) );
  sched->txn_in_flight_last_tick = LONG_MAX; /* sentinel: no dispatch has happened yet */

  sched->root_idx         = null_idx;
  sched->active_block_idx = null_idx;
  sched->staged_bitset    = 0UL;

  sched->txn_pool_free_cnt = FD_SCHED_MAX_DEPTH-1UL; /* -1 because index 0 is unusable as a sentinel reserved by the dispatcher */

  return sched;
}
293 :
/* Join a scheduler previously formatted by fd_sched_new.  Re-derives
   the component region pointers using the same layout as
   fd_sched_footprint/fd_sched_new and joins each sub-object.  Returns
   the local handle. */
fd_sched_t *
fd_sched_join( void * mem ) {
  fd_sched_t * sched = (fd_sched_t *)mem;

  ulong chain_cnt = block_map_chain_cnt_est( FD_SCHED_MAX_BLOCK_DEPTH );

  FD_SCRATCH_ALLOC_INIT( l, mem );
  /* */          FD_SCRATCH_ALLOC_APPEND( l, fd_sched_align(),   sizeof(fd_sched_t) );
  void * _rdisp = FD_SCRATCH_ALLOC_APPEND( l, fd_rdisp_align(),   fd_rdisp_footprint ( FD_SCHED_MAX_DEPTH, FD_SCHED_MAX_BLOCK_DEPTH ) );
  void * _bmap  = FD_SCRATCH_ALLOC_APPEND( l, block_map_align(),  block_map_footprint ( chain_cnt ) );
  void * _bpool = FD_SCRATCH_ALLOC_APPEND( l, block_pool_align(), block_pool_footprint( FD_SCHED_MAX_BLOCK_DEPTH ) );
  FD_SCRATCH_ALLOC_FINI( l, fd_sched_align() );

  sched->rdisp      = fd_rdisp_join( _rdisp );
  sched->block_map  = block_map_join( _bmap );
  sched->block_pool = block_pool_join( _bpool );

  return sched;
}
313 :
/* Precise admission check for one specific FEC set: returns 1 if
   fd_sched_fec_ingest( sched, fec ) can be called without exhausting
   the block pool or the txn pool, 0 otherwise.  Crashes if the FEC
   payload exceeds the per-FEC maximum (upstream bug). */
int
fd_sched_fec_can_ingest( fd_sched_t * sched, fd_sched_fec_t * fec ) {
  if( FD_UNLIKELY( fec->fec->data_sz>FD_SCHED_MAX_PAYLOAD_PER_FEC ) ) {
    FD_LOG_CRIT(( "invalid FEC set: fec->data_sz %lu, fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
                  fec->fec->data_sz, FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
  }

  ulong fec_buf_sz = 0UL;
  fd_sched_block_t * block = block_map_ele_query( sched->block_map, &fec->block_id, NULL, sched->block_pool );
  if( FD_LIKELY( block ) ) {
    /* Known block: residual unparsed bytes also consume txn entries. */
    fec_buf_sz += block->fec_buf_sz-block->fec_buf_soff;
  } else {
    /* This FEC set will need to allocate a new block from the pool. */
    if( FD_UNLIKELY( !block_pool_free( sched->block_pool ) ) ) {
      return 0;
    }
  }
  /* Addition is safe and won't overflow because we checked the FEC set
     size above. */
  fec_buf_sz += fec->fec->data_sz;
  /* Assuming every transaction is min size, do we have enough free
     entries in the txn pool?  For a more precise txn count, we would
     have to do some parsing. */
  return sched->txn_pool_free_cnt>=fec_buf_sz/FD_TXN_MIN_SERIALIZED_SZ;
}
339 :
340 : int
341 0 : fd_sched_can_ingest( fd_sched_t * sched ) {
342 : /* Assume worst case we will need to allocate a new block from the
343 : pool. */
344 0 : if( FD_UNLIKELY( !block_pool_free( sched->block_pool ) ) ) {
345 0 : return 0;
346 0 : }
347 :
348 : /* Worst case, we need one byte from the incoming data to extract a
349 : transaction out of the residual data, and the rest of the incoming
350 : data contributes toward min sized transactions. */
351 0 : ulong txn_cnt = (FD_SCHED_MAX_PAYLOAD_PER_FEC-1UL)/FD_TXN_MIN_SERIALIZED_SZ+1UL; /* 478 */
352 0 : return sched->txn_pool_free_cnt>=txn_cnt;
353 0 : }
354 :
/* Ingest one FEC set of payload.  On first contact with a block id,
   creates the block, wires it into the fork tree, and assigns it a
   dispatcher staging lane (inheriting the parent's lane when free,
   otherwise allocating a fresh lane or falling back to unstaged).
   Appends the payload to the block's parse buffer (shifting residual
   bytes from the previous FEC set to the front), parses newly
   completed transactions, and activates a block if none is active.
   Payload for dead/bad blocks is dropped and counted in
   bytes_dropped_cnt. */
void
fd_sched_fec_ingest( fd_sched_t * sched, fd_sched_fec_t * fec ) {
  if( FD_UNLIKELY( fec->fec->data_sz>FD_SCHED_MAX_PAYLOAD_PER_FEC ) ) {
    FD_LOG_CRIT(( "invalid FEC set: fec->data_sz %lu, fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
                  fec->fec->data_sz, FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
  }

  ulong null_idx = block_pool_idx_null( sched->block_pool );

  fd_sched_block_t * block = block_map_ele_query( sched->block_map, &fec->block_id, NULL, sched->block_pool );
  if( FD_UNLIKELY( !block ) ) {
    /* This is a new block. */
    fd_sched_block_t * parent_block = NULL;
    add_block( sched, &fec->block_id, &fec->parent_block_id, &block, &parent_block );

    if( FD_UNLIKELY( block->dying ) ) {
      /* The child of a dead block is also dead.  We added it to our
         fork tree just so we could track an entire lineage of dead
         children and propagate the dead property to the entire lineage,
         in case there were frags for more than one dead children
         in-flight at the time the parent was abandoned.  That being
         said, we shouldn't need to add the dead child to the
         dispatcher. */
      sched->metrics->block_added_dead_ood_cnt++;

      /* Ignore the FEC set for a dead block. */
      sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
      return;
    }

    /* Try to find a staging lane for this block. */
    int alloc_lane = 0;
    if( FD_LIKELY( parent_block->staged ) ) {
      /* Parent is staged.  So see if we can continue down the same
         staging lane. */
      ulong staging_lane = parent_block->staging_lane;
      ulong child_idx    = parent_block->child_idx;
      while( child_idx != null_idx ) {
        fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
        if( child->staged && child->staging_lane==staging_lane ) {
          /* Found a child on the same lane.  So we're done.  (The lane
             is taken by a sibling; mark by reusing the UNSTAGED
             sentinel.) */
          staging_lane = FD_RDISP_UNSTAGED;
          break;
        }
        child_idx = child->sibling_idx;
      }
      /* No child is staged on the same lane as the parent.  So stage
         this block.  This is the common case. */
      if( FD_LIKELY( staging_lane!=FD_RDISP_UNSTAGED ) ) {
        block->in_rdisp     = 1;
        block->staged       = 1;
        block->staging_lane = staging_lane;
        fd_rdisp_add_block( sched->rdisp, block->block_id.id, staging_lane );
        sched->metrics->block_added_cnt++;
        sched->metrics->block_added_staged_cnt++;
      } else {
        alloc_lane = 1;
      }
    } else {
      if( block_is_stageable( parent_block ) ) {
        /* Parent is unstaged but stageable.  So let's be unstaged too.
           This is a policy decision to be lazy and not promote parent
           at the moment. */
        block->in_rdisp = 1;
        block->staged   = 0;
        fd_rdisp_add_block( sched->rdisp, block->block_id.id, FD_RDISP_UNSTAGED );
        sched->metrics->block_added_cnt++;
        sched->metrics->block_added_unstaged_cnt++;
      } else {
        alloc_lane = 1;
      }
    }
    if( FD_UNLIKELY( alloc_lane ) ) {
      /* We weren't able to inherit the parent's staging lane.  So try
         to find a new staging lane. */
      if( FD_LIKELY( sched->staged_bitset!=fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) ) ) { /* Optimize for lane available. */
        int lane_idx = fd_ulong_find_lsb( ~sched->staged_bitset );
        if( FD_UNLIKELY( lane_idx>=(int)FD_SCHED_MAX_STAGING_LANES ) ) {
          FD_LOG_CRIT(( "invariant violation: lane_idx %d, sched->staged_bitset %lx",
                        lane_idx, sched->staged_bitset ));
        }
        sched->staged_bitset = fd_ulong_set_bit( sched->staged_bitset, lane_idx );
        sched->staged_head_block_idx[ lane_idx ] = block_pool_idx( sched->block_pool, block );
        block->in_rdisp     = 1;
        block->staged       = 1;
        block->staging_lane = (ulong)lane_idx;
        fd_rdisp_add_block( sched->rdisp, block->block_id.id, block->staging_lane );
        sched->metrics->block_added_cnt++;
        sched->metrics->block_added_staged_cnt++;
      } else {
        /* No lanes available. */
        block->in_rdisp = 1;
        block->staged   = 0;
        fd_rdisp_add_block( sched->rdisp, block->block_id.id, FD_RDISP_UNSTAGED );
        sched->metrics->block_added_cnt++;
        sched->metrics->block_added_unstaged_cnt++;
      }
    }
  }

  if( FD_UNLIKELY( block->dying ) ) {
    /* Ignore the FEC set for a dead block. */
    sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
    return;
  }

  if( FD_UNLIKELY( !block->in_rdisp ) ) {
    /* Invariant: block must be in the dispatcher at this point. */
    FD_LOG_CRIT(( "invariant violation: block->in_rdisp==0, slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }

  if( FD_UNLIKELY( block->fec_eos ) ) {
    /* This means something is wrong upstream. */
    FD_LOG_CRIT(( "invariant violation: block->fec_eos set but getting more FEC sets fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
                  FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
  }
  if( FD_UNLIKELY( block->fec_eob && fec->is_last_in_batch ) ) {
    /* This means the previous batch didn't parse properly.  So this is
       a bad block.  We should refuse to replay down the fork. */
    FD_LOG_WARNING(( "invariant violation: block->fec_eob set but getting another FEC set that is last in batch fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
                     FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
    block->dying = 1;//FIXME inform replay/banks that it's dead?
    sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
    return;
  }
  if( FD_UNLIKELY( block->child_idx!=null_idx ) ) {
    /* This means something is wrong upstream.  FEC sets are not being
       delivered in replay order. */
    FD_LOG_CRIT(( "invariant violation: block->child_idx %lu, fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
                  block->child_idx, FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
  }

  FD_TEST( block->fec_buf_sz>=block->fec_buf_soff );
  if( FD_LIKELY( block->fec_buf_sz>block->fec_buf_soff ) ) {
    /* If there is residual data from the previous FEC set within the
       same batch, we move it to the beginning of the buffer and append
       the new FEC set. */
    memmove( block->fec_buf, block->fec_buf+block->fec_buf_soff, block->fec_buf_sz-block->fec_buf_soff );
  }
  block->fec_buf_sz  -= block->fec_buf_soff;
  block->fec_buf_soff = 0;
  /* Addition is safe and won't overflow because we checked the FEC
     set size above. */
  if( FD_UNLIKELY( block->fec_buf_sz-block->fec_buf_soff+fec->fec->data_sz>FD_SCHED_MAX_FEC_BUF_SZ ) ) {
    /* In a conformant block, there shouldn't be more than a
       transaction's worth of residual data left over from the previous
       FEC set within the same batch.  So if this condition doesn't
       hold, it's a bad block.  Instead of crashing, we should refuse to
       replay down the fork. */
    FD_LOG_WARNING(( "bad block: fec_buf_sz %u, fec_buf_soff %u, fec->data_sz %lu, fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
                     block->fec_buf_sz, block->fec_buf_soff, fec->fec->data_sz, FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
    block->dying = 1;//FIXME inform replay/banks that it's dead?
    sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
    return;
  }

  block->shred_cnt += fec->shred_cnt;
  sched->metrics->fec_cnt++;

  /* Append the new FEC set to the end of the buffer. */
  fd_memcpy( block->fec_buf+block->fec_buf_sz, fec->fec->data, fec->fec->data_sz );
  block->fec_buf_sz += (uint)fec->fec->data_sz;
  sched->metrics->bytes_ingested_cnt += fec->fec->data_sz;

  block->fec_eob = fec->is_last_in_batch;
  block->fec_eos = fec->is_last_in_block;

  fd_sched_parse( sched, block, fec->alut_ctx );

  /* Check if we need to set the active block. */
  if( FD_UNLIKELY( sched->active_block_idx==null_idx ) ) {
    try_activate_block( sched );
  } else {
    fd_sched_block_t * active_block = block_pool_ele( sched->block_pool, sched->active_block_idx );
    if( FD_UNLIKELY( !block_is_activatable( active_block ) ) ) {
      FD_LOG_CRIT(( "invariant violation: active_block_idx %lu is not activatable, txn_parsed_cnt %u, txn_done_cnt %u, fec_eos %u, dying %u, slot %lu, prime %lu",
                    sched->active_block_idx, active_block->txn_parsed_cnt, active_block->txn_done_cnt, (uint)active_block->fec_eos, (uint)active_block->dying, (ulong)active_block->block_id.slot, (ulong)active_block->block_id.prime ));
    }
  }

  return;
}
538 :
/* Produce the next schedulable item from the active block into
   out_txn.  Returns 1 and fills out_txn when something was dispatched
   (a start-of-block sentinel, a ready transaction, or an end-of-block
   sentinel), 0 when nothing is ready right now (no active block, or
   all queued transactions are blocked behind in-flight ones).  Crashes
   on scheduler invariant violations. */
ulong
fd_sched_txn_next_ready( fd_sched_t * sched, fd_sched_txn_ready_t * out_txn ) {
  ulong null_idx = block_pool_idx_null( sched->block_pool );
  if( FD_UNLIKELY( sched->active_block_idx==null_idx ) ) {
    /* No need to try activating a block.  If we're in this state,
       there's truly nothing to execute.  We will activate something
       when we ingest a FEC set with transactions. */
    return 0UL;
  }

  out_txn->txn_id      = FD_SCHED_TXN_ID_NULL;
  out_txn->block_start = 0;
  out_txn->block_end   = 0;

  /* We could in theory reevaluate staging lane allocation here and do
     promotion/demotion as needed.  It's a policy decision to minimize
     fork churn for now and just execute down the same active fork. */

  fd_sched_block_t * block = block_pool_ele( sched->block_pool, sched->active_block_idx );
  if( FD_UNLIKELY( !block_is_activatable( block ) ) ) {
    FD_LOG_CRIT(( "invariant violation: active_block_idx %lu is not activatable, txn_parsed_cnt %u, txn_done_cnt %u, fec_eos %u, dying %u, slot %lu, prime %lu",
                  sched->active_block_idx, block->txn_parsed_cnt, block->txn_done_cnt, (uint)block->fec_eos, (uint)block->dying, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }

  /* Start-of-block sentinel goes out before any transaction. */
  if( FD_UNLIKELY( !block->block_start_signaled ) ) {
    out_txn->txn_id          = FD_SCHED_TXN_ID_BLOCK_START;
    out_txn->block_id        = block->block_id;
    out_txn->parent_block_id = block_pool_ele( sched->block_pool, block->parent_idx )->block_id;
    out_txn->block_start     = 1;

    block->block_start_signaled = 1;
    return 1UL;
  }

  ulong txn_queued_cnt = block->txn_parsed_cnt-block->txn_in_flight_cnt-block->txn_done_cnt;
  if( FD_LIKELY( txn_queued_cnt>0 ) ) { /* Optimize for no fork switching. */
    out_txn->txn_id = fd_rdisp_get_next_ready( sched->rdisp, block->block_id.id );
    if( FD_UNLIKELY( out_txn->txn_id==0UL ) ) {
      /* There are transactions queued but none ready for execution.
         This implies that there must be in-flight transactions on whose
         completion the queued transactions depend.  So we return and
         wait for those in-flight transactions to retire.  This is a
         policy decision to execute as much as we can down the current
         fork. */
      if( FD_UNLIKELY( !block->txn_in_flight_cnt ) ) {
        FD_LOG_CRIT(( "invariant violation: no ready transaction found but block->txn_in_flight_cnt==0, txn_parsed_cnt %u, txn_queued_cnt %lu, fec_eos %u, slot %lu, prime %lu",
                      block->txn_parsed_cnt, txn_queued_cnt, (uint)block->fec_eos, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
      }
      return 0UL;
    }
    out_txn->block_id        = block->block_id;
    out_txn->parent_block_id = block_pool_ele( sched->block_pool, block->parent_idx )->block_id;

    /* Time-weighted in-flight accounting: attribute the ticks since
       the last update either to the "none in flight" or the "some in
       flight" bucket (first ever update is skipped via the LONG_MAX
       sentinel). */
    long now    = fd_tickcount();
    ulong delta = (ulong)(now-sched->txn_in_flight_last_tick);
    sched->metrics->txn_none_in_flight_tickcount     += fd_ulong_if( block->txn_in_flight_cnt==0U && sched->txn_in_flight_last_tick!=LONG_MAX, delta, 0UL );
    sched->metrics->txn_weighted_in_flight_tickcount += fd_ulong_if( block->txn_in_flight_cnt!=0U, delta, 0UL );
    sched->metrics->txn_weighted_in_flight_cnt       += delta*block->txn_in_flight_cnt;
    sched->txn_in_flight_last_tick = now;

    block->txn_in_flight_cnt++;
    txn_queued_cnt--; /* NOTE(review): dead store -- local goes out of scope on the next line */
    sched->metrics->txn_max_in_flight_cnt = fd_uint_max( sched->metrics->txn_max_in_flight_cnt, block->txn_in_flight_cnt );
    return 1UL;
  }

  if( FD_UNLIKELY( block_should_signal_end( block ) ) ) {
    FD_TEST( block->block_start_signaled );
    out_txn->txn_id          = FD_SCHED_TXN_ID_BLOCK_END;
    out_txn->block_id        = block->block_id;
    out_txn->parent_block_id = block_pool_ele( sched->block_pool, block->parent_idx )->block_id;
    out_txn->block_end       = 1;

    block->block_end_signaled = 1;
    return 1UL;
  }

  /* Nothing queued for the active block.  If we haven't received all
     the FEC sets for it, then return and wait for more FEC sets, while
     there are in-flight transactions.  This is a policy decision to
     minimize fork churn and allow for executing down the current fork
     as much as we can.  If we have received all the FEC sets for it,
     then we'd still like to return and wait for the in-flight
     transactions to retire, before switching to a different block.

     Either way, there should be in-flight transactions.  We deactivate
     the active block the moment we exhausted transactions from it. */
  if( FD_UNLIKELY( !block->txn_in_flight_cnt ) ) {
    FD_LOG_CRIT(( "invariant violation: expected in-flight transactions but none, txn_parsed_cnt %u, txn_done_cnt %u, fec_eos %u, slot %lu, prime %lu",
                  block->txn_parsed_cnt, block->txn_done_cnt, (uint)block->fec_eos, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }

  return 0UL;
}
633 :
634 : void
635 0 : fd_sched_txn_done( fd_sched_t * sched, ulong txn_id ) {
636 0 : FD_TEST( txn_id!=FD_SCHED_TXN_ID_NULL );
637 :
638 0 : ulong null_idx = block_pool_idx_null( sched->block_pool );
639 0 : ulong block_idx = fd_ulong_if( txn_id==FD_SCHED_TXN_ID_BLOCK_START||txn_id==FD_SCHED_TXN_ID_BLOCK_END, sched->active_block_idx, sched->txn_to_block_idx[ txn_id ] );
640 0 : fd_sched_block_t * block = block_pool_ele( sched->block_pool, block_idx );
641 :
642 0 : if( FD_UNLIKELY( !block->staged ) ) {
643 : /* Invariant: only staged blocks can have in-flight transactions. */
644 0 : FD_LOG_CRIT(( "invariant violation: block->staged==0, slot %lu, prime %lu",
645 0 : (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
646 0 : }
647 0 : if( FD_UNLIKELY( !block->in_rdisp ) ) {
648 : /* Invariant: staged blocks must be in the dispatcher. */
649 0 : FD_LOG_CRIT(( "invariant violation: block->in_rdisp==0, slot %lu, prime %lu",
650 0 : (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
651 0 : }
652 :
653 0 : if( FD_LIKELY( txn_id!=FD_SCHED_TXN_ID_BLOCK_START && txn_id!=FD_SCHED_TXN_ID_BLOCK_END ) ) {
654 0 : FD_TEST( txn_id<FD_SCHED_MAX_DEPTH );
655 0 : long now = fd_tickcount();
656 0 : ulong delta = (ulong)(now-sched->txn_in_flight_last_tick);
657 0 : sched->metrics->txn_weighted_in_flight_tickcount += delta;
658 0 : sched->metrics->txn_weighted_in_flight_cnt += delta*block->txn_in_flight_cnt;
659 0 : sched->txn_in_flight_last_tick = now;
660 :
661 0 : block->txn_done_cnt++;
662 0 : block->txn_in_flight_cnt--;
663 0 : fd_rdisp_complete_txn( sched->rdisp, txn_id );
664 0 : sched->txn_pool_free_cnt++;
665 0 : sched->metrics->txn_done_cnt++;
666 0 : }
667 :
668 0 : if( FD_UNLIKELY( block->dying && block->txn_in_flight_cnt==0U ) ) {
669 0 : if( FD_UNLIKELY( sched->active_block_idx==block_idx ) ) {
670 0 : FD_LOG_CRIT(( "invariant violation: active block shouldn't be dying, block_idx %lu, slot %lu, prime %lu",
671 0 : block_idx, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
672 0 : }
673 0 : subtree_abandon( sched, block );
674 0 : return;
675 0 : }
676 :
677 0 : if( FD_UNLIKELY( !block->dying && sched->active_block_idx!=block_idx ) ) {
678 : /* Block is not dead. So we should be actively replaying it. */
679 0 : fd_sched_block_t * active_block = block_pool_ele( sched->block_pool, sched->active_block_idx );
680 0 : FD_LOG_CRIT(( "invariant violation: sched->active_block_idx %lu, slot %lu, prime %lu, block_idx %lu, slot %lu, prime %lu",
681 0 : sched->active_block_idx, (ulong)active_block->block_id.slot, (ulong)active_block->block_id.prime,
682 0 : block_idx, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
683 0 : }
684 :
685 0 : if( FD_UNLIKELY( block_is_done( block ) ) ) {
686 0 : block->in_rdisp = 0;
687 0 : block->staged = 0;
688 0 : fd_rdisp_remove_block( sched->rdisp, block->block_id.id );
689 0 : sched->metrics->block_removed_cnt++;
690 :
691 : /* See if there is a child block down the same staging lane. This
692 : is a policy decision to minimize fork churn. We could in theory
693 : reevaluate staging lane allocation here and do promotion/demotion
694 : as needed. */
695 0 : ulong child_idx = block->child_idx;
696 0 : while( child_idx != null_idx ) {
697 0 : fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
698 0 : if( FD_LIKELY( child->staged && child->staging_lane==block->staging_lane ) ) {
699 : /* There is a child block down the same staging lane. So switch
700 : the active block to it, and have the child inherit the head
701 : status of the lane. This is the common case. */
702 0 : sched->active_block_idx = child_idx;
703 0 : sched->staged_head_block_idx[ block->staging_lane ] = child_idx;
704 0 : if( FD_UNLIKELY( !fd_ulong_extract_bit( sched->staged_bitset, (int)block->staging_lane ) ) ) {
705 0 : FD_LOG_CRIT(( "invariant violation: staged_bitset 0x%lx bit %lu is not set, slot %lu, prime %lu, child slot %lu, prime %lu",
706 0 : sched->staged_bitset, block->staging_lane, (ulong)block->block_id.slot, (ulong)block->block_id.prime, (ulong)child->block_id.slot, (ulong)child->block_id.prime ));
707 0 : }
708 0 : return;
709 0 : }
710 0 : child_idx = child->sibling_idx;
711 0 : }
712 : /* There isn't a child block down the same staging lane. This is
713 : the last block in the staging lane. Release the staging lane. */
714 0 : sched->staged_bitset = fd_ulong_clear_bit( sched->staged_bitset, (int)block->staging_lane );
715 0 : sched->staged_head_block_idx[ block->staging_lane ] = null_idx;
716 :
717 : /* Reset the active block. */
718 0 : sched->active_block_idx = null_idx;
719 0 : sched->metrics->deactivate_no_child_cnt++;
720 0 : try_activate_block( sched );
721 0 : } else if( !block_is_activatable( block ) ) {
722 : /* We exhaused the active block, but it's not fully done yet. We
723 : are just not getting FEC sets for it fast enough. This could
724 : happen when the network path is congested, or when the leader
725 : simply went down. Reset the active block. */
726 0 : sched->active_block_idx = null_idx;
727 0 : sched->metrics->deactivate_no_txn_cnt++;
728 0 : try_activate_block( sched );
729 0 : }
730 0 : }
731 :
/* fd_sched_block_abandon: abandon the given block along with its
   entire descendant subtree.  May only be called on the actively
   replayed block (crashes otherwise).  Afterwards the active block is
   reset and another block is activated if possible. */
void
fd_sched_block_abandon( fd_sched_t * sched, fd_sched_block_id_t * block_id ) {
  fd_sched_block_t * block = block_map_ele_query( sched->block_map, block_id, NULL, sched->block_pool );
  if( FD_UNLIKELY( !block ) ) {
    FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
                  (ulong)block_id->slot, (ulong)block_id->prime ));
  }
  ulong block_idx = block_pool_idx( sched->block_pool, block );
  if( FD_UNLIKELY( block_idx!=sched->active_block_idx ) ) {
    /* Invariant: abandoning should only be performed on actively
       replayed blocks.  We impose this requirement on the caller
       because the dispatcher expects blocks to be abandoned in the same
       order that they were added, and having this requirement makes it
       easier to please the dispatcher. */
    FD_LOG_CRIT(( "invariant violation: active_block_idx %lu, block_idx %lu, slot %lu, prime %lu",
                  sched->active_block_idx, block_idx, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }

  subtree_abandon( sched, block );

  /* Reset the active block and look for something else to replay. */
  ulong null_idx = block_pool_idx_null( sched->block_pool );
  sched->active_block_idx = null_idx;
  sched->metrics->deactivate_abandoned_cnt++;
  try_activate_block( sched );
}
758 :
759 : int
760 0 : fd_sched_block_is_done( fd_sched_t * sched, fd_sched_block_id_t * block_id ) {
761 0 : fd_sched_block_t * block = block_map_ele_query( sched->block_map, block_id, NULL, sched->block_pool );
762 0 : if( FD_UNLIKELY( !block ) ) {
763 0 : FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
764 0 : (ulong)block_id->slot, (ulong)block_id->prime ));
765 0 : }
766 0 : return block_is_done( block );
767 0 : }
768 :
769 : void
770 0 : fd_sched_block_add_done( fd_sched_t * sched, fd_sched_block_id_t * block_id, fd_sched_block_id_t * parent_block_id ) {
771 0 : fd_sched_block_t * block = NULL;
772 0 : fd_sched_block_t * parent_block = NULL;
773 0 : add_block( sched, block_id, parent_block_id, &block, &parent_block );
774 0 : block->txn_done_cnt = block->txn_parsed_cnt = UINT_MAX;
775 0 : block->fec_eos = 1;
776 0 : block->block_start_signaled = 1;
777 0 : block->block_end_signaled = 1;
778 0 : if( FD_UNLIKELY( !parent_block_id ) ) {
779 : /* Assumes that a NULL parent implies the snapshot slot. */
780 0 : sched->root_idx = block_pool_idx( sched->block_pool, block );
781 0 : block->rooted = 1;
782 0 : }
783 0 : }
784 :
/* fd_sched_root_publish: make the given block the new root of the
   fork tree and release every block outside the new root's subtree.
   Implemented as a breadth-first prune starting at the old root: each
   visited block's children (except the new root) are appended to an
   intrusive work list threaded through the blocks' next fields,
   removed from the block map as they are queued, and each visited
   block is then released back to the pool.  Everything pruned must
   already be out of the dispatcher with no in-flight transactions.
   No-op if the new root equals the old root. */
void
fd_sched_root_publish( fd_sched_t * sched, fd_sched_block_id_t * root ) {
  ulong null_idx = block_pool_idx_null( sched->block_pool );

  fd_sched_block_t * new_root = block_map_ele_query( sched->block_map, root, NULL, sched->block_pool );
  if( FD_UNLIKELY( !new_root ) ) {
    FD_LOG_CRIT(( "invariant violation: new_root not found slot %lu, prime %lu",
                  (ulong)root->slot, (ulong)root->prime ));
  }

  fd_sched_block_t * old_root = block_pool_ele( sched->block_pool, sched->root_idx );
  if( FD_UNLIKELY( !old_root ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root not found" ));
  }
  if( FD_UNLIKELY( !old_root->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root is not rooted, slot %lu, prime %lu",
                  (ulong)old_root->block_id.slot, (ulong)old_root->block_id.prime ));
  }

  /* Early exit if the new root is the same as the old root. */
  if( FD_UNLIKELY( old_root->block_id.id==new_root->block_id.id ) ) {
    FD_LOG_WARNING(( "new root is the same as the old root, slot %lu, prime %lu",
                     (ulong)new_root->block_id.slot, (ulong)new_root->block_id.prime ));
    return;
  }

  /* Seed the BFS work list with the old root. */
  fd_sched_block_t * head = block_map_ele_remove( sched->block_map, &old_root->block_id, NULL, sched->block_pool );
  head->next = null_idx;
  fd_sched_block_t * tail = head;

  while( head ) {
    fd_sched_block_t * child = block_pool_ele( sched->block_pool, head->child_idx );
    while( child ) {
      /* Add children to be pruned.  The new root's subtree survives. */
      if( child!=new_root ) {
        tail->next = block_map_idx_remove( sched->block_map, &child->block_id, null_idx, sched->block_pool );
        tail = block_pool_ele( sched->block_pool, tail->next );
        tail->next = null_idx;
      }
      child = block_pool_ele( sched->block_pool, child->sibling_idx );
    }

    /* Prune the current block.  We will never publish halfway into a
       staging lane, because anything on the rooted fork should have
       finished replaying gracefully and be out of the dispatcher.  In
       fact, anything that we are publishing away should be out of the
       dispatcher at this point.  And there should be no more in-flight
       transactions. */
    if( FD_UNLIKELY( head->txn_in_flight_cnt ) ) {
      FD_LOG_CRIT(( "invariant violation: block has transactions in flight, slot %lu, prime %lu",
                    (ulong)head->block_id.slot, (ulong)head->block_id.prime ));
    }
    if( FD_UNLIKELY( head->in_rdisp ) ) {
      /* We should have removed it from the dispatcher when we were
         notified of the new root, or when in-flight transactions were
         drained. */
      FD_LOG_CRIT(( "invariant violation: block is in the dispatcher, slot %lu, prime %lu",
                    (ulong)head->block_id.slot, (ulong)head->block_id.prime ));
    }
    fd_sched_block_t * next = block_pool_ele( sched->block_pool, head->next );
    block_pool_ele_release( sched->block_pool, head );
    head = next;
  }

  /* The new root becomes parentless. */
  new_root->parent_idx = null_idx;
  sched->root_idx = block_pool_idx( sched->block_pool, new_root );
}
852 :
/* fd_sched_root_notify: consensus has chosen a new root.  First walk
   from the new root up the parent links to the old root, marking
   every block on the path as rooted (each must be fully replayed, not
   dying, not staged, and out of the dispatcher).  Then walk back down
   from the old root towards the new root and abandon every minority
   fork (non-rooted children hanging off the rooted path).  If the
   active block got abandoned along the way, try to activate a new
   one.  Crashes if the new root is not a descendant of the old root.
   No-op if the new root equals the old root. */
void
fd_sched_root_notify( fd_sched_t * sched, fd_sched_block_id_t * root ) {
  ulong null_idx = block_pool_idx_null( sched->block_pool );

  fd_sched_block_t * block = block_map_ele_query( sched->block_map, root, NULL, sched->block_pool );
  if( FD_UNLIKELY( !block ) ) {
    FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
                  (ulong)root->slot, (ulong)root->prime ));
  }

  fd_sched_block_t * old_root = block_pool_ele( sched->block_pool, sched->root_idx );
  if( FD_UNLIKELY( !old_root ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root not found" ));
  }
  if( FD_UNLIKELY( !old_root->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root is not rooted, slot %lu, prime %lu",
                  (ulong)old_root->block_id.slot, (ulong)old_root->block_id.prime ));
  }

  /* Early exit if the new root is the same as the old root. */
  if( FD_UNLIKELY( old_root->block_id.id==block->block_id.id ) ) {
    FD_LOG_WARNING(( "new root is the same as the old root, slot %lu, prime %lu",
                     (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
    return;
  }

  /* Mark every node from the new root up through its parents to the
     old root as being rooted. */
  fd_sched_block_t * curr = block;
  fd_sched_block_t * prev = NULL;
  while( curr ) {
    if( FD_UNLIKELY( !block_is_done( curr ) ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is not done, slot %lu, prime %lu",
                    (ulong)curr->block_id.slot, (ulong)curr->block_id.prime ));
    }
    if( FD_UNLIKELY( curr->dying ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is dying, slot %lu, prime %lu",
                    (ulong)curr->block_id.slot, (ulong)curr->block_id.prime ));
    }
    if( FD_UNLIKELY( curr->staged ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is staged, slot %lu, prime %lu",
                    (ulong)curr->block_id.slot, (ulong)curr->block_id.prime ));
    }
    if( FD_UNLIKELY( curr->in_rdisp ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is in the dispatcher, slot %lu, prime %lu",
                    (ulong)curr->block_id.slot, (ulong)curr->block_id.prime ));
    }
    curr->rooted = 1;
    prev = curr;
    curr = block_pool_ele( sched->block_pool, curr->parent_idx );
  }

  /* If we didn't reach the old root, the new root is not a descendant. */
  if( FD_UNLIKELY( prev!=old_root ) ) {
    FD_LOG_CRIT(( "invariant violation: new root is not a descendant of old root, new root slot %lu, prime %lu, old root slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime, (ulong)old_root->block_id.slot, (ulong)old_root->block_id.prime ));
  }

  /* Remember the active block so we can tell below whether abandoning
     minority forks knocked it out. */
  ulong old_active_block_idx = sched->active_block_idx;

  /* Now traverse from old root towards new root, and abandon all
     minority forks. */
  curr = old_root;
  while( curr && curr->rooted && curr!=block ) { /* curr!=block to avoid abandoning good forks. */
    fd_sched_block_t * rooted_child_block = NULL;
    ulong child_idx = curr->child_idx;
    while( child_idx!=null_idx ) {
      fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
      if( child->rooted ) {
        rooted_child_block = child;
      } else {
        /* This is a minority fork. */
        subtree_abandon( sched, child );
      }
      child_idx = child->sibling_idx;
    }
    curr = rooted_child_block;
  }

  /* If the active block got abandoned, we need to reset it. */
  if( sched->active_block_idx==null_idx ) {
    sched->metrics->deactivate_pruned_cnt += fd_uint_if( old_active_block_idx!=null_idx, 1U, 0U );
    try_activate_block( sched );
  }
}
938 :
939 : fd_txn_p_t *
940 0 : fd_sched_get_txn( fd_sched_t * sched, ulong txn_id ) {
941 0 : if( FD_UNLIKELY( txn_id>=FD_SCHED_MAX_DEPTH ) ) {
942 0 : return NULL;
943 0 : }
944 0 : return sched->txn_pool + txn_id;
945 0 : }
946 :
947 : fd_hash_t *
948 0 : fd_sched_get_poh( fd_sched_t * sched, fd_sched_block_id_t * block_id ) {
949 0 : fd_sched_block_t * block = block_map_ele_query( sched->block_map, block_id, NULL, sched->block_pool );
950 0 : if( FD_UNLIKELY( !block ) ) {
951 0 : FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
952 0 : (ulong)block_id->slot, (ulong)block_id->prime ));
953 0 : }
954 0 : return &block->poh;
955 0 : }
956 :
957 : uint
958 0 : fd_sched_get_shred_cnt( fd_sched_t * sched, fd_sched_block_id_t * block_id ) {
959 0 : fd_sched_block_t * block = block_map_ele_query( sched->block_map, block_id, NULL, sched->block_pool );
960 0 : if( FD_UNLIKELY( !block ) ) {
961 0 : FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
962 0 : (ulong)block_id->slot, (ulong)block_id->prime ));
963 0 : }
964 0 : return block->shred_cnt;
965 0 : }
966 :
/* fd_sched_leave: leave a local join; returns the join handle (no per-join state to tear down). */
void * fd_sched_leave ( fd_sched_t * sched ) { return sched; }
/* fd_sched_delete: unformat a memory region; returns mem (caller retains ownership of the region). */
void * fd_sched_delete( void * mem ) { return mem; }
969 :
970 :
971 : /* Internal helpers. */
972 :
/* add_block: allocate a block from the pool, insert it into the block
   map under block_id, reset all of its parse/replay state, and link
   it into the fork tree under parent_block_id (when non-NULL).  On
   return *out_block points at the new block; *out_parent_block is
   written only when a parent id was supplied.  A child of a dying
   parent is born dying.  Crashes if the pool is exhausted or the
   parent is unknown. */
static void
add_block( fd_sched_t * sched,
           fd_sched_block_id_t * block_id,
           fd_sched_block_id_t * parent_block_id,
           fd_sched_block_t * * out_block,
           fd_sched_block_t * * out_parent_block ) {
  if( FD_UNLIKELY( !block_pool_free( sched->block_pool ) ) ) {
    FD_LOG_CRIT(( "block_pool is full" ));
  }
  fd_sched_block_t * block = block_pool_ele_acquire( sched->block_pool );
  block->block_id = *block_id;
  block_map_ele_insert( sched->block_map, block, sched->block_pool );
  *out_block = block;

  /* Transaction accounting starts from zero. */
  block->txn_parsed_cnt = 0U;
  block->txn_in_flight_cnt = 0U;
  block->txn_done_cnt = 0U;
  block->shred_cnt = 0U;

  /* Streaming-parser state: empty buffer, at start of a batch. */
  block->mblks_rem = 0UL;
  block->txns_rem = 0UL;
  block->fec_buf_sz = 0U;
  block->fec_buf_soff = 0U;
  block->fec_eob = 0;
  block->fec_sob = 1;

  /* Lifecycle flags. */
  block->fec_eos = 0;
  block->rooted = 0;
  block->dying = 0;
  block->in_rdisp = 0;
  block->block_start_signaled = 0;
  block->block_end_signaled = 0;
  block->staged = 0;

  block->luf_depth = 0UL;

  /* New leaf node, no child, no sibling. */
  ulong null_idx = block_pool_idx_null( sched->block_pool );
  block->child_idx = null_idx;
  block->sibling_idx = null_idx;
  block->parent_idx = null_idx;

  /* node->parent link */
  if( FD_LIKELY( parent_block_id ) ) {
    fd_sched_block_t * parent_block = block_map_ele_query( sched->block_map, parent_block_id, NULL, sched->block_pool );
    if( FD_UNLIKELY( !parent_block ) ) {
      FD_LOG_CRIT(( "invariant violation: parent block not found slot %lu, prime %lu",
                    (ulong)parent_block_id->slot, (ulong)parent_block_id->prime ));
    }
    block->parent_idx = block_pool_idx( sched->block_pool, parent_block );
    *out_parent_block = parent_block;

    /* parent->node and sibling->node links */
    ulong child_idx = block_pool_idx( sched->block_pool, block );
    if( FD_LIKELY( parent_block->child_idx == null_idx ) ) { /* Optimize for no forking. */
      parent_block->child_idx = child_idx;
    } else {
      /* Append as the rightmost sibling of the existing children. */
      fd_sched_block_t * curr_block = block_pool_ele( sched->block_pool, parent_block->child_idx );
      while( curr_block->sibling_idx != null_idx ) {
        curr_block = block_pool_ele( sched->block_pool, curr_block->sibling_idx );
      }
      curr_block->sibling_idx = child_idx;
    }

    /* Children of a dying block are dying too. */
    if( FD_UNLIKELY( parent_block->dying ) ) {
      block->dying = 1;
    }
  }
}
1042 :
/* CHECK( cond ): early-return from the enclosing (void) parser helper
   when cond doesn't hold, i.e. when the buffer doesn't yet contain
   enough bytes to continue parsing. */
#define CHECK( cond ) do { \
  if( FD_UNLIKELY( !(cond) ) ) { \
    return; \
  } \
} while( 0 )

/* CHECK that it is safe to read at least n more bytes. */
#define CHECK_LEFT( n ) CHECK( (n)<=(block->fec_buf_sz-block->fec_buf_soff) )
1051 :
1052 : /* Consume as much as possible from the buffer. By the end of this
1053 : function, we will either have residual data that is unparseable only
1054 : because it is a batch that straddles FEC set boundaries, or we will
1055 : have reached the end of a batch. In the former case, any remaining
1056 : bytes should be concatenated with the next FEC set for further
1057 : parsing. In the latter case, any remaining bytes should be thrown
1058 : away. */
static void
fd_sched_parse( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx ) {
  while( 1 ) {
    /* Drain transactions from the current microblock. */
    while( block->txns_rem>0UL ) {
      if( FD_UNLIKELY( !fd_sched_parse_txn( sched, block, alut_ctx ) ) ) {
        return; /* need more data before the next transaction parses */
      }
    }
    /* Next microblock header within the current batch. */
    if( block->txns_rem==0UL && block->mblks_rem>0UL ) {
      CHECK_LEFT( sizeof(fd_microblock_hdr_t) );
      fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)fd_type_pun( block->fec_buf+block->fec_buf_soff );
      block->fec_buf_soff += (uint)sizeof(fd_microblock_hdr_t);

      /* Track the latest PoH hash seen for this block. */
      memcpy( block->poh.hash, hdr->hash, sizeof(block->poh.hash) );
      block->txns_rem = hdr->txn_cnt;
      block->mblks_rem--;
      continue;
    }
    /* Start of a new batch: the leading ulong is the microblock count. */
    if( block->txns_rem==0UL && block->mblks_rem==0UL && block->fec_sob ) {
      CHECK_LEFT( sizeof(ulong) );
      FD_TEST( block->fec_buf_soff==0U );
      block->mblks_rem = FD_LOAD( ulong, block->fec_buf );
      block->fec_buf_soff += (uint)sizeof(ulong);
      /* FIXME what happens if someone sends us mblks_rem==0UL here? */

      block->fec_sob = 0;
      continue;
    }
    /* Batch fully consumed and not at a batch start: stop parsing. */
    if( block->txns_rem==0UL && block->mblks_rem==0UL ) {
      break;
    }
  }
  if( block->fec_eob ) {
    /* Ignore trailing bytes at the end of a batch. */
    sched->metrics->bytes_ingested_unparsed_cnt += block->fec_buf_sz-block->fec_buf_soff;
    block->fec_buf_soff = 0U;
    block->fec_buf_sz = 0U;
    block->fec_sob = 1;
    block->fec_eob = 0;
  }
}
1100 :
/* fd_sched_parse_txn: try to parse one transaction from the block's
   FEC buffer and hand it to the dispatcher.  Returns 1 on success and
   0 when the buffer does not yet hold a complete transaction (caller
   should wait for more data).  On success the payload and the parsed
   txn descriptor are copied into the scheduler's txn pool and the txn
   is mapped back to its owning block via txn_to_block_idx. */
static int
fd_sched_parse_txn( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx ) {
  fd_txn_t * txn = fd_type_pun( block->txn );

  ulong pay_sz = 0UL;
  ulong txn_sz = fd_txn_parse_core( block->fec_buf+block->fec_buf_soff,
                                    fd_ulong_min( FD_TXN_MTU, block->fec_buf_sz-block->fec_buf_soff ),
                                    txn,
                                    NULL,
                                    &pay_sz );

  if( FD_UNLIKELY( !pay_sz || !txn_sz ) ) {
    /* Can't parse out a full transaction. */
    return 0;
  }

  /* Try to expand ALUTs.  If the slot hashes sysvar is unavailable or
     the lookup tables fail to load, fall back to inserting the txn in
     serializing mode. */
  int has_aluts = txn->transaction_version==FD_TXN_V0 && txn->addr_table_adtl_cnt>0;
  int serializing = 0;
  if( has_aluts ) {
    /* FIXME: statically size out slot hashes decode footprint. */
    FD_SPAD_FRAME_BEGIN( alut_ctx->runtime_spad ) {
      fd_slot_hashes_global_t const * slot_hashes_global = fd_sysvar_slot_hashes_read( alut_ctx->funk, alut_ctx->funk_txn, alut_ctx->runtime_spad );
      if( FD_LIKELY( slot_hashes_global ) ) {
        fd_slot_hash_t * slot_hash = deq_fd_slot_hash_t_join( (uchar *)slot_hashes_global + slot_hashes_global->hashes_offset );
        serializing = !!fd_runtime_load_txn_address_lookup_tables( txn, block->fec_buf+block->fec_buf_soff, alut_ctx->funk, alut_ctx->funk_txn, alut_ctx->els, slot_hash, block->aluts );
        sched->metrics->alut_success_cnt += (uint)!serializing;
      } else {
        serializing = 1;
      }
    } FD_SPAD_FRAME_END;
  }

  /* Insert into the dispatcher; a zero txn_idx would indicate failure. */
  ulong txn_idx = fd_rdisp_add_txn( sched->rdisp, block->block_id.id, txn, block->fec_buf+block->fec_buf_soff, serializing ? NULL : block->aluts, serializing );
  FD_TEST( txn_idx!=0UL );
  sched->metrics->txn_parsed_cnt++;
  sched->metrics->alut_serializing_cnt += (uint)serializing;
  sched->txn_pool_free_cnt--;
  fd_txn_p_t * txn_p = sched->txn_pool + txn_idx;
  txn_p->payload_sz = pay_sz;
  fd_memcpy( txn_p->payload, block->fec_buf+block->fec_buf_soff, pay_sz );
  fd_memcpy( TXN(txn_p), txn, txn_sz );
  sched->txn_to_block_idx[ txn_idx ] = block_pool_idx( sched->block_pool, block );

  /* Consume the payload from the buffer and update block counters. */
  block->fec_buf_soff += (uint)pay_sz;
  block->txn_parsed_cnt++;
  block->txns_rem--;
  return 1;
}
1150 :
1151 : #undef CHECK
1152 : #undef CHECK_LEFT
1153 :
1154 : static void
1155 0 : try_activate_block( fd_sched_t * sched ) {
1156 0 : ulong null_idx = block_pool_idx_null( sched->block_pool );
1157 :
1158 : /* See if there are any allocated staging lanes that we can activate
1159 : for scheduling ... */
1160 0 : ulong staged_bitset = sched->staged_bitset;
1161 0 : while( staged_bitset ) {
1162 0 : int lane_idx = fd_ulong_find_lsb( staged_bitset );
1163 0 : staged_bitset = fd_ulong_pop_lsb( staged_bitset );
1164 :
1165 0 : ulong head_idx = sched->staged_head_block_idx[ lane_idx ];
1166 0 : fd_sched_block_t * head_block = block_pool_ele( sched->block_pool, head_idx );
1167 0 : fd_sched_block_t * parent_block = block_pool_ele( sched->block_pool, head_block->parent_idx );
1168 0 : if( FD_UNLIKELY( parent_block->dying ) ) {
1169 : /* Invariant: no child of a dying block should be staged. */
1170 0 : FD_LOG_CRIT(( "invariant violation: staged_head_block_idx %lu, slot %lu, prime %lu on lane %d has parent_block->dying set, slot %lu, prime %lu",
1171 0 : head_idx, (ulong)head_block->block_id.slot, (ulong)head_block->block_id.prime, lane_idx, (ulong)parent_block->block_id.slot, (ulong)parent_block->block_id.prime ));
1172 0 : }
1173 : //FIXME: restore this invariant check when we have immediate demotion of dying blocks
1174 : // if( FD_UNLIKELY( head_block->dying ) ) {
1175 : // /* Invariant: no dying block should be staged. */
1176 : // FD_LOG_CRIT(( "invariant violation: staged_head_block_idx %lu, slot %lu, prime %lu on lane %u has head_block->dying set",
1177 : // head_idx, (ulong)head_block->block_id.slot, (ulong)head_block->block_id.prime, lane_idx ));
1178 : // }
1179 0 : if( block_is_done( parent_block ) && block_is_activatable( head_block ) ) {
1180 : /* ... Yes, on this staging lane the parent block is done. So we
1181 : can switch to the staged child. */
1182 0 : sched->active_block_idx = head_idx;
1183 0 : sched->metrics->lane_switch_cnt++;
1184 0 : return;
1185 0 : }
1186 0 : }
1187 :
1188 : /* ... No, promote unstaged blocks. */
1189 0 : ulong root_idx = sched->root_idx;
1190 0 : if( FD_UNLIKELY( root_idx==null_idx ) ) {
1191 0 : FD_LOG_CRIT(( "invariant violation: root_idx==null_idx indicating fd_sched is unintialized" ));
1192 0 : }
1193 : /* Find and stage the longest stageable unstaged fork. This is a
1194 : policy decision. */
1195 0 : ulong depth = compute_longest_unstaged_fork( sched, root_idx );
1196 0 : if( FD_LIKELY( depth>0UL ) ) {
1197 0 : if( FD_UNLIKELY( sched->staged_bitset==fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) ) ) {
1198 : /* No more staging lanes available. All of them are occupied by
1199 : slow squatters. Demote one of them. */
1200 : //FIXME implement this
1201 0 : FD_LOG_CRIT(( "unimplemented" ));
1202 0 : sched->metrics->lane_demoted_cnt++;
1203 : // sched->metrics->block_demoted_cnt++; for every demoted block
1204 0 : }
1205 0 : FD_TEST( sched->staged_bitset!=fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) );
1206 0 : int lane_idx = fd_ulong_find_lsb( ~sched->staged_bitset );
1207 0 : if( FD_UNLIKELY( lane_idx>=(int)FD_SCHED_MAX_STAGING_LANES ) ) {
1208 0 : FD_LOG_CRIT(( "invariant violation: lane_idx %d, sched->staged_bitset %lx",
1209 0 : lane_idx, sched->staged_bitset ));
1210 0 : }
1211 0 : ulong head_block_idx = stage_longest_unstaged_fork( sched, root_idx, lane_idx );
1212 0 : if( FD_UNLIKELY( head_block_idx==null_idx ) ) {
1213 : /* We found a promotable fork depth>0. This should not happen. */
1214 0 : FD_LOG_CRIT(( "invariant violation: head_block_idx==null_idx" ));
1215 0 : }
1216 0 : sched->active_block_idx = head_block_idx;
1217 0 : return;
1218 0 : }
1219 : /* No unstaged blocks to promote. So we're done. Yay. */
1220 0 : }
1221 :
1222 : /* It's safe to call this function more than once on the same block. */
static void
subtree_abandon( fd_sched_t * sched, fd_sched_block_t * block ) {
  if( FD_UNLIKELY( block->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: rooted block should not be abandoned, slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }
  /* All minority fork nodes pass through this function eventually.  So
     this is a good point to check per-node invariants for minority
     forks. */
  if( FD_UNLIKELY( block->staged && !block->in_rdisp ) ) {
    FD_LOG_CRIT(( "invariant violation: staged block is not in the dispatcher, slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }

  ulong null_idx = block_pool_idx_null( sched->block_pool );

  /* Setting the flag is non-optional and can happen more than once. */
  block->dying = 1;

  /* Removal from dispatcher should only happen once. */
  if( block->in_rdisp ) {
    fd_sched_block_t * parent = block_pool_ele( sched->block_pool, block->parent_idx );
    if( FD_UNLIKELY( !parent ) ) {
      /* Only the root has no parent.  Abandon should never be called on
         the root.  So any block we are trying to abandon should have a
         parent. */
      FD_LOG_CRIT(( "invariant violation: parent not found slot %lu, prime %lu",
                    (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
    }

    /* The dispatcher expects blocks to be abandoned in the same order
       that they were added on each lane.  There are no requirements on
       the order of abandoning if two blocks are not on the same lane,
       or if a block is unstaged.  This means that in general we
       shouldn't abandon a child block if the parent hasn't been
       abandoned yet, if and only if they are on the same lane.  So wait
       until we can abandon the parent, and then descend down the fork
       tree to ensure orderly abandoning. */
    int abandon = !parent->in_rdisp || /* parent is not in the dispatcher */
                  !parent->staged || /* parent is in the dispatcher but not staged */
                  !block->staged || /* parent is in the dispatcher and staged but this block is unstaged */
                  block->staging_lane!=parent->staging_lane; /* this block is on a different staging lane than its parent */

    /* We inform the dispatcher of an abandon only when there are no
       more in-flight transactions.  Otherwise, if the dispatcher
       recycles the same txn_id that was just abandoned, and we receive
       completion of an in-flight transaction whose txn_id was just
       recycled, we would basically be aliasing the same txn_id and end
       up indexing into txn_to_block_idx[] that is already overwritten
       with new blocks. */
    abandon = abandon && block->txn_in_flight_cnt==0;

    if( abandon ) {
      block->in_rdisp = 0;
      fd_rdisp_abandon_block( sched->rdisp, block->block_id.id );
      sched->txn_pool_free_cnt += block->txn_parsed_cnt-block->txn_done_cnt; /* in_flight_cnt==0 */
      sched->metrics->block_abandoned_cnt++;
      sched->metrics->txn_abandoned_parsed_cnt += block->txn_parsed_cnt;
      sched->metrics->txn_abandoned_done_cnt += block->txn_done_cnt;

      /* Now release the staging lane. */
      //FIXME when demote supports non-empty blocks, we should demote
      //the block from the lane unconditionally and immediately,
      //regardles of whether it's safe to abandon or not. So a block
      //would go immediately from staged to unstaged and eventually to
      //abandoned.
      if( FD_LIKELY( block->staged ) ) {
        block->staged = 0;
        sched->staged_bitset = fd_ulong_clear_bit( sched->staged_bitset, (int)block->staging_lane );
        sched->staged_head_block_idx[ block->staging_lane ] = null_idx;
      }
    }

    if( FD_UNLIKELY( block->staged && sched->active_block_idx==sched->staged_head_block_idx[ block->staging_lane ] ) ) {
      /* Dying blocks should not be active. */
      sched->active_block_idx = null_idx;
    }
  }

  /* Abandon the entire fork chaining off of this block.  Recursion
     depth is bounded by the fork tree height. */
  ulong child_idx = block->child_idx;
  while( child_idx != null_idx ) {
    fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
    subtree_abandon( sched, child );
    child_idx = child->sibling_idx;
  }
}
1310 :
1311 : FD_FN_UNUSED static ulong
1312 0 : find_and_stage_longest_unstaged_fork( fd_sched_t * sched, int lane_idx ) {
1313 0 : ulong null_idx = block_pool_idx_null( sched->block_pool );
1314 0 : ulong root_idx = sched->root_idx;
1315 0 :
1316 0 : if( FD_UNLIKELY( root_idx==null_idx ) ) {
1317 0 : FD_LOG_CRIT(( "invariant violation: root_idx==null_idx indicating fd_sched is unintialized" ));
1318 0 : }
1319 0 :
1320 0 : /* First pass: compute the longest unstaged fork depth for each node
1321 0 : in the fork tree. */
1322 0 : ulong depth = compute_longest_unstaged_fork( sched, root_idx );
1323 0 :
1324 0 : /* Second pass: stage blocks on the longest unstaged fork. */
1325 0 : ulong head_block_idx = stage_longest_unstaged_fork( sched, root_idx, lane_idx );
1326 0 :
1327 0 : if( FD_UNLIKELY( (depth>0UL && head_block_idx==null_idx) || (depth==0UL && head_block_idx!=null_idx) ) ) {
1328 0 : FD_LOG_CRIT(( "invariant violation: depth %lu, head_block_idx %lu",
1329 0 : depth, head_block_idx ));
1330 0 : }
1331 0 :
1332 0 : return head_block_idx;
1333 0 : }
1334 :
1335 : /* Returns length of the longest stageable unstaged fork, if there is
1336 : one, and 0 otherwise. */
1337 : static ulong
1338 0 : compute_longest_unstaged_fork( fd_sched_t * sched, ulong block_idx ) {
1339 0 : ulong null_idx = block_pool_idx_null( sched->block_pool );
1340 0 : if( FD_UNLIKELY( block_idx==null_idx ) ) {
1341 0 : FD_LOG_CRIT(( "invariant violation: block_idx==null_idx" ));
1342 0 : }
1343 :
1344 0 : fd_sched_block_t * block = block_pool_ele( sched->block_pool, block_idx );
1345 :
1346 0 : ulong max_child_depth = 0UL;
1347 0 : ulong child_idx = block->child_idx;
1348 0 : while( child_idx!=null_idx ) {
1349 0 : ulong child_depth = compute_longest_unstaged_fork( sched, child_idx );
1350 0 : if( child_depth > max_child_depth ) {
1351 0 : max_child_depth = child_depth;
1352 0 : }
1353 0 : fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
1354 0 : child_idx = child->sibling_idx;
1355 0 : }
1356 :
1357 0 : block->luf_depth = max_child_depth + fd_ulong_if( block_is_promotable( block ), 1UL, 0UL );
1358 0 : return block->luf_depth;
1359 0 : }
1360 :
1361 : static ulong
1362 0 : stage_longest_unstaged_fork_helper( fd_sched_t * sched, ulong block_idx, int lane_idx ) {
1363 0 : ulong null_idx = block_pool_idx_null( sched->block_pool );
1364 0 : if( FD_UNLIKELY( block_idx==null_idx ) ) {
1365 0 : FD_LOG_CRIT(( "invariant violation: block_idx==null_idx" ));
1366 0 : }
1367 :
1368 0 : fd_sched_block_t * block = block_pool_ele( sched->block_pool, block_idx );
1369 :
1370 0 : int stage_it = fd_int_if( block_is_promotable( block ), 1, 0 );
1371 0 : ulong rv = fd_ulong_if( stage_it, block_idx, null_idx );
1372 0 : if( FD_LIKELY( stage_it ) ) {
1373 0 : block->staged = 1;
1374 0 : block->staging_lane = (ulong)lane_idx;
1375 0 : fd_rdisp_promote_block( sched->rdisp, block->block_id.id, block->staging_lane );
1376 0 : sched->metrics->block_promoted_cnt++;
1377 0 : }
1378 :
1379 : /* Base case: leaf node. */
1380 0 : if( block->child_idx==null_idx ) return rv;
1381 :
1382 0 : ulong max_depth = 0UL;
1383 0 : ulong best_child_idx = null_idx;
1384 0 : ulong child_idx = block->child_idx;
1385 0 : while( child_idx!=null_idx ) {
1386 0 : fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
1387 0 : if( child->luf_depth>max_depth ) {
1388 0 : max_depth = child->luf_depth;
1389 0 : best_child_idx = child_idx;
1390 0 : }
1391 0 : child_idx = child->sibling_idx;
1392 0 : }
1393 :
1394 : /* Recursively stage descendants. */
1395 0 : if( best_child_idx!=null_idx ) {
1396 0 : ulong head_block_idx = stage_longest_unstaged_fork_helper( sched, best_child_idx, lane_idx );
1397 0 : rv = fd_ulong_if( rv!=null_idx, rv, head_block_idx );
1398 0 : }
1399 :
1400 0 : return rv;
1401 0 : }
1402 :
1403 : /* Returns idx of head block of staged lane on success, idx_null
1404 : otherwise. */
1405 : static ulong
1406 0 : stage_longest_unstaged_fork( fd_sched_t * sched, ulong block_idx, int lane_idx ) {
1407 0 : ulong head_block_idx = stage_longest_unstaged_fork_helper( sched, block_idx, lane_idx );
1408 0 : if( FD_LIKELY( head_block_idx!=block_pool_idx_null( sched->block_pool ) ) ) {
1409 0 : sched->metrics->lane_promoted_cnt++;
1410 0 : sched->staged_bitset = fd_ulong_set_bit( sched->staged_bitset, lane_idx );
1411 0 : sched->staged_head_block_idx[ lane_idx ] = head_block_idx;
1412 0 : }
1413 0 : return head_block_idx;
1414 0 : }
|