Line data Source code
1 : #include "fd_sspeer_selector.h"
2 :
3 : struct fd_sspeer_private {
4 : fd_ip4_port_t addr;
5 : ulong full_slot;
6 : ulong incr_slot;
7 : ulong latency;
8 : ulong score;
9 : int valid;
10 :
11 : struct {
12 : ulong next;
13 : } pool;
14 :
15 : struct {
16 : ulong next;
17 : ulong prev;
18 : } map;
19 :
20 : struct {
21 : ulong parent;
22 : ulong left;
23 : ulong right;
24 : ulong prio;
25 : } score_treap;
26 : };
27 :
28 : typedef struct fd_sspeer_private fd_sspeer_private_t;
29 :
30 : #define POOL_NAME peer_pool
31 9 : #define POOL_T fd_sspeer_private_t
32 : #define POOL_IDX_T ulong
33 196674 : #define POOL_NEXT pool.next
34 : #include "../../../util/tmpl/fd_pool.c"
35 :
36 : #define MAP_NAME peer_map
37 45 : #define MAP_KEY addr
38 0 : #define MAP_ELE_T fd_sspeer_private_t
39 : #define MAP_KEY_T fd_ip4_port_t
40 105 : #define MAP_PREV map.prev
41 204 : #define MAP_NEXT map.next
42 150 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
43 90 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
44 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
45 : #include "../../../util/tmpl/fd_map_chain.c"
46 :
47 72 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )
48 :
49 : #define TREAP_T fd_sspeer_private_t
50 : #define TREAP_NAME score_treap
51 : #define TREAP_QUERY_T void * /* We don't use query ... */
52 : #define TREAP_CMP(a,b) (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
53 : implementation to cmp either */
54 207 : #define TREAP_IDX_T ulong
55 72 : #define TREAP_LT COMPARE_WORSE
56 183 : #define TREAP_PARENT score_treap.parent
57 222 : #define TREAP_LEFT score_treap.left
58 99 : #define TREAP_RIGHT score_treap.right
59 87 : #define TREAP_PRIO score_treap.prio
60 : #include "../../../util/tmpl/fd_treap.c"
61 :
/* Values substituted by fd_sspeer_selector_score when a peer's slot
   or latency is unknown (ULONG_MAX).  Both are unsigned so they match
   the ulong arithmetic they take part in. */

#define DEFAULT_SLOTS_BEHIND (1000UL*1000UL)        /* 1,000,000 slots behind */
#define DEFAULT_PEER_LATENCY (100UL*1000UL*1000UL)  /* 100ms */

