Line data Source code
1 : #include "fd_policy.h"
2 : #include "../../disco/metrics/fd_metrics.h"
3 :
/* NONCE_NULL marks an invalid / unset repair nonce (nonces are
   uint-sized; UINT_MAX is reserved).  NOTE(review): not referenced in
   this translation unit — presumably consumed by callers; confirm. */
#define NONCE_NULL (UINT_MAX)
/* Wait this many ms past a shred's estimated arrival before repairing
   it (Agave-style defer window). */
#define DEFER_REPAIR_MS (200UL)
/* Expected ticks per slot, and the derived wall-clock ms per tick
   assuming a 400 ms slot. */
#define TARGET_TICK_PER_SLOT (64.0)
#define MS_PER_TICK (400.0 / TARGET_TICK_PER_SLOT)
8 :
/* fd_policy_new formats a memory region for use as a repair policy.
   shmem must be aligned to fd_policy_align() and span
   fd_policy_footprint( dedup_max, peer_max ) bytes.  dedup_max sizes
   the request-dedup cache, peer_max sizes the peer set, and seed seeds
   the hash maps.  Returns shmem on success, NULL on NULL / misaligned
   shmem (warning logged).  The FD_SCRATCH_ALLOC layout below must stay
   in exact sync with fd_policy_footprint(); the FD_TEST at the end
   asserts that invariant. */
void *
fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) {

  if( FD_UNLIKELY( !shmem ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_policy_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned mem" ));
    return NULL;
  }

  ulong footprint = fd_policy_footprint( dedup_max, peer_max );
  fd_memset( shmem, 0, footprint );

  /* Peer map capacity is the next power of two >= peer_max. */
  int lg_peer_max = fd_ulong_find_msb( fd_ulong_pow2_up( peer_max ) );
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_policy_t * policy = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(), sizeof(fd_policy_t) );
  void * dedup_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_map_align(),  fd_policy_dedup_map_footprint ( dedup_max ) );
  void * dedup_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max ) );
  void * dedup_lru  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(),  fd_policy_dedup_lru_footprint() );
  void * peers      = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(),   fd_policy_peer_map_footprint ( lg_peer_max ) );
  void * peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_pool_align(),         fd_peer_pool_footprint ( peer_max ) );
  void * peers_fast = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(),        fd_peer_dlist_footprint() );
  void * peers_slow = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(),        fd_peer_dlist_footprint() );
  FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint );

  /* The shared pointers stored here are converted into local joins by
     fd_policy_join. */
  policy->dedup.map  = fd_policy_dedup_map_new ( dedup_map, dedup_max, seed );
  policy->dedup.pool = fd_policy_dedup_pool_new( dedup_pool, dedup_max );
  policy->dedup.lru  = fd_policy_dedup_lru_new ( dedup_lru );
  policy->peers.map  = fd_policy_peer_map_new  ( peers, lg_peer_max, seed );
  policy->peers.pool = fd_peer_pool_new        ( peers_pool, peer_max );
  policy->peers.fast = fd_peer_dlist_new       ( peers_fast );
  policy->peers.slow = fd_peer_dlist_new       ( peers_slow );
  policy->turbine_slot0 = ULONG_MAX; /* unknown until fd_policy_set_turbine_slot0 */
  policy->nonce = 1;                 /* nonce 0 never issued */

  return shmem;
}
49 :
/* fd_policy_join joins the caller to a policy region formatted by
   fd_policy_new.  The stored sub-structure pointers are converted to
   local joins in place, so the region must not already be joined in
   this address space.  The region must live inside a workspace.
   Returns a local handle on success, NULL on failure (warning
   logged). */
