Line data Source code
1 : #include "fd_policy.h"
2 : #include "../../disco/metrics/fd_metrics.h"
3 :
4 : #define NONCE_NULL (UINT_MAX)
5 0 : #define DEFER_REPAIR_MS (200UL)
6 0 : #define TARGET_TICK_PER_SLOT (64.0)
7 0 : #define MS_PER_TICK (400.0 / TARGET_TICK_PER_SLOT)
8 :
void *
fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) {

  /* Format shmem as a repair policy object.  shmem must be aligned to
     fd_policy_align() and hold fd_policy_footprint( dedup_max,
     peer_max ) bytes.  dedup_max bounds the request-dedup cache,
     peer_max bounds the peer set, and seed seeds the dedup map hash.
     Returns shmem on success, NULL (with warning logged) on NULL or
     misaligned shmem. */

  if( FD_UNLIKELY( !shmem ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_policy_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned mem" ));
    return NULL;
  }

  ulong footprint = fd_policy_footprint( dedup_max, peer_max );
  fd_memset( shmem, 0, footprint );

  /* Carve the region into the policy header followed by each
     sub-structure.  This layout must mirror fd_policy_footprint; the
     FD_TEST below asserts the carve consumed exactly the advertised
     footprint. */

  int lg_peer_max = fd_ulong_find_msb( fd_ulong_pow2_up( peer_max ) );
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_policy_t * policy = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(), sizeof(fd_policy_t) );
  void * dedup_map = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_map_align(), fd_policy_dedup_map_footprint ( dedup_max ) );
  void * dedup_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max ) );
  void * dedup_lru = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(), fd_policy_dedup_lru_footprint() );
  void * peers = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(), fd_policy_peer_map_footprint ( lg_peer_max ) );
  void * peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_pool_align(), fd_peer_pool_footprint ( peer_max ) );
  void * peers_fast = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint() );
  void * peers_slow = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(), fd_peer_dlist_footprint() );
  FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint );

  /* Format each sub-structure in place.  Sentinel initial values:
     iterf/turbine_slot0 are "unset" (ULONG_MAX), nonce starts at 1 so
     0 can be distinguished from a live nonce. */

  policy->dedup.map = fd_policy_dedup_map_new ( dedup_map, dedup_max, seed );
  policy->dedup.pool = fd_policy_dedup_pool_new( dedup_pool, dedup_max );
  policy->dedup.lru = fd_policy_dedup_lru_new ( dedup_lru );
  policy->peers.map = fd_policy_peer_map_new ( peers, lg_peer_max );
  policy->peers.pool = fd_peer_pool_new ( peers_pool, peer_max );
  policy->peers.fast = fd_peer_dlist_new ( peers_fast );
  policy->peers.slow = fd_peer_dlist_new ( peers_slow );
  policy->iterf.ele_idx = ULONG_MAX;
  policy->turbine_slot0 = ULONG_MAX;
  policy->tsreset = 0;
  policy->nonce = 1;

  return shmem;
}
51 :
fd_policy_t *
fd_policy_join( void * shpolicy ) {

  /* Join a policy region previously formatted by fd_policy_new.
     Converts the stored sub-structure handles into local joins and
     primes the peer-selection iterator.  Returns the local join on
     success, NULL (with warning logged) on NULL / misaligned input or
     if the region is not inside a workspace. */

  fd_policy_t * policy = (fd_policy_t *)shpolicy;

  if( FD_UNLIKELY( !policy ) ) {
    FD_LOG_WARNING(( "NULL policy" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned policy" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( policy );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "policy must be part of a workspace" ));
    return NULL;
  }

  policy->dedup.map = fd_policy_dedup_map_join ( policy->dedup.map );
  policy->dedup.pool = fd_policy_dedup_pool_join( policy->dedup.pool );
  policy->dedup.lru = fd_policy_dedup_lru_join ( policy->dedup.lru );
  policy->peers.map = fd_policy_peer_map_join ( policy->peers.map );
  policy->peers.pool = fd_peer_pool_join ( policy->peers.pool );
  policy->peers.fast = fd_peer_dlist_join ( policy->peers.fast );
  policy->peers.slow = fd_peer_dlist_join ( policy->peers.slow );

  /* Start peer selection at stage 0 with the iterator over the slow
     list.  NOTE(review): fd_policy_peer_select derives the dlist for a
     stage via bucket_stages[stage]; this init assumes bucket_stages[0]
     maps to the slow list — confirm against the bucket_stages
     definition. */
  policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( policy->peers.slow, policy->peers.pool );
  policy->peers.select.stage = 0;

  return policy;
}
85 :
86 : void *
87 0 : fd_policy_leave( fd_policy_t const * policy ) {
88 :
89 0 : if( FD_UNLIKELY( !policy ) ) {
90 0 : FD_LOG_WARNING(( "NULL policy" ));
91 0 : return NULL;
92 0 : }
93 :
94 0 : return (void *)policy;
95 0 : }
96 :
97 : void *
98 0 : fd_policy_delete( void * policy ) {
99 :
100 0 : if( FD_UNLIKELY( !policy ) ) {
101 0 : FD_LOG_WARNING(( "NULL policy" ));
102 0 : return NULL;
103 0 : }
104 :
105 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
106 0 : FD_LOG_WARNING(( "misaligned policy" ));
107 0 : return NULL;
108 0 : }
109 :
110 0 : return policy;
111 0 : }
112 :
113 : /* dedup_evict evicts the first element returned by the map iterator. */
114 :
115 : static void
116 0 : dedup_evict( fd_policy_t * policy ) {
117 0 : fd_policy_dedup_ele_t * ele = fd_policy_dedup_lru_ele_pop_head( policy->dedup.lru, policy->dedup.pool );
118 0 : fd_policy_dedup_map_ele_remove( policy->dedup.map, &ele->key, NULL, policy->dedup.pool );
119 0 : fd_policy_dedup_pool_ele_release( policy->dedup.pool, ele );
120 0 : }
121 :
122 : /* dedup_next returns 1 if key is deduped, 0 otherwise. */
123 : static int
124 0 : dedup_next( fd_policy_t * policy, ulong key, long now ) {
125 0 : fd_policy_dedup_t * dedup = &policy->dedup;
126 0 : fd_policy_dedup_ele_t * ele = fd_policy_dedup_map_ele_query( dedup->map, &key, NULL, dedup->pool );
127 0 : if( FD_UNLIKELY( !ele ) ) {
128 0 : if( FD_UNLIKELY( !fd_policy_dedup_pool_free( dedup->pool ) ) ) dedup_evict( policy );
129 0 : ele = fd_policy_dedup_pool_ele_acquire( dedup->pool );
130 0 : ele->key = key;
131 0 : ele->req_ts = 0;
132 0 : fd_policy_dedup_map_ele_insert ( dedup->map, ele, dedup->pool );
133 0 : fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
134 0 : }
135 0 : if( FD_LIKELY( now < ele->req_ts + (long)FD_POLICY_DEDUP_TIMEOUT ) ) {
136 0 : return 1;
137 0 : }
138 0 : ele->req_ts = now;
139 0 : return 0;
140 0 : }
141 :
142 0 : static ulong ts_ms( long wallclock ) {
143 0 : return (ulong)wallclock / (ulong)1e6;
144 0 : }
145 :
146 : static int
147 0 : passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) {
148 0 : if( FD_UNLIKELY( ele->slot < policy->turbine_slot0 ) ) return 1;
149 : /* Essentially is checking if current duration of block ( from the
150 : first shred received until now ) is greater than the highest tick
151 : received + 200ms. */
152 0 : double current_duration = (double)(fd_tickcount() - ele->first_shred_ts) / fd_tempo_tick_per_ns(NULL);
153 0 : double tick_plus_buffer = (ele->est_buffered_tick_recv * MS_PER_TICK + DEFER_REPAIR_MS) * 1e6; // change to 400e6 for a slot duration policy
154 :
155 0 : if( current_duration >= tick_plus_buffer ){
156 0 : FD_MCNT_INC( REPAIR, EAGER_REPAIR_AGGRESSES, 1 );
157 0 : return 1;
158 0 : }
159 0 : return 0;
160 0 : }
161 :
/* fd_policy_peer_select returns the identity pubkey of the next peer
   to send a repair request to, round-robining through the
   latency-bucketed peer lists.  bucket_stages (declared elsewhere)
   defines the stage schedule: each stage walks one full pass of either
   the fast or the slow list, then advances to the next stage.  The
   iterator/stage cursor persists in policy->peers.select across calls.

   NOTE(review): the stage-advance loop terminates only if at least one
   peer exists in some list; callers (see fd_policy_next) gate on
   fd_peer_pool_used( pool ) != 0. */

