Line data Source code
1 : #include "fd_policy.h"
2 : #include "../../disco/metrics/fd_metrics.h"
3 :
4 : #define NONCE_NULL (UINT_MAX) /* sentinel nonce value; not referenced in the visible part of this file */
5 0 : #define DEFER_REPAIR_MS (200UL) /* grace period, in ms, added on top of a block's estimated tick time in passes_throttle_threshold */
6 0 : #define TARGET_TICK_PER_SLOT (64.0) /* number of ticks assumed per slot when converting ticks to time */
7 0 : #define MS_PER_TICK (400.0 / TARGET_TICK_PER_SLOT) /* ms per tick; NOTE(review): 400.0 is presumably the slot duration in ms -- confirm */
8 :
9 : void *
10 0 : fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) {
11 :
12 0 : if( FD_UNLIKELY( !shmem ) ) {
13 0 : FD_LOG_WARNING(( "NULL mem" ));
14 0 : return NULL;
15 0 : }
16 :
17 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_policy_align() ) ) ) {
18 0 : FD_LOG_WARNING(( "misaligned mem" ));
19 0 : return NULL;
20 0 : }
21 :
22 0 : ulong footprint = fd_policy_footprint( dedup_max, peer_max );
23 0 : fd_memset( shmem, 0, footprint );
24 :
25 0 : int lg_peer_max = fd_ulong_find_msb( fd_ulong_pow2_up( peer_max ) );
26 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
27 0 : fd_policy_t * policy = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(), sizeof(fd_policy_t) );
28 0 : void * dedup_map = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_map_align(), fd_policy_dedup_map_footprint ( dedup_max ) );
29 0 : void * dedup_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max ) );
30 0 : void * dedup_lru = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(), fd_policy_dedup_lru_footprint ( ) );
31 0 : void * peers = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(), fd_policy_peer_map_footprint ( lg_peer_max ) );
32 0 : void * peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_pool_align(), fd_peer_pool_footprint ( peer_max ) );
33 0 : void * peers_dlist = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint ( ) );
34 0 : FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint );
35 :
36 0 : policy->dedup.map = fd_policy_dedup_map_new ( dedup_map, dedup_max, seed );
37 0 : policy->dedup.pool = fd_policy_dedup_pool_new( dedup_pool, dedup_max );
38 0 : policy->dedup.lru = fd_policy_dedup_lru_new ( dedup_lru );
39 0 : policy->peers.map = fd_policy_peer_map_new ( peers, lg_peer_max );
40 0 : policy->peers.pool = fd_peer_pool_new ( peers_pool, peer_max );
41 0 : policy->peers.dlist = fd_peer_dlist_new ( peers_dlist );
42 0 : policy->iterf.ele_idx = ULONG_MAX;
43 0 : policy->turbine_slot0 = ULONG_MAX;
44 0 : policy->tsreset = 0;
45 0 : policy->nonce = 1;
46 :
47 0 : return shmem;
48 0 : }
49 :
50 : fd_policy_t *
51 0 : fd_policy_join( void * shpolicy ) {
52 0 : fd_policy_t * policy = (fd_policy_t *)shpolicy;
53 :
54 0 : if( FD_UNLIKELY( !policy ) ) {
55 0 : FD_LOG_WARNING(( "NULL policy" ));
56 0 : return NULL;
57 0 : }
58 :
59 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
60 0 : FD_LOG_WARNING(( "misaligned policy" ));
61 0 : return NULL;
62 0 : }
63 :
64 0 : fd_wksp_t * wksp = fd_wksp_containing( policy );
65 0 : if( FD_UNLIKELY( !wksp ) ) {
66 0 : FD_LOG_WARNING(( "policy must be part of a workspace" ));
67 0 : return NULL;
68 0 : }
69 :
70 0 : policy->dedup.map = fd_policy_dedup_map_join ( policy->dedup.map );
71 0 : policy->dedup.pool = fd_policy_dedup_pool_join( policy->dedup.pool );
72 0 : policy->dedup.lru = fd_policy_dedup_lru_join ( policy->dedup.lru );
73 0 : policy->peers.map = fd_policy_peer_map_join ( policy->peers.map );
74 0 : policy->peers.pool = fd_peer_pool_join ( policy->peers.pool );
75 0 : policy->peers.dlist = fd_peer_dlist_join ( policy->peers.dlist );
76 0 : policy->peers.iter = fd_peer_dlist_iter_fwd_init( policy->peers.dlist, policy->peers.pool );
77 :
78 0 : return policy;
79 0 : }
80 :
81 : void *
82 0 : fd_policy_leave( fd_policy_t const * policy ) {
83 :
84 0 : if( FD_UNLIKELY( !policy ) ) {
85 0 : FD_LOG_WARNING(( "NULL policy" ));
86 0 : return NULL;
87 0 : }
88 :
89 0 : return (void *)policy;
90 0 : }
91 :
92 : void *
93 0 : fd_policy_delete( void * policy ) {
94 :
95 0 : if( FD_UNLIKELY( !policy ) ) {
96 0 : FD_LOG_WARNING(( "NULL policy" ));
97 0 : return NULL;
98 0 : }
99 :
100 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
101 0 : FD_LOG_WARNING(( "misaligned policy" ));
102 0 : return NULL;
103 0 : }
104 :
105 0 : return policy;
106 0 : }
107 :
108 : /* dedup_evict evicts the first element returned by the map iterator. */
109 :
110 : static void
111 0 : dedup_evict( fd_policy_t * policy ) {
112 0 : fd_policy_dedup_ele_t * ele = fd_policy_dedup_lru_ele_pop_head( policy->dedup.lru, policy->dedup.pool );
113 0 : fd_policy_dedup_map_ele_remove( policy->dedup.map, &ele->key, NULL, policy->dedup.pool );
114 0 : fd_policy_dedup_pool_ele_release( policy->dedup.pool, ele );
115 0 : }
116 :
117 : /* dedup_next returns 1 if key is deduped, 0 otherwise. */
118 : static int
119 0 : dedup_next( fd_policy_t * policy, ulong key, long now ) {
120 0 : fd_policy_dedup_t * dedup = &policy->dedup;
121 0 : fd_policy_dedup_ele_t * ele = fd_policy_dedup_map_ele_query( dedup->map, &key, NULL, dedup->pool );
122 0 : if( FD_UNLIKELY( !ele ) ) {
123 0 : if( FD_UNLIKELY( !fd_policy_dedup_pool_free( dedup->pool ) ) ) dedup_evict( policy );
124 0 : ele = fd_policy_dedup_pool_ele_acquire( dedup->pool );
125 0 : ele->key = key;
126 0 : ele->req_ts = 0;
127 0 : fd_policy_dedup_map_ele_insert ( dedup->map, ele, dedup->pool );
128 0 : fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
129 0 : }
130 0 : if( FD_LIKELY( now < ele->req_ts + (long)80e6 ) ) {
131 0 : return 1;
132 0 : }
133 0 : ele->req_ts = now;
134 0 : return 0;
135 0 : }
136 :
/* ts_ms converts wallclock to milliseconds by reinterpreting it as
   unsigned and dividing by 10^6 (truncating), i.e. wallclock is
   treated as a nanosecond count. */