/* Set to 1 to verify the score treap after every rescore */
#define FD_SSPEER_SELECTOR_DEBUG 0
66 :
67 : struct fd_sspeer_selector_private {
68 : fd_sspeer_private_t * pool;
69 : peer_map_t * map;
70 : score_treap_t * score_treap;
71 : score_treap_t * shadow_score_treap;
72 : ulong * peer_idx_list;
73 : fd_sscluster_slot_t cluster_slot;
74 : int incremental_snapshot_fetch;
75 :
76 : ulong magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
77 : };
78 :
79 : FD_FN_CONST ulong
80 21 : fd_sspeer_selector_align( void ) {
81 21 : return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(), fd_ulong_max( score_treap_align(), alignof(ulong) ) ) );
82 21 : }
83 :
84 : FD_FN_CONST ulong
85 3 : fd_sspeer_selector_footprint( ulong max_peers ) {
86 3 : ulong l;
87 3 : l = FD_LAYOUT_INIT;
88 3 : l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
89 3 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
90 3 : l = FD_LAYOUT_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) ) );
91 3 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
92 3 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
93 3 : l = FD_LAYOUT_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
94 3 : return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
95 3 : }
96 :
97 : void *
98 : fd_sspeer_selector_new( void * shmem,
99 : ulong max_peers,
100 : int incremental_snapshot_fetch,
101 3 : ulong seed ) {
102 3 : if( FD_UNLIKELY( !shmem ) ) {
103 0 : FD_LOG_WARNING(( "NULL shmem" ));
104 0 : return NULL;
105 0 : }
106 :
107 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
108 0 : FD_LOG_WARNING(( "unaligned shmem" ));
109 0 : return NULL;
110 0 : }
111 :
112 3 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
113 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
114 0 : return NULL;
115 0 : }
116 :
117 3 : FD_SCRATCH_ALLOC_INIT( l, shmem );
118 3 : fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
119 3 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
120 3 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) ) );
121 3 : void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
122 3 : void * _shadow_score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
123 3 : void * _peer_idx_list = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
124 :
125 0 : selector->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
126 3 : selector->map = peer_map_join( peer_map_new( _map, peer_map_chain_cnt_est( 2UL*max_peers ), seed ) );
127 3 : selector->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) );
128 3 : selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
129 3 : selector->peer_idx_list = (ulong *)_peer_idx_list;
130 :
131 3 : selector->cluster_slot.full = 0UL;
132 3 : selector->cluster_slot.incremental = 0UL;
133 3 : selector->incremental_snapshot_fetch = incremental_snapshot_fetch;
134 :
135 3 : FD_COMPILER_MFENCE();
136 3 : FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
137 3 : FD_COMPILER_MFENCE();
138 :
139 3 : return (void *)selector;
140 3 : }
141 :
142 : fd_sspeer_selector_t *
143 3 : fd_sspeer_selector_join( void * shselector ) {
144 3 : if( FD_UNLIKELY( !shselector ) ) {
145 0 : FD_LOG_WARNING(( "NULL shselector" ));
146 0 : return NULL;
147 0 : }
148 :
149 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
150 0 : FD_LOG_WARNING(( "misaligned shselector" ));
151 0 : return NULL;
152 0 : }
153 :
154 3 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
155 :
156 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
157 0 : FD_LOG_WARNING(( "bad magic" ));
158 0 : return NULL;
159 0 : }
160 :
161 3 : return selector;
162 3 : }
163 :
164 : void *
165 3 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
166 3 : if( FD_UNLIKELY( !selector ) ) {
167 0 : FD_LOG_WARNING(( "NULL selector" ));
168 0 : return NULL;
169 0 : }
170 :
171 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
172 0 : FD_LOG_WARNING(( "misaligned selector" ));
173 0 : return NULL;
174 0 : }
175 :
176 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
177 0 : FD_LOG_WARNING(( "bad magic" ));
178 0 : return NULL;
179 0 : }
180 :
181 3 : selector->pool = peer_pool_leave( selector->pool );
182 3 : selector->map = peer_map_leave( selector->map );
183 3 : selector->score_treap = score_treap_leave( selector->score_treap );
184 3 : selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
185 :
186 3 : return (void *)selector;
187 3 : }
188 :
189 : void *
190 3 : fd_sspeer_selector_delete( void * shselector ) {
191 3 : if( FD_UNLIKELY( !shselector ) ) {
192 0 : FD_LOG_WARNING(( "NULL shselector" ));
193 0 : return NULL;
194 0 : }
195 :
196 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
197 0 : FD_LOG_WARNING(( "misaligned shselector" ));
198 0 : return NULL;
199 0 : }
200 :
201 3 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
202 :
203 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
204 0 : FD_LOG_WARNING(( "bad magic" ));
205 0 : return NULL;
206 0 : }
207 :
208 3 : selector->pool = peer_pool_delete( selector->pool );
209 3 : selector->map = peer_map_delete( selector->map );
210 3 : selector->score_treap = score_treap_delete( selector->score_treap );
211 3 : selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
212 :
213 3 : FD_COMPILER_MFENCE();
214 3 : FD_VOLATILE( selector->magic ) = 0UL;
215 3 : FD_COMPILER_MFENCE();
216 :
217 3 : return (void *)selector;
218 3 : }
219 :
220 : /* Calculates a score for a peer given its latency and its resolved
221 : full and incremental slots */
222 : ulong
223 : fd_sspeer_selector_score( fd_sspeer_selector_t * selector,
224 : ulong peer_latency,
225 : ulong full_slot,
226 45 : ulong incr_slot ) {
227 45 : static const ulong slots_behind_penalty = 1000UL;
228 45 : ulong slot = ULONG_MAX;
229 45 : ulong slots_behind = DEFAULT_SLOTS_BEHIND;
230 45 : peer_latency = peer_latency!=ULONG_MAX ? peer_latency : DEFAULT_PEER_LATENCY;
231 :
232 45 : if( FD_LIKELY( full_slot!=ULONG_MAX ) ) {
233 39 : if( FD_UNLIKELY( incr_slot==ULONG_MAX ) ) {
234 0 : slot = full_slot;
235 0 : slots_behind = selector->cluster_slot.full>slot ? selector->cluster_slot.full - slot : 0UL;
236 39 : } else {
237 39 : slot = incr_slot;
238 39 : slots_behind = selector->cluster_slot.incremental>slot ? selector->cluster_slot.incremental - slot : 0UL;
239 39 : }
240 39 : }
241 :
242 : /* TODO: come up with a better/more dynamic score function */
243 45 : return peer_latency + slots_behind_penalty*slots_behind;
244 45 : }
245 :
246 : /* Updates a peer's score with new values for latency and/or resolved
247 : full/incremental slots */
248 : static void
249 : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
250 : fd_sspeer_private_t * peer,
251 : ulong latency,
252 : ulong full_slot,
253 0 : ulong incr_slot ) {
254 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
255 :
256 0 : ulong peer_latency = latency!=ULONG_MAX ? latency : peer->latency;
257 0 : peer->score = fd_sspeer_selector_score( selector, peer_latency, full_slot, incr_slot );
258 :
259 0 : peer->full_slot = full_slot!=ULONG_MAX ? full_slot : peer->full_slot;
260 0 : peer->incr_slot = incr_slot!=ULONG_MAX ? incr_slot : peer->incr_slot;
261 :
262 0 : if( FD_LIKELY( latency!=ULONG_MAX ) ) {
263 0 : peer->latency = latency;
264 0 : }
265 :
266 0 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
267 0 : }
268 :
269 : ulong
270 : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
271 : fd_ip4_port_t addr,
272 : ulong latency,
273 : ulong full_slot,
274 21 : ulong incr_slot ) {
275 21 : fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
276 21 : if( FD_LIKELY( peer ) ) {
277 0 : fd_sspeer_selector_update( selector, peer, latency, full_slot, incr_slot );
278 21 : } else {
279 21 : if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) return ULONG_MAX;
280 :
281 21 : peer = peer_pool_ele_acquire( selector->pool );
282 21 : peer->addr = addr;
283 21 : peer->latency = latency;
284 21 : peer->score = fd_sspeer_selector_score( selector, latency, full_slot, incr_slot );
285 21 : peer->full_slot = full_slot;
286 21 : peer->incr_slot = incr_slot;
287 21 : peer_map_ele_insert( selector->map, peer, selector->pool );
288 21 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
289 21 : }
290 21 : peer->valid = peer->latency!=ULONG_MAX && peer->full_slot!=ULONG_MAX;
291 21 : return peer->score;
292 21 : }
293 :
294 : void
295 : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
296 0 : fd_ip4_port_t addr ) {
297 0 : fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
298 0 : if( FD_UNLIKELY( !peer ) ) return;
299 :
300 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
301 0 : peer_map_ele_remove_fast( selector->map, peer, selector->pool );
302 0 : peer_pool_ele_release( selector->pool, peer );
303 0 : }
304 :
305 : fd_sspeer_t
306 : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
307 : int incremental,
308 24 : ulong base_slot ) {
309 24 : if( FD_UNLIKELY( incremental ) ) {
310 9 : FD_TEST( base_slot!=ULONG_MAX );
311 9 : }
312 :
313 24 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
314 30 : !score_treap_fwd_iter_done( iter );
315 30 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
316 30 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
317 30 : if( FD_LIKELY( peer->valid &&
318 30 : (!incremental ||
319 30 : (incremental && peer->full_slot==base_slot) ) ) ) {
320 24 : return (fd_sspeer_t){
321 24 : .addr = peer->addr,
322 24 : .full_slot = peer->full_slot,
323 24 : .incr_slot = peer->incr_slot,
324 24 : .score = peer->score,
325 24 : };
326 24 : }
327 30 : }
328 :
329 0 : return (fd_sspeer_t){
330 0 : .addr = { .l=0UL },
331 0 : .full_slot = ULONG_MAX,
332 0 : .incr_slot = ULONG_MAX,
333 0 : .score = ULONG_MAX,
334 0 : };
335 24 : }
336 :
337 : void
338 : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector,
339 : ulong full_slot,
340 9 : ulong incr_slot ) {
341 9 : if( full_slot==ULONG_MAX && incr_slot==ULONG_MAX ) return;
342 :
343 9 : FD_TEST( full_slot!=ULONG_MAX );
344 9 : if( FD_LIKELY( selector->incremental_snapshot_fetch ) ) {
345 : /* incremental slot is less than or equal to cluster incremental slot */
346 9 : if( FD_UNLIKELY( incr_slot!=ULONG_MAX && selector->cluster_slot.incremental!=ULONG_MAX && incr_slot<=selector->cluster_slot.incremental ) ) return;
347 : /* incremental slot is less than or equal to cluster full slot when cluster incremental slot does not exist */
348 9 : else if( FD_UNLIKELY( incr_slot!=ULONG_MAX && selector->cluster_slot.incremental==ULONG_MAX && incr_slot<=selector->cluster_slot.full ) ) return;
349 : /* full slot is less than cluster full slot when incremental slot does not exist */
350 9 : else if( FD_UNLIKELY( incr_slot==ULONG_MAX && full_slot<=selector->cluster_slot.full ) ) return;
351 9 : } else {
352 0 : if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
353 0 : }
354 :
355 9 : selector->cluster_slot.full = full_slot;
356 9 : selector->cluster_slot.incremental = incr_slot;
357 :
358 9 : if( FD_UNLIKELY( score_treap_ele_cnt( selector->score_treap )==0UL ) ) return;
359 :
360 : /* Rescore all peers
361 : TODO: make more performant, maybe make a treap rebalance API */
362 6 : ulong idx = 0UL;
363 6 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
364 30 : !score_treap_fwd_iter_done( iter );
365 24 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
366 24 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
367 24 : fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
368 24 : shadow_peer->latency = peer->latency;
369 24 : shadow_peer->full_slot = peer->full_slot;
370 24 : shadow_peer->incr_slot = peer->incr_slot;
371 24 : shadow_peer->addr = peer->addr;
372 24 : shadow_peer->score = fd_sspeer_selector_score( selector, shadow_peer->latency, shadow_peer->full_slot, shadow_peer->incr_slot );
373 24 : score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
374 24 : selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
375 24 : peer_map_ele_remove( selector->map, &peer->addr, NULL, selector->pool );
376 24 : peer_map_ele_insert( selector->map, shadow_peer, selector->pool );
377 24 : }
378 :
379 : /* clear score treap*/
380 30 : for( ulong i=0UL; i<idx; i++ ) {
381 24 : fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
382 24 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
383 24 : peer_pool_ele_release( selector->pool, peer );
384 24 : }
385 :
386 6 : score_treap_t * tmp = selector->score_treap;
387 6 : selector->score_treap = selector->shadow_score_treap;
388 6 : selector->shadow_score_treap = tmp;
389 :
390 : #if FD_SSPEER_SELECTOR_DEBUG
391 : FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
392 : #endif
393 6 : }
394 :
395 : fd_sscluster_slot_t
396 0 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
397 0 : return selector->cluster_slot;
398 0 : }
|