fd_pubkey_t const *
fd_policy_peer_select( fd_policy_t * policy ) {
  fd_peer_dlist_t * best_dlist = policy->peers.fast;
  fd_peer_dlist_t * worst_dlist = policy->peers.slow;
  fd_peer_t * pool = policy->peers.pool;

  /* Resolve the dlist for the current stage. */
  fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;

  /* If the current stage's pass is exhausted, advance stages (wrapping
     around bucket_stages) until we land on a non-empty list. */
  while( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) {
    policy->peers.select.stage = (policy->peers.select.stage + 1) % (sizeof(bucket_stages) / sizeof(uint));
    dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;
    policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( dlist, pool );
  }
  fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.select.iter, dlist, pool );
  /* Advance past the selected peer so the next call picks a new one. */
  policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool );
  return &select->identity;
}
179 :
/* fd_policy_next drives the repair request policy.  Returns the next
   repair message to send (orphan, highest_shred, or shred request), or
   NULL if nothing should be requested right now.  now is a wallclock
   timestamp in ns.  highest_known_slot caps highest_shred requests:
   we can never learn the highest shred of the slot still arriving via
   turbine.  Request priority: orphaned subtrees first, then missing
   shreds along the frontier iterator. */

fd_repair_msg_t const *
fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair, long now, ulong highest_known_slot ) {
  fd_forest_blk_t * pool = fd_forest_pool( forest );
  fd_forest_subtrees_t * subtrees = fd_forest_subtrees( forest );

  /* Nothing to do before the forest is rooted or before any repair
     peers are known. */
  if( FD_UNLIKELY( forest->root == ULONG_MAX ) ) return NULL;
  if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;

  fd_repair_msg_t * out = NULL;
  ulong now_ms = ts_ms( now );

  /* Orphaned subtrees take priority: request their ancestry so they
     can be connected back to the main tree.  Each orphan request is
     deduped on (ORPHAN, slot). */
  if( FD_UNLIKELY( forest->subtree_cnt > 0 ) ) {
    for( fd_forest_subtrees_iter_t iter = fd_forest_subtrees_iter_init( subtrees, pool );
         !fd_forest_subtrees_iter_done( iter, subtrees, pool );
         iter = fd_forest_subtrees_iter_next( iter, subtrees, pool ) ) {
      fd_forest_blk_t * orphan = fd_forest_subtrees_iter_ele( iter, subtrees, pool );
      ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_ORPHAN, orphan->slot, UINT_MAX );
      if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
        out = fd_repair_orphan( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, orphan->slot );
        policy->nonce++;
        return out;
      }
    }
  }

  /* Every so often we'll need to reset the frontier iterator to the
     head of frontier, because we could end up traversing down a very
     long tree if we are far behind.  Also reset if the forest has been
     structurally modified since the iterator was created (version
     mismatch). */

  if( FD_UNLIKELY( now_ms - policy->tsreset > 100UL /* ms */ ||
                   policy->iterf.frontier_ver != fd_fseq_query( fd_forest_ver_const( forest ) ) ) ) {
    fd_policy_reset( policy, forest );
  }

  fd_forest_blk_t * ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
  if( FD_UNLIKELY( !ele ) ) {
    // This happens when we are fully caught up i.e. we have all the shreds of every slot we know about.
    return NULL;
  }

  /* When we are at the head of the turbine, we should give turbine the
     chance to complete the shreds. Agave waits 200ms from the
     estimated "correct time" of the highest shred received to repair.
     i.e. if we've received the first 200 shreds, the 200th has a tick
     of x. Translate that to millis, and we should wait to request shred
     201 until x + 200ms. If we have a hole, i.e. first 200 shreds
     receive except shred 100, and the 101th shred has a tick of y, we
     should wait until y + 200ms to request shred 100.

     At the start of the loop, the policy iterf is valid and requestable.
     At the end of the loop, the policy iterf has been advanced to the
     next valid requestable element. */

  int req_made = 0;
  while( !req_made ) {
    ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );

    if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
      /* We are not ready to repair this slot yet. But it's possible we
         have another fork that we need to repair... so we just
         should skip to the next SLOT in the consumed iterator. The
         likelihood that this ele is the head of turbine is high, which
         means that the shred_idx of the iterf is likely to be UINT_MAX,
         which means calling fd_forest_iter_next will advance the iterf
         to the next slot. */
      policy->iterf.shred_idx = UINT_MAX; // heinous... i'm sorry
      policy->iterf = fd_forest_iter_next( policy->iterf, forest );
      if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
        policy->iterf = fd_forest_iter_init( forest );
        break;
      }
      continue;
    }

    /* shred_idx == UINT_MAX means the iterator is positioned on a slot
       boundary: ask for the slot's highest shred so we learn how many
       shreds to expect.  Otherwise request the specific missing
       shred. */
    if( FD_UNLIKELY( policy->iterf.shred_idx == UINT_MAX ) ) {
      ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_HIGHEST_SHRED, ele->slot, 0 );
      if( FD_UNLIKELY( ele->slot < highest_known_slot && !dedup_next( policy, key, now ) ) ) {
        // We'll never know the highest shred for the current turbine slot, so there's no point in requesting it.
        out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
        policy->nonce++;
        req_made = 1;
      }
    } else {
      ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_SHRED, ele->slot, policy->iterf.shred_idx );
      if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
        out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, policy->iterf.shred_idx );
        policy->nonce++;
        if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
        req_made = 1;
      }
    }

    /* Even if we have a request ready, we need to advance the iterator.
       Otherwise on the next call of policy_next, we'll try to re-request the
       same shred and it will get deduped. */

    policy->iterf = fd_forest_iter_next( policy->iterf, forest );
    if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
      policy->iterf = fd_forest_iter_init( forest );
      break;
    }
  }

  if( FD_UNLIKELY( !req_made ) ) return NULL;
  return out;
}
286 :
/* fd_policy_peer_insert registers a new repair peer identified by key
   reachable at addr.  Returns the newly created peer entry, or NULL if
   the peer already exists or the peer map is full.  New peers start in
   the slow latency list until responses prove otherwise. */

