Line data Source code
1 : #include "fd_replay.h"
2 :
/* leader_pipeline is currently an empty stub: it takes no arguments,
   performs no work and returns immediately.  The after_credit /
   during_frag / after_frag logic that follows it in this listing is
   entirely commented out. */
3 : void
4 0 : leader_pipeline( void ) {
5 0 : return; /* nothing to do; the body is intentionally empty */
6 0 : }
7 :
8 : // after_credit
9 : //
10 : /**********************************************************************/
11 : /* Consensus: decide (1) the fork for pack; (2) the fork to vote on */
12 : /**********************************************************************/
13 :
14 : // ulong reset_slot = fd_tower_reset_slot( ctx->tower, ctx->epoch, ctx->ghost );
15 : // fd_fork_t const * reset_fork = fd_forks_query_const( ctx->forks, reset_slot );
16 : // if( FD_UNLIKELY( !reset_fork ) ) {
17 : // FD_LOG_ERR( ( "failed to find reset fork %lu", reset_slot ) );
18 : // }
19 : // if( reset_fork->lock ) {
20 : // FD_LOG_WARNING(("RESET FORK FROZEN: %lu", reset_fork->slot ));
21 : // fd_fork_t * new_reset_fork = fd_forks_prepare( ctx->forks, reset_fork->slot_ctx->slot_bank.prev_slot, ctx->funk,
22 : // ctx->blockstore, ctx->epoch_ctx, ctx->runtime_spad );
23 : // new_reset_fork->lock = 0;
24 : // reset_fork = new_reset_fork;
25 : // }
26 :
27 : // ulong bank_idx = ctx->bank_idx;
28 : // ulong txn_cnt = ctx->txn_cnt;
29 : // fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
30 : // fd_txn_p_t * txns = (fd_txn_p_t *)fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
31 :
32 : // if( FD_UNLIKELY( !ctx->startup_init_done && ctx->replay_plugin_out_mem ) ) {
33 : // ctx->startup_init_done = 1;
34 : // uchar msg[ 56 ];
35 : // fd_memset( msg, 0, sizeof(msg) );
36 : // msg[ 0 ] = 11; // ValidatorStartProgress::Running
37 : // replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
38 : // }
39 :
40 : // /* Update the gui */
41 : // if( ctx->replay_plugin_out_mem ) {
42 : // /* FIXME. We need a more efficient way to compute the ancestor chain. */
43 : // uchar msg[4098*8] __attribute__( ( aligned( 8U ) ) );
44 : // fd_memset( msg, 0, sizeof(msg) );
45 : // ulong s = reset_fork->slot_ctx->slot;
46 : // *(ulong*)(msg + 16U) = s;
47 : // ulong i = 0;
48 : // do {
49 : // if( !fd_blockstore_block_info_test( ctx->blockstore, s ) ) {
50 : // break;
51 : // }
52 : // s = fd_blockstore_parent_slot_query( ctx->blockstore, s );
53 : // if( s < ctx->blockstore->shmem->wmk ) {
54 : // break;
55 : // }
56 :
57 : // *(ulong*)(msg + 24U + i*8U) = s;
58 : // if( ++i == 4095U ) {
59 : // break;
60 : // }
61 : // } while( 1 );
62 : // *(ulong*)(msg + 8U) = i;
63 : // replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_RESET, msg, sizeof(msg) );
64 : // }
65 :
66 : // fd_microblock_trailer_t * microblock_trailer = (fd_microblock_trailer_t *)(txns + txn_cnt);
67 : // memcpy( microblock_trailer->hash, reset_fork->slot_ctx->slot_bank.block_hash_queue.last_hash->uc, sizeof(fd_hash_t) );
68 : // if( ctx->poh_init_done == 1 ) {
69 : // ulong parent_slot = reset_fork->slot_ctx->slot_bank.prev_slot;
70 : // ulong curr_slot = reset_fork->slot_ctx->slot;
71 : // FD_LOG_DEBUG(( "publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, parent_slot, flags ));
72 : // ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
73 : // ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
74 : // fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, 0, tspub );
75 : // bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
76 : // bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
77 : // } else {
78 : // FD_LOG_DEBUG(( "NOT publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, ctx->parent_slot, flags ));
79 : // }
80 :
81 : // during_frag
82 : //
83 : // if( in_idx == PACK_IN_IDX ) {
84 : // if( FD_UNLIKELY( chunk<ctx->pack_in_chunk0 || chunk>ctx->pack_in_wmark || sz>USHORT_MAX ) ) {
85 : // FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->pack_in_chunk0, ctx->pack_in_wmark ));
86 : // }
87 : // uchar * src = (uchar *)fd_chunk_to_laddr( ctx->pack_in_mem, chunk );
88 : // /* Incoming packet from pack tile. Format:
89 : // Microblock as a list of fd_txn_p_t (sz * sizeof(fd_txn_p_t))
90 : // Microblock bank trailer
91 : // */
92 : // ctx->curr_slot = fd_disco_poh_sig_slot( sig );
93 : // if( FD_UNLIKELY( ctx->curr_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
94 : // FD_LOG_WARNING(( "pack sent slot %lu before our watermark %lu.", ctx->curr_slot, fd_fseq_query( ctx->published_wmark ) ));
95 : // }
96 : // if( fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_MICROBLOCK ) {
97 : // ulong bank_idx = fd_disco_poh_sig_bank_tile( sig );
98 : // fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
99 : // uchar * dst_poh = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
100 : // ctx->flags = REPLAY_FLAG_PACKED_MICROBLOCK;
101 : // ctx->txn_cnt = (sz - sizeof(fd_microblock_bank_trailer_t)) / sizeof(fd_txn_p_t);
102 : // ctx->bank_idx = bank_idx;
103 : // fd_memcpy( dst_poh, src, (sz - sizeof(fd_microblock_bank_trailer_t)) );
104 : // src += (sz-sizeof(fd_microblock_bank_trailer_t));
105 : // dst_poh += (sz - sizeof(fd_microblock_bank_trailer_t));
106 : // fd_microblock_bank_trailer_t * t = (fd_microblock_bank_trailer_t *)src;
107 : // ctx->parent_slot = (ulong)t->bank;
108 : // } else {
109 : // FD_LOG_WARNING(("OTHER PACKET TYPE: %lu", fd_disco_poh_sig_pkt_type( sig )));
110 : // ctx->skip_frag = 1;
111 : // return;
112 : // }
113 :
114 : // FD_LOG_DEBUG(( "packed microblock - slot: %lu, parent_slot: %lu, txn_cnt: %lu", ctx->curr_slot, ctx->parent_slot, ctx->txn_cnt ));
115 : // }
116 : // if( ctx->flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
117 : // /* We do not know the parent slot, pick one from fork selection */
118 : // ulong max_slot = 0; /* FIXME: default to snapshot slot/smr */
119 : // for( fd_fork_frontier_iter_t iter = fd_fork_frontier_iter_init( ctx->forks->frontier, ctx->forks->pool );
120 : // !fd_fork_frontier_iter_done( iter, ctx->forks->frontier, ctx->forks->pool );
121 : // iter = fd_fork_frontier_iter_next( iter, ctx->forks->frontier, ctx->forks->pool ) ) {
122 : // fd_exec_slot_ctx_t * ele = &fd_fork_frontier_iter_ele( iter, ctx->forks->frontier, ctx->forks->pool )->slot_ctx;
123 : // if ( max_slot < ele->slot_bank.slot ) {
124 : // max_slot = ele->slot_bank.slot;
125 : // }
126 : // }
127 : // ctx->parent_slot = max_slot;
128 : // }
129 :
130 : /*uchar block_flags = 0;
131 : int err = FD_MAP_ERR_AGAIN;
132 : while( err == FD_MAP_ERR_AGAIN ){
133 : fd_block_map_query_t quer[1] = { 0 };
134 : err = fd_block_map_query_try( ctx->blockstore->block_map, &ctx->curr_slot, NULL, quer, 0 );
135 : fd_block_info_t * block_info = fd_block_map_query_ele( quer );
136 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
137 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY )) break;
138 : block_flags = block_info->flags;
139 : err = fd_block_map_query_test( quer );
140 : }
141 :
142 : if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_PROCESSED ) ) ) {
143 : FD_LOG_WARNING(( "block already processed - slot: %lu", ctx->curr_slot ));
144 : ctx->skip_frag = 1;
145 : }
146 : if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_DEADBLOCK ) ) ) {
147 : FD_LOG_WARNING(( "block already dead - slot: %lu", ctx->curr_slot ));
148 : ctx->skip_frag = 1;
149 : }*/
150 :
151 : // after_frag
152 : //
153 : // /**********************************************************************/
154 : // /* The rest of after_frag replays some microblocks in block curr_slot */
155 : // /**********************************************************************/
156 :
157 : // ulong curr_slot = ctx->curr_slot;
158 : // ulong flags = ctx->flags;
159 : // ulong bank_idx = ctx->bank_idx;
160 :
161 : // fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );
162 :
163 : // /**********************************************************************/
164 : // /* Execute the transactions which were gathered */
165 : // /**********************************************************************/
166 :
167 : // ulong txn_cnt = ctx->txn_cnt;
168 : // fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
169 : // fd_txn_p_t * txns = (fd_txn_p_t *)fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
170 :
171 : // //Execute all txns which were successfully prepared
172 : // ctx->metrics.slot = curr_slot;
173 : // if( flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
174 : // /* TODO: The leader pipeline execution needs to be optimized. This is
175 : // very hacky and suboptimal. First, wait for the tpool workers to be idle.
176 : // Then, execute the transactions, and notify the pack tile. We should be
177 : // taking advantage of bank_busy flags.
178 :
179 : // FIXME: It is currently not working and the below commented out
180 : // code corresponds to executing packed microblocks. */
181 :
182 : // // for( ulong i=1UL; i<ctx->exec_spad_cnt; i++ ) {
183 : // // fd_tpool_wait( ctx->tpool, i );
184 : // // }
185 :
186 : // // fd_runtime_process_txns_in_microblock_stream( ctx->slot_ctx,
187 : // // ctx->capture_ctx,
188 : // // txns,
189 : // // txn_cnt,
190 : // // ctx->tpool,
191 : // // ctx->exec_spads,
192 : // // ctx->exec_spad_cnt,
193 : // // ctx->runtime_spad,
194 : // // NULL );
195 :
196 : // fd_microblock_trailer_t * microblock_trailer = (fd_microblock_trailer_t *)(txns + txn_cnt);
197 :
198 : // hash_transactions( ctx->bmtree[ bank_idx ], txns, txn_cnt, microblock_trailer->hash );
199 :
200 : // ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
201 : // fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, 0UL, 0UL );
202 : // bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
203 : // bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
204 :
205 : // /* Indicate to pack tile we are done processing the transactions so it
206 : // can pack new microblocks using these accounts. DO NOT USE THE
207 : // SANITIZED TRANSACTIONS AFTER THIS POINT, THEY ARE NO LONGER VALID. */
208 : // fd_fseq_update( ctx->bank_busy[ bank_idx ], seq );
209 :
210 : // publish_account_notifications( ctx, fork, curr_slot, txns, txn_cnt );
211 : // }
212 :
213 : // /**********************************************************************/
214 : // /* Init PoH if it is ready */
215 : // /**********************************************************************/
216 :
217 : // if( FD_UNLIKELY( !(flags & REPLAY_FLAG_CATCHING_UP) && ctx->poh_init_done == 0 && ctx->blockstore ) ) {
218 : // init_poh( ctx );
219 : // }
220 :
221 : // /**********************************************************************/
222 : // /* Publish mblk to POH */
223 : // /**********************************************************************/
224 :
225 : // if( ctx->poh_init_done == 1 && !( flags & REPLAY_FLAG_FINISHED_BLOCK )
226 : // && ( ( flags & REPLAY_FLAG_MICROBLOCK ) ) ) {
227 : // // FD_LOG_INFO(( "publishing mblk to poh - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));
228 : // ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
229 : // ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
230 : // fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, tsorig, tspub );
231 : // bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
232 : // bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
233 : // } else {
234 : // FD_LOG_DEBUG(( "NOT publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, ctx->parent_slot, flags ));
235 : // }
|