static ulong
ts_ms( long wallclock ) {
  ulong const ns_per_ms = 1000000UL;
  return (ulong)wallclock / ns_per_ms;
}
140 :
141 : static int
142 0 : passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) {
143 0 : if( FD_UNLIKELY( ele->slot < policy->turbine_slot0 ) ) return 1;
144 : /* Essentially is checking if current duration of block ( from the
145 : first shred received until now ) is greater than the highest tick
146 : received + 200ms. */
147 0 : double current_duration = (double)(fd_tickcount() - ele->first_shred_ts) / fd_tempo_tick_per_ns(NULL);
148 0 : double tick_plus_buffer = (ele->est_buffered_tick_recv * MS_PER_TICK + DEFER_REPAIR_MS) * 1e6; // change to 400e6 for a slot duration policy
149 :
150 0 : if( current_duration >= tick_plus_buffer ){
151 0 : FD_MCNT_INC( REPAIR, EAGER_REPAIR_AGGRESSES, 1 );
152 0 : return 1;
153 0 : }
154 0 : return 0;
155 0 : }
156 :
157 : fd_pubkey_t const *
158 0 : fd_policy_peer_select( fd_policy_t * policy ) {
159 0 : fd_peer_dlist_t * dlist = policy->peers.dlist;
160 0 : fd_peer_t * pool = policy->peers.pool;
161 0 : if( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.iter, dlist, pool ) ) ) {
162 0 : policy->peers.iter = fd_peer_dlist_iter_fwd_init( dlist, pool );
163 0 : }
164 0 : fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.iter, dlist, pool );
165 0 : policy->peers.iter = fd_peer_dlist_iter_fwd_next( policy->peers.iter, dlist, pool );
166 0 : return &select->identity;
167 0 : }
168 :
169 : fd_repair_msg_t const *
170 0 : fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair, long now, ulong highest_known_slot ) {
171 0 : fd_forest_blk_t * pool = fd_forest_pool( forest );
172 0 : fd_forest_subtrees_t * subtrees = fd_forest_subtrees( forest );
173 :
174 0 : if( FD_UNLIKELY( forest->root == ULONG_MAX ) ) return NULL;
175 0 : if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;
176 :
177 0 : fd_repair_msg_t * out = NULL;
178 0 : ulong now_ms = ts_ms( now );
179 :
180 0 : if( FD_UNLIKELY( forest->subtree_cnt > 0 ) ) {
181 0 : for( fd_forest_subtrees_iter_t iter = fd_forest_subtrees_iter_init( subtrees, pool );
182 0 : !fd_forest_subtrees_iter_done( iter, subtrees, pool );
183 0 : iter = fd_forest_subtrees_iter_next( iter, subtrees, pool ) ) {
184 0 : fd_forest_blk_t * orphan = fd_forest_subtrees_iter_ele( iter, subtrees, pool );
185 0 : ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_ORPHAN, orphan->slot, UINT_MAX );
186 0 : if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
187 0 : out = fd_repair_orphan( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, orphan->slot );
188 0 : policy->nonce++;
189 0 : return out;
190 0 : }
191 0 : }
192 0 : }
193 :
194 : /* Every so often we'll need to reset the frontier iterator to the
195 : head of frontier, because we could end up traversing down a very
196 : long tree if we are far behind. */
197 :
198 0 : if( FD_UNLIKELY( now_ms - policy->tsreset > 100UL /* ms */ ||
199 0 : policy->iterf.frontier_ver != fd_fseq_query( fd_forest_ver_const( forest ) ) ) ) {
200 0 : fd_policy_reset( policy, forest );
201 0 : }
202 :
203 0 : fd_forest_blk_t * ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
204 0 : if( FD_UNLIKELY( !ele ) ) {
205 : // This happens when we are fully caught up i.e. we have all the shreds of every slot we know about.
206 0 : return NULL;
207 0 : }
208 :
209 : /* When we are at the head of the turbine, we should give turbine the
210 : chance to complete the shreds. Agave waits 200ms from the
211 : estimated "correct time" of the highest shred received to repair.
212 : i.e. if we've received the first 200 shreds, the 200th has a tick
213 : of x. Translate that to millis, and we should wait to request shred
214 : 201 until x + 200ms. If we have a hole, i.e. first 200 shreds
215 : receive except shred 100, and the 101th shred has a tick of y, we
216 : should wait until y + 200ms to request shred 100.
217 :
218 : At the start of the loop, the policy iterf is valid and requestable.
219 : At the end of the loop, the policy iterf has been advanced to the
220 : next valid requestable element. */
221 :
222 0 : int req_made = 0;
223 0 : while( !req_made ) {
224 0 : ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
225 :
226 0 : if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
227 : /* We are not ready to repair this slot yet. But it's possible we
228 : have another fork that we need to repair... so we just
229 : should skip to the next SLOT in the consumed iterator. The
230 : likelihood that this ele is the head of turbine is high, which
231 : means that the shred_idx of the iterf is likely to be UINT_MAX,
232 : which means calling fd_forest_iter_next will advance the iterf
233 : to the next slot. */
234 0 : policy->iterf.shred_idx = UINT_MAX; // heinous... i'm sorry
235 0 : policy->iterf = fd_forest_iter_next( policy->iterf, forest );
236 0 : if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
237 0 : policy->iterf = fd_forest_iter_init( forest );
238 0 : break;
239 0 : }
240 0 : continue;
241 0 : }
242 :
243 0 : if( FD_UNLIKELY( policy->iterf.shred_idx == UINT_MAX ) ) {
244 0 : ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_HIGHEST_SHRED, ele->slot, 0 );
245 0 : if( FD_UNLIKELY( ele->slot < highest_known_slot && !dedup_next( policy, key, now ) ) ) {
246 : // We'll never know the the highest shred for the current turbine slot, so there's no point in requesting it.
247 0 : out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
248 0 : policy->nonce++;
249 0 : req_made = 1;
250 0 : }
251 0 : } else {
252 0 : ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_SHRED, ele->slot, policy->iterf.shred_idx );
253 0 : if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
254 0 : out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, policy->iterf.shred_idx );
255 0 : policy->nonce++;
256 0 : if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
257 0 : req_made = 1;
258 0 : }
259 0 : }
260 :
261 : /* Even if we have a request ready, we need to advance the iterator.
262 : Otherwise on the next call of policy_next, we'll try to re-request the
263 : same shred and it will get deduped. */
264 :
265 0 : policy->iterf = fd_forest_iter_next( policy->iterf, forest );
266 0 : if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
267 0 : policy->iterf = fd_forest_iter_init( forest );
268 0 : break;
269 0 : }
270 0 : }
271 :
272 0 : if( FD_UNLIKELY( !req_made ) ) return NULL;
273 0 : return out;
274 0 : }
275 :
276 : fd_policy_peer_t const *
277 0 : fd_policy_peer_insert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_port_t const * addr ) {
278 0 : fd_policy_peer_t * peer_map = policy->peers.map;
279 0 : fd_policy_peer_t * peer = fd_policy_peer_map_query( peer_map, *key, NULL );
280 0 : if( FD_UNLIKELY( !peer && fd_policy_peer_map_key_cnt( peer_map ) < fd_policy_peer_map_key_max( peer_map ) ) ) {
281 0 : peer = fd_policy_peer_map_insert( policy->peers.map, *key );
282 0 : peer->key = *key;
283 0 : peer->ip4 = addr->addr;
284 0 : peer->port = addr->port;
285 0 : peer->req_cnt = 0;
286 0 : peer->res_cnt = 0;
287 0 : peer->first_req_ts = 0;
288 0 : peer->last_req_ts = 0;
289 0 : peer->first_resp_ts = 0;
290 0 : peer->last_resp_ts = 0;
291 0 : peer->total_lat = 0;
292 0 : peer->stake = 0;
293 :
294 0 : fd_peer_t * peer_ele = fd_peer_pool_ele_acquire( policy->peers.pool );
295 0 : peer->pool_idx = fd_peer_pool_idx( policy->peers.pool, peer_ele );
296 0 : peer_ele->identity = *key;
297 0 : fd_peer_dlist_ele_push_tail( policy->peers.dlist, peer_ele, policy->peers.pool );
298 0 : return peer;
299 0 : }
300 0 : return NULL;
301 0 : }
302 :
303 : fd_policy_peer_t *
304 0 : fd_policy_peer_query( fd_policy_t * policy, fd_pubkey_t const * key ) {
305 0 : return fd_policy_peer_map_query( policy->peers.map, *key, NULL );
306 0 : }
307 :
308 : int
309 0 : fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
310 0 : fd_policy_peer_t * peer = fd_policy_peer_map_query( policy->peers.map, *key, NULL );
311 0 : if( FD_UNLIKELY( !peer ) ) return 0;
312 0 : fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );
313 0 : fd_policy_peer_map_remove( policy->peers.map, peer );
314 :
315 0 : if( FD_UNLIKELY( policy->peers.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
316 : /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
317 0 : policy->peers.iter = fd_peer_dlist_iter_fwd_next( policy->peers.iter, policy->peers.dlist, policy->peers.pool );
318 0 : }
319 0 : fd_peer_dlist_ele_remove( policy->peers.dlist, peer_ele, policy->peers.pool );
320 0 : fd_peer_pool_ele_release( policy->peers.pool, peer_ele );
321 0 : return 1;
322 0 : }
323 :
324 : void
325 0 : fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) {
326 0 : fd_policy_peer_t * active = fd_policy_peer_query( policy, to );
327 0 : if( FD_LIKELY( active ) ) {
328 0 : active->req_cnt++;
329 0 : active->last_req_ts = fd_tickcount();
330 0 : if( FD_UNLIKELY( active->first_req_ts == 0 ) ) active->first_req_ts = active->last_req_ts;
331 0 : }
332 0 : }
333 :
334 : void
335 0 : fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt ) {
336 0 : fd_policy_peer_t * peer = fd_policy_peer_query( policy, to );
337 0 : if( FD_LIKELY( peer ) ) {
338 0 : long now = fd_tickcount();
339 0 : peer->res_cnt++;
340 0 : if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now;
341 0 : peer->last_resp_ts = now;
342 0 : peer->total_lat += rtt;
343 0 : }
344 0 : }
345 :
346 : void
347 0 : fd_policy_reset( fd_policy_t * policy, fd_forest_t * forest ) {
348 0 : policy->iterf = fd_forest_iter_init( forest );
349 0 : policy->tsreset = ts_ms( fd_log_wallclock() );
350 0 : }
351 :
352 : void
353 0 : fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
354 0 : policy->turbine_slot0 = slot;
355 0 : }
356 :
|