fd_policy_peer_t const *
fd_policy_peer_insert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_port_t const * addr ) {
  fd_policy_peer_t * peer_map = policy->peers.map;
  fd_policy_peer_t * peer = fd_policy_peer_map_query( peer_map, *key, NULL );
  if( FD_UNLIKELY( !peer && fd_policy_peer_map_key_cnt( peer_map ) < fd_policy_peer_map_key_max( peer_map ) ) ) {
    peer = fd_policy_peer_map_insert( policy->peers.map, *key );
    peer->key = *key;
    peer->ip4 = addr->addr;
    peer->port = addr->port;
    peer->req_cnt = 0;
    peer->res_cnt = 0;
    peer->first_req_ts = 0;
    peer->last_req_ts = 0;
    peer->first_resp_ts = 0;
    peer->last_resp_ts = 0;
    peer->total_lat = 0;
    peer->stake = 0;

    /* NOTE(review): pool ele is acquired without checking
       fd_peer_pool_free — assumes peer pool capacity >= peer map
       capacity (both sized from peer_max in fd_policy_new); confirm
       the map's key_max never exceeds the pool size. */
    fd_peer_t * peer_ele = fd_peer_pool_ele_acquire( policy->peers.pool );
    peer->pool_idx = fd_peer_pool_idx( policy->peers.pool, peer_ele );
    peer_ele->identity = *key;
    fd_peer_dlist_ele_push_tail( policy->peers.slow, peer_ele, policy->peers.pool );
    return peer;
  }
  return NULL;
}
313 :
/* fd_policy_peer_query returns the tracked peer entry for key, or NULL
   if key is not in the peer map. */