fd_policy_t *
fd_policy_join( void * shpolicy ) {
  fd_policy_t * policy = (fd_policy_t *)shpolicy;

  if( FD_UNLIKELY( !policy ) ) {
    FD_LOG_WARNING(( "NULL policy" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned policy" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( policy );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "policy must be part of a workspace" ));
    return NULL;
  }

  /* Replace each stored shmem pointer with its local join. */
  policy->dedup.map  = fd_policy_dedup_map_join ( policy->dedup.map );
  policy->dedup.pool = fd_policy_dedup_pool_join( policy->dedup.pool );
  policy->dedup.lru  = fd_policy_dedup_lru_join ( policy->dedup.lru );
  policy->peers.map  = fd_policy_peer_map_join  ( policy->peers.map );
  policy->peers.pool = fd_peer_pool_join        ( policy->peers.pool );
  policy->peers.fast = fd_peer_dlist_join       ( policy->peers.fast );
  policy->peers.slow = fd_peer_dlist_join       ( policy->peers.slow );

  /* Peer selection starts at stage 0, iterating the slow bucket. */
  policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( policy->peers.slow, policy->peers.pool );
  policy->peers.select.stage = 0;

  return policy;
}
83 :
84 : void *
85 0 : fd_policy_leave( fd_policy_t const * policy ) {
86 :
87 0 : if( FD_UNLIKELY( !policy ) ) {
88 0 : FD_LOG_WARNING(( "NULL policy" ));
89 0 : return NULL;
90 0 : }
91 :
92 0 : return (void *)policy;
93 0 : }
94 :
95 : void *
96 0 : fd_policy_delete( void * policy ) {
97 :
98 0 : if( FD_UNLIKELY( !policy ) ) {
99 0 : FD_LOG_WARNING(( "NULL policy" ));
100 0 : return NULL;
101 0 : }
102 :
103 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
104 0 : FD_LOG_WARNING(( "misaligned policy" ));
105 0 : return NULL;
106 0 : }
107 :
108 0 : return policy;
109 0 : }
110 :
/* dedup_evict evicts the least-recently-inserted dedup entry: pops the
   LRU list head, removes it from the map, and releases it back to the
   pool.  Caller must ensure the lru/pool are non-empty. */

static void
dedup_evict( fd_policy_t * policy ) {
  fd_policy_dedup_ele_t * ele = fd_policy_dedup_lru_ele_pop_head( policy->dedup.lru, policy->dedup.pool );
  fd_policy_dedup_map_ele_remove( policy->dedup.map, &ele->key, NULL, policy->dedup.pool );
  fd_policy_dedup_pool_ele_release( policy->dedup.pool, ele );
}
119 :
/* dedup_next returns 1 if the request identified by key was already
   issued within FD_POLICY_DEDUP_TIMEOUT of now (i.e. it is deduped),
   0 otherwise.  An unseen key is inserted with req_ts 0 so the first
   request passes; when 0 is returned the entry's req_ts is refreshed
   to now.  A full pool evicts the LRU entry to make room. */
static int
dedup_next( fd_policy_t * policy, ulong key, long now ) {
  fd_policy_dedup_t * dedup = &policy->dedup;
  fd_policy_dedup_ele_t * ele = fd_policy_dedup_map_ele_query( dedup->map, &key, NULL, dedup->pool );
  if( FD_UNLIKELY( !ele ) ) {
    /* Unseen key: acquire (evicting if needed), track in map + LRU. */
    if( FD_UNLIKELY( !fd_policy_dedup_pool_free( dedup->pool ) ) ) dedup_evict( policy );
    ele = fd_policy_dedup_pool_ele_acquire( dedup->pool );
    ele->key = key;
    ele->req_ts = 0;
    fd_policy_dedup_map_ele_insert ( dedup->map, ele, dedup->pool );
    fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
  }
  if( FD_LIKELY( now < ele->req_ts + (long)FD_POLICY_DEDUP_TIMEOUT ) ) {
    return 1; /* requested too recently */
  }
  ele->req_ts = now; /* timeout elapsed; allow and restamp */
  return 0;
}
139 :
140 0 : static ulong ts_ms( long wallclock ) {
141 0 : return (ulong)wallclock / (ulong)1e6;
142 0 : }
143 :
/* passes_throttle_threshold returns 1 if slot ele should be repaired
   now, 0 if turbine should still be given time to deliver it.
   Slots older than turbine_slot0 always pass.  Otherwise the check is:
   has the time since the block's first shred arrived exceeded the
   wall-clock position implied by the highest tick received, plus a
   DEFER_REPAIR_MS grace buffer (mirrors Agave's defer window). */
static int
passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) {
  if( FD_UNLIKELY( ele->slot < policy->turbine_slot0 ) ) return 1;
  /* current_duration is in ns: elapsed ticks / ticks-per-ns. */
  double current_duration = (double)(fd_tickcount() - ele->first_shred_ts) / fd_tempo_tick_per_ns(NULL);
  /* tick_plus_buffer is also ns: (ticks * ms-per-tick + buffer ms) * 1e6. */
  double tick_plus_buffer = (ele->est_buffered_tick_recv * MS_PER_TICK + DEFER_REPAIR_MS) * 1e6; // change to 400e6 for a slot duration policy

  if( current_duration >= tick_plus_buffer ){
    FD_MCNT_INC( REPAIR, EAGER_REPAIR_AGGRESSES, 1 );
    return 1;
  }
  return 0;
}
159 :
/* fd_policy_peer_select picks the next peer to send a repair request
   to, round-robining through the fast/slow latency dlists according to
   the bucket_stages schedule (declared elsewhere).  Selection state
   (iter + stage) persists across calls.  Returns the selected peer's
   identity, or NULL if no peers are registered.  NOTE(review): the
   while loop terminates only because pool_used > 0 implies at least
   one dlist is non-empty — assumes every pooled peer is linked into
   exactly one bucket; confirm. */