fd_policy_peer_t *
fd_policy_peer_query( fd_policy_t * policy, fd_pubkey_t const * key ) {
  return fd_policy_peer_map_query( policy->peers.map, *key, NULL );
}
318 :
319 : int
320 0 : fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
321 0 : fd_policy_peer_t * peer = fd_policy_peer_map_query( policy->peers.map, *key, NULL );
322 0 : if( FD_UNLIKELY( !peer ) ) return 0;
323 0 : fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );
324 0 : fd_policy_peer_map_remove( policy->peers.map, peer );
325 :
326 0 : if( FD_UNLIKELY( policy->peers.select.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
327 : /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
328 0 : fd_peer_dlist_t * dlist = policy->peers.select.stage == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
329 0 : policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, policy->peers.pool );
330 0 : }
331 :
332 0 : fd_peer_dlist_t * bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
333 0 : fd_peer_dlist_ele_remove( bucket, peer_ele, policy->peers.pool );
334 0 : fd_peer_pool_ele_release( policy->peers.pool, peer_ele );
335 0 : return 1;
336 0 : }
337 :
338 : void
339 0 : fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) {
340 0 : fd_policy_peer_t * active = fd_policy_peer_query( policy, to );
341 0 : if( FD_LIKELY( active ) ) {
342 0 : active->req_cnt++;
343 0 : active->last_req_ts = fd_tickcount();
344 0 : if( FD_UNLIKELY( active->first_req_ts == 0 ) ) active->first_req_ts = active->last_req_ts;
345 0 : }
346 0 : }
347 :
348 : void
349 0 : fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt /* ns */ ) {
350 0 : fd_policy_peer_t * peer = fd_policy_peer_query( policy, to );
351 0 : if( FD_LIKELY( peer ) ) {
352 0 : long now = fd_tickcount();
353 0 : fd_peer_dlist_t * prev_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
354 0 : peer->res_cnt++;
355 0 : if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now;
356 0 : peer->last_resp_ts = now;
357 0 : peer->total_lat += rtt;
358 0 : fd_peer_dlist_t * new_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
359 :
360 0 : if( prev_bucket != new_bucket ) {
361 0 : fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );
362 0 : fd_peer_dlist_ele_remove ( prev_bucket, peer_ele, policy->peers.pool );
363 0 : fd_peer_dlist_ele_push_tail( new_bucket, peer_ele, policy->peers.pool );
364 0 : }
365 0 : }
366 0 : }
367 :
/* fd_policy_reset rewinds the frontier iterator to the head of the
   forest frontier and stamps the reset time (wallclock ms), used by
   fd_policy_next to bound how long one iterator pass may run. */
void
fd_policy_reset( fd_policy_t * policy, fd_forest_t * forest ) {
  policy->iterf = fd_forest_iter_init( forest );
  policy->tsreset = ts_ms( fd_log_wallclock() );
}
373 :
/* fd_policy_set_turbine_slot0 records the first slot received via
   turbine.  Slots before it bypass the repair throttle (see
   passes_throttle_threshold). */
void
fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
  policy->turbine_slot0 = slot;
}
378 :
|