fd_pubkey_t const *
fd_policy_peer_select( fd_policy_t * policy ) {
  fd_peer_dlist_t * best_dlist  = policy->peers.fast;
  fd_peer_dlist_t * worst_dlist = policy->peers.slow;
  fd_peer_t * pool = policy->peers.pool;

  if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;

  fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;

  /* Current bucket exhausted: advance through stages until we find a
     bucket with remaining peers, restarting its iterator. */
  while( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) {
    policy->peers.select.stage = (policy->peers.select.stage + 1) % (sizeof(bucket_stages) / sizeof(uint));
    dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;
    policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( dlist, pool );
  }
  fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.select.iter, dlist, pool );
  policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool );
  return &select->identity;
}
179 :
/* fd_policy_next decides the next repair request to send, or NULL if
   there is nothing to do right now.  Priority order:
     1. an ORPHAN request for any subtree root not recently requested
        (per the dedup cache);
     2. a shred-level request driven by the forest iterators — the main
        tree iterator when it has work, else the orphan-tree iterator.
   now is a wallclock in ns; highest_known_slot gates highest-shred
   requests; *charge_busy is set to 1 iff meaningful work was done.
   The returned message (if any) consumed one nonce. */
fd_repair_msg_t const *
fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair, long now, ulong highest_known_slot, int * charge_busy ) {
  fd_forest_blk_t *      pool     = fd_forest_pool( forest );
  fd_forest_subtlist_t * subtlist = fd_forest_subtlist( forest );
  *charge_busy = 0;

  if( FD_UNLIKELY( forest->root == ULONG_MAX ) ) return NULL;                 /* no root yet */
  if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL; /* nobody to ask */

  fd_repair_msg_t * out = NULL;
  ulong now_ms = ts_ms( now );

  /* First, try to issue an orphan request for any known subtree root
     (skipping those requested within the dedup timeout). */
  for( fd_forest_subtlist_iter_t iter = fd_forest_subtlist_iter_fwd_init( subtlist, pool );
       !fd_forest_subtlist_iter_done ( iter, subtlist, pool );
       iter = fd_forest_subtlist_iter_fwd_next( iter, subtlist, pool ) ) {
    *charge_busy = 1;
    fd_forest_blk_t * orphan = fd_forest_subtlist_iter_ele( iter, subtlist, pool );
    ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_ORPHAN, orphan->slot, UINT_MAX );
    if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
      out = fd_repair_orphan( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, orphan->slot );
      policy->nonce++;
      return out;
    }
  }

  /* Select a slot to operate on 🔪. Advance either the orphan iter or
     regular iter. */
  fd_forest_iter_t * iter = NULL;
  if( FD_UNLIKELY( fd_forest_reqslist_is_empty( fd_forest_reqslist( forest ), fd_forest_reqspool( forest ) ) ) ) {
    /* If the main tree has nothing to iterate at the moment, we can
       request down the ORPHAN trees on slots we know about. */
    iter = &forest->orphiter;
  } else {
    iter = &forest->iter;
  }

  fd_forest_iter_next( iter, forest );
  if( FD_UNLIKELY( fd_forest_iter_done( iter, forest ) ) ) {
    // This happens when we have already requested all the shreds we know about.
    return NULL;
  }

  fd_forest_blk_t * ele = fd_forest_pool_ele( pool, iter->ele_idx );
  if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
    /* When we are at the head of the turbine, we should give turbine the
       chance to complete the shreds. Agave waits 200ms from the
       estimated "correct time" of the highest shred received to repair.
       i.e. if we've received the first 200 shreds, the 200th has a tick
       of x. Translate that to millis, and we should wait to request shred
       201 until x + 200ms. If we have a hole, i.e. first 200 shreds
       receive except shred 100, and the 101th shred has a tick of y, we
       should wait until y + 200ms to request shred 100.

       Here we did not pass the timeout threshold, so we are not ready
       to repair this slot yet. But it's possible we have another fork
       that we need to repair... so we just should skip to the next SLOT
       in the main tree iterator. The likelihood that this ele is the
       head of turbine is high, which means that the shred_idx of the
       iterf is likely to be UINT_MAX, which means calling
       fd_forest_iter_next will advance the iterf to the next slot. */
    iter->shred_idx = UINT_MAX;
    /* TODO: Heinous... but the easiest way to ensure this slot gets
       added back to the requests deque is if we set the shred_idx to
       UINT_MAX, but maybe there should be an explicit API for it. */

    return NULL;
  }

  *charge_busy = 1;

  if( FD_UNLIKELY( iter->shred_idx == UINT_MAX ) ) {
    /* Slot-level position: ask for the highest shred so we learn the
       slot's full extent — but only for slots behind the tip. */
    if( FD_UNLIKELY( ele->slot < highest_known_slot ) ) {
      // We'll never know the the highest shred for the current turbine slot, so there's no point in requesting it.
      out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
      policy->nonce++;
    }
  } else {
    /* Shred-level position: request the specific missing shred. */
    out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, iter->shred_idx );
    policy->nonce++;
    if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
  }
  return out;
}
263 :
/* fd_policy_peer_insert registers repair peer key with address addr.
   Returns the zeroed accounting entry for the new peer, or NULL if the
   key is already present or the map is at capacity.  New peers start
   in the slow latency bucket until responses prove otherwise.
   NOTE(review): the peer map capacity is a power of two >= peer_max
   while the pool holds exactly peer_max elements, so the map-capacity
   check could in principle pass while the pool is exhausted — confirm
   fd_peer_pool_ele_acquire cannot be reached with an empty pool. */
fd_policy_peer_t const *
fd_policy_peer_insert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_port_t const * addr ) {
  fd_policy_peer_t * peer_map = policy->peers.map;
  fd_policy_peer_t * peer = fd_policy_peer_map_query( peer_map, *key, NULL );
  if( FD_UNLIKELY( !peer && fd_policy_peer_map_key_cnt( peer_map ) < fd_policy_peer_map_key_max( peer_map ) ) ) {
    peer = fd_policy_peer_map_insert( policy->peers.map, *key );
    peer->key  = *key;
    peer->ip4  = addr->addr;
    peer->port = addr->port;
    peer->req_cnt = 0;
    peer->res_cnt = 0;
    peer->first_req_ts  = 0;
    peer->last_req_ts   = 0;
    peer->first_resp_ts = 0;
    peer->last_resp_ts  = 0;
    peer->total_lat = 0;
    peer->stake = 0;

    /* Link a selection element into the slow dlist and remember its
       pool index for later bucket moves / removal. */
    fd_peer_t * peer_ele = fd_peer_pool_ele_acquire( policy->peers.pool );
    peer->pool_idx = fd_peer_pool_idx( policy->peers.pool, peer_ele );
    peer_ele->identity = *key;
    fd_peer_dlist_ele_push_tail( policy->peers.slow, peer_ele, policy->peers.pool );
    return peer;
  }
  return NULL;
}
290 :
291 : fd_policy_peer_t *
292 0 : fd_policy_peer_query( fd_policy_t * policy, fd_pubkey_t const * key ) {
293 0 : if( FD_UNLIKELY( memcmp( key->key, null_pubkey.key, 32UL ) == 0 ) ) {
294 0 : FD_LOG_WARNING(( "Repair policy peer with null pubkey." ));
295 0 : return NULL;
296 0 : };
297 0 : return fd_policy_peer_map_query( policy->peers.map, *key, NULL );
298 0 : }
299 :
/* fd_policy_peer_remove deregisters peer key: unlinks its selection
   element from its latency bucket, releases it to the pool, and
   removes the accounting entry from the map.  Returns 1 if removed,
   0 if the key was not present.  Takes care to advance the persistent
   selection iterator if it currently points at the removed peer. */
int
fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
  fd_policy_peer_t * peer = fd_policy_peer_map_query( policy->peers.map, *key, NULL );
  if( FD_UNLIKELY( !peer ) ) return 0;
  fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );

  if( FD_UNLIKELY( policy->peers.select.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
    /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
    fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
    policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, policy->peers.pool );
  }

  /* The peer's current bucket is derived from its latency stats. */
  fd_peer_dlist_t * bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
  fd_peer_dlist_ele_remove( bucket, peer_ele, policy->peers.pool );
  fd_peer_pool_ele_release( policy->peers.pool, peer_ele );

  fd_policy_peer_map_remove( policy->peers.map, peer );
  return 1;
}
319 :
320 : void
321 0 : fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) {
322 0 : fd_policy_peer_t * active = fd_policy_peer_query( policy, to );
323 0 : if( FD_LIKELY( active ) ) {
324 0 : active->req_cnt++;
325 0 : active->last_req_ts = fd_tickcount();
326 0 : if( FD_UNLIKELY( active->first_req_ts == 0 ) ) active->first_req_ts = active->last_req_ts;
327 0 : }
328 0 : }
329 :
/* fd_policy_peer_response_update records a response from peer `to`
   with round-trip time rtt (ns): bumps response counters and
   timestamps, folds rtt into the running latency total, and migrates
   the peer's selection element between latency buckets if its average
   latency crossed the fast/slow threshold.  Unknown peers are
   ignored. */
void
fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt /* ns */ ) {
  fd_policy_peer_t * peer = fd_policy_peer_query( policy, to );
  if( FD_LIKELY( peer ) ) {
    long now = fd_tickcount();
    /* Snapshot the bucket implied by the PRE-update stats before
       mutating them, so we know where the element currently lives. */
    fd_peer_dlist_t * prev_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
    peer->res_cnt++;
    if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now;
    peer->last_resp_ts = now;
    peer->total_lat += rtt;
    fd_peer_dlist_t * new_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );

    if( prev_bucket != new_bucket ) {
      /* Average latency crossed the threshold: move the element. */
      fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );
      fd_peer_dlist_ele_remove   ( prev_bucket, peer_ele, policy->peers.pool );
      fd_peer_dlist_ele_push_tail( new_bucket,  peer_ele, policy->peers.pool );
    }
  }
}
349 :
/* fd_policy_set_turbine_slot0 records the first slot observed via
   turbine.  Slots before it bypass the repair throttle in
   passes_throttle_threshold. */
void
fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
  policy->turbine_slot0 = slot;
}
354 :
|