Line data Source code
1 : #include "fd_sspeer_selector.h"
2 :
3 : struct fd_sspeer_private {
4 : fd_ip4_port_t addr;
5 : fd_ssinfo_t ssinfo;
6 : ulong latency;
7 : ulong score;
8 :
9 : struct {
10 : ulong next;
11 : } pool;
12 :
13 : struct {
14 : ulong next;
15 : ulong prev;
16 : } map;
17 :
18 : struct {
19 : ulong parent;
20 : ulong left;
21 : ulong right;
22 : ulong prio;
23 : } score_treap;
24 : };
25 :
26 : typedef struct fd_sspeer_private fd_sspeer_private_t;
27 :
28 : #define POOL_NAME peer_pool
29 9 : #define POOL_T fd_sspeer_private_t
30 : #define POOL_IDX_T ulong
31 196674 : #define POOL_NEXT pool.next
32 : #include "../../../util/tmpl/fd_pool.c"
33 :
34 : #define MAP_NAME peer_map
35 45 : #define MAP_KEY addr
36 0 : #define MAP_ELE_T fd_sspeer_private_t
37 : #define MAP_KEY_T fd_ip4_port_t
38 105 : #define MAP_PREV map.prev
39 204 : #define MAP_NEXT map.next
40 150 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
41 90 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
42 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
43 : #include "../../../util/tmpl/fd_map_chain.c"
44 :
45 72 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )
46 :
47 : #define TREAP_T fd_sspeer_private_t
48 : #define TREAP_NAME score_treap
49 : #define TREAP_QUERY_T void * /* We don't use query ... */
50 : #define TREAP_CMP(a,b) (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
51 : implementation to cmp either */
52 207 : #define TREAP_IDX_T ulong
53 72 : #define TREAP_LT COMPARE_WORSE
54 183 : #define TREAP_PARENT score_treap.parent
55 222 : #define TREAP_LEFT score_treap.left
56 99 : #define TREAP_RIGHT score_treap.right
57 87 : #define TREAP_PRIO score_treap.prio
58 : #include "../../../util/tmpl/fd_treap.c"
59 :
/* Lag, in slots, charged by fd_sspeer_selector_score to a peer whose
   snapshot slots are unknown (1,000,000 slots). */
#define DEFAULT_SLOTS_BEHIND (1000UL*1000UL)

/* Latency assumed by fd_sspeer_selector_score when a peer's latency is
   unknown (ULONG_MAX): 100ms, written as 100*1000*1000 (nanoseconds). */
#define DEFAULT_PEER_LATENCY (100UL*1000UL*1000UL)

/* Set to 1 to run score_treap_verify after every rescore. */
#define FD_SSPEER_SELECTOR_DEBUG 0
64 :
65 : struct fd_sspeer_selector_private {
66 : fd_sspeer_private_t * pool;
67 : peer_map_t * map;
68 : score_treap_t * score_treap;
69 : score_treap_t * shadow_score_treap;
70 : ulong * peer_idx_list;
71 : fd_sscluster_slot_t cluster_slot;
72 : int incremental_snapshot_fetch;
73 :
74 : ulong magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
75 : };
76 :
77 : FD_FN_CONST ulong
78 21 : fd_sspeer_selector_align( void ) {
79 21 : return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(), fd_ulong_max( score_treap_align(), alignof(ulong) ) ) );
80 21 : }
81 :
82 : FD_FN_CONST ulong
83 3 : fd_sspeer_selector_footprint( ulong max_peers ) {
84 3 : ulong l;
85 3 : l = FD_LAYOUT_INIT;
86 3 : l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
87 3 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
88 3 : l = FD_LAYOUT_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) ) );
89 3 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
90 3 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
91 3 : l = FD_LAYOUT_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
92 3 : return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
93 3 : }
94 :
95 : void *
96 : fd_sspeer_selector_new( void * shmem,
97 : ulong max_peers,
98 : int incremental_snapshot_fetch,
99 3 : ulong seed ) {
100 3 : if( FD_UNLIKELY( !shmem ) ) {
101 0 : FD_LOG_WARNING(( "NULL shmem" ));
102 0 : return NULL;
103 0 : }
104 :
105 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
106 0 : FD_LOG_WARNING(( "unaligned shmem" ));
107 0 : return NULL;
108 0 : }
109 :
110 3 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
111 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
112 0 : return NULL;
113 0 : }
114 :
115 3 : FD_SCRATCH_ALLOC_INIT( l, shmem );
116 3 : fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
117 3 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
118 3 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) ) );
119 3 : void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
120 3 : void * _shadow_score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
121 3 : void * _peer_idx_list = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
122 :
123 0 : selector->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
124 3 : selector->map = peer_map_join( peer_map_new( _map, peer_map_chain_cnt_est( 2UL*max_peers ), seed ) );
125 3 : selector->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) );
126 3 : selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
127 3 : selector->peer_idx_list = (ulong *)_peer_idx_list;
128 :
129 3 : selector->cluster_slot.full = 0UL;
130 3 : selector->cluster_slot.incremental = 0UL;
131 3 : selector->incremental_snapshot_fetch = incremental_snapshot_fetch;
132 :
133 3 : FD_COMPILER_MFENCE();
134 3 : FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
135 3 : FD_COMPILER_MFENCE();
136 :
137 3 : return (void *)selector;
138 3 : }
139 :
140 : fd_sspeer_selector_t *
141 3 : fd_sspeer_selector_join( void * shselector ) {
142 3 : if( FD_UNLIKELY( !shselector ) ) {
143 0 : FD_LOG_WARNING(( "NULL shselector" ));
144 0 : return NULL;
145 0 : }
146 :
147 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
148 0 : FD_LOG_WARNING(( "misaligned shselector" ));
149 0 : return NULL;
150 0 : }
151 :
152 3 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
153 :
154 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
155 0 : FD_LOG_WARNING(( "bad magic" ));
156 0 : return NULL;
157 0 : }
158 :
159 3 : return selector;
160 3 : }
161 :
162 : void *
163 3 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
164 3 : if( FD_UNLIKELY( !selector ) ) {
165 0 : FD_LOG_WARNING(( "NULL selector" ));
166 0 : return NULL;
167 0 : }
168 :
169 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
170 0 : FD_LOG_WARNING(( "misaligned selector" ));
171 0 : return NULL;
172 0 : }
173 :
174 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
175 0 : FD_LOG_WARNING(( "bad magic" ));
176 0 : return NULL;
177 0 : }
178 :
179 3 : selector->pool = peer_pool_leave( selector->pool );
180 3 : selector->map = peer_map_leave( selector->map );
181 3 : selector->score_treap = score_treap_leave( selector->score_treap );
182 3 : selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
183 :
184 3 : return (void *)selector;
185 3 : }
186 :
187 : void *
188 3 : fd_sspeer_selector_delete( void * shselector ) {
189 3 : if( FD_UNLIKELY( !shselector ) ) {
190 0 : FD_LOG_WARNING(( "NULL shselector" ));
191 0 : return NULL;
192 0 : }
193 :
194 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
195 0 : FD_LOG_WARNING(( "misaligned shselector" ));
196 0 : return NULL;
197 0 : }
198 :
199 3 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
200 :
201 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
202 0 : FD_LOG_WARNING(( "bad magic" ));
203 0 : return NULL;
204 0 : }
205 :
206 3 : selector->pool = peer_pool_delete( selector->pool );
207 3 : selector->map = peer_map_delete( selector->map );
208 3 : selector->score_treap = score_treap_delete( selector->score_treap );
209 3 : selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
210 :
211 3 : FD_COMPILER_MFENCE();
212 3 : FD_VOLATILE( selector->magic ) = 0UL;
213 3 : FD_COMPILER_MFENCE();
214 :
215 3 : return (void *)selector;
216 3 : }
217 :
218 : /* Calculates a score for a peer given its latency and ssinfo */
219 : ulong
220 : fd_sspeer_selector_score( fd_sspeer_selector_t * selector,
221 : ulong peer_latency,
222 45 : fd_ssinfo_t const * ssinfo ) {
223 45 : static const ulong slots_behind_penalty = 1000UL;
224 45 : ulong slot = ULONG_MAX;
225 45 : ulong slots_behind = DEFAULT_SLOTS_BEHIND;
226 45 : peer_latency = peer_latency!=ULONG_MAX ? peer_latency : DEFAULT_PEER_LATENCY;
227 :
228 45 : if( FD_LIKELY( ssinfo && ssinfo->full.slot!=ULONG_MAX ) ) {
229 39 : if( FD_UNLIKELY( ssinfo->incremental.slot==ULONG_MAX ) ) {
230 0 : slot = ssinfo->full.slot;
231 0 : slots_behind = selector->cluster_slot.full>slot ? selector->cluster_slot.full - slot : 0UL;
232 39 : } else {
233 39 : slot = ssinfo->incremental.slot;
234 39 : slots_behind = selector->cluster_slot.incremental>slot ? selector->cluster_slot.incremental - slot : 0UL;
235 39 : }
236 39 : }
237 :
238 : /* TODO: come up with a better/more dynamic score function */
239 45 : return peer_latency + slots_behind_penalty*slots_behind;
240 45 : }
241 :
242 : /* Updates a peer's score with new values for latency and/or ssinfo */
243 : static void
244 : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
245 : fd_sspeer_private_t * peer,
246 : ulong latency,
247 0 : fd_ssinfo_t const * ssinfo ) {
248 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
249 :
250 0 : ulong peer_latency = latency!=ULONG_MAX ? latency : peer->latency;
251 0 : fd_ssinfo_t const * peer_ssinfo = ssinfo ? ssinfo : &peer->ssinfo;
252 :
253 0 : peer->score = fd_sspeer_selector_score( selector, peer_latency, peer_ssinfo );
254 :
255 0 : if( FD_LIKELY( ssinfo ) ) {
256 0 : peer->ssinfo = *ssinfo;
257 0 : }
258 :
259 0 : if( FD_LIKELY( latency!=ULONG_MAX ) ) {
260 0 : peer->latency = latency;
261 0 : }
262 :
263 0 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
264 0 : }
265 :
266 : ulong
267 : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
268 : fd_ip4_port_t addr,
269 : ulong latency,
270 21 : fd_ssinfo_t const * ssinfo ) {
271 21 : fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
272 21 : if( FD_LIKELY( peer ) ) {
273 0 : fd_sspeer_selector_update( selector, peer, latency, ssinfo );
274 21 : } else {
275 21 : if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) return ULONG_MAX;
276 :
277 21 : peer = peer_pool_ele_acquire( selector->pool );
278 21 : FD_TEST( peer );
279 21 : if( FD_LIKELY( ssinfo ) ) {
280 18 : peer->ssinfo = *ssinfo;
281 18 : } else {
282 3 : peer->ssinfo.full.slot = ULONG_MAX;
283 3 : peer->ssinfo.incremental.slot = ULONG_MAX;
284 3 : peer->ssinfo.incremental.base_slot = ULONG_MAX;
285 3 : }
286 :
287 21 : peer->addr = addr;
288 21 : peer->latency = latency;
289 21 : peer->score = fd_sspeer_selector_score( selector, latency, ssinfo );
290 21 : peer_map_ele_insert( selector->map, peer, selector->pool );
291 21 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
292 21 : }
293 21 : return peer->score;
294 21 : }
295 :
296 : void
297 : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
298 0 : fd_ip4_port_t addr ) {
299 0 : fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
300 0 : if( FD_UNLIKELY( !peer ) ) return;
301 :
302 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
303 0 : peer_map_ele_remove_fast( selector->map, peer, selector->pool );
304 0 : peer_pool_ele_release( selector->pool, peer );
305 0 : }
306 :
307 : fd_sspeer_t
308 : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
309 : int incremental,
310 24 : ulong base_slot ) {
311 24 : if( FD_UNLIKELY( incremental ) ) {
312 9 : FD_TEST( base_slot!=ULONG_MAX );
313 9 : }
314 :
315 24 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
316 30 : !score_treap_fwd_iter_done( iter );
317 30 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
318 30 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
319 30 : if( FD_LIKELY( peer->ssinfo.full.slot!=ULONG_MAX &&
320 30 : (!incremental ||
321 30 : (incremental && peer->ssinfo.incremental.base_slot==base_slot) ) ) ) {
322 24 : return (fd_sspeer_t){
323 24 : .addr = peer->addr,
324 24 : .ssinfo = peer->ssinfo,
325 24 : .score = peer->score,
326 24 : };
327 24 : }
328 30 : }
329 :
330 0 : return (fd_sspeer_t){
331 0 : .addr={ .l=0UL },
332 0 : .ssinfo={ .full={ .slot=ULONG_MAX }, .incremental={ .base_slot=ULONG_MAX, .slot=ULONG_MAX } },
333 0 : .score=ULONG_MAX,
334 0 : };
335 24 : }
336 :
337 : void
338 : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector,
339 : ulong full_slot,
340 9 : ulong incremental_slot ) {
341 9 : if( full_slot==ULONG_MAX && incremental_slot==ULONG_MAX ) return;
342 :
343 9 : FD_TEST( full_slot!=ULONG_MAX );
344 9 : if( FD_LIKELY( selector->incremental_snapshot_fetch ) ) {
345 : /* incremental slot is less than or equal to cluster incremental slot */
346 9 : if( FD_UNLIKELY( incremental_slot!=ULONG_MAX && selector->cluster_slot.incremental!=ULONG_MAX && incremental_slot<=selector->cluster_slot.incremental ) ) return;
347 : /* incremental slot is less than or equal to cluster full slot when cluster incremental slot does not exist */
348 9 : else if( FD_UNLIKELY( incremental_slot!=ULONG_MAX && selector->cluster_slot.incremental==ULONG_MAX && incremental_slot<=selector->cluster_slot.full ) ) return;
349 : /* full slot is less than cluster full slot when incremental slot does not exist */
350 9 : else if( FD_UNLIKELY( incremental_slot==ULONG_MAX && full_slot<=selector->cluster_slot.full ) ) return;
351 9 : } else {
352 0 : if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
353 0 : }
354 :
355 9 : selector->cluster_slot.full = full_slot;
356 9 : selector->cluster_slot.incremental = incremental_slot;
357 :
358 9 : if( FD_UNLIKELY( score_treap_ele_cnt( selector->score_treap )==0UL ) ) return;
359 :
360 : /* Rescore all peers
361 : TODO: make more performant, maybe make a treap rebalance API */
362 6 : ulong idx = 0UL;
363 6 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
364 30 : !score_treap_fwd_iter_done( iter );
365 24 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
366 24 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
367 24 : fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
368 24 : shadow_peer->latency = peer->latency;
369 24 : shadow_peer->ssinfo = peer->ssinfo;
370 24 : shadow_peer->addr = peer->addr;
371 24 : shadow_peer->score = fd_sspeer_selector_score( selector, shadow_peer->latency, &shadow_peer->ssinfo );
372 24 : score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
373 24 : selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
374 24 : peer_map_ele_remove( selector->map, &peer->addr, NULL, selector->pool );
375 24 : peer_map_ele_insert( selector->map, shadow_peer, selector->pool );
376 24 : }
377 :
378 : /* clear score treap*/
379 30 : for( ulong i=0UL; i<idx; i++ ) {
380 24 : fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
381 24 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
382 24 : peer_pool_ele_release( selector->pool, peer );
383 24 : }
384 :
385 6 : score_treap_t * tmp = selector->score_treap;
386 6 : selector->score_treap = selector->shadow_score_treap;
387 6 : selector->shadow_score_treap = tmp;
388 :
389 : #if FD_SSPEER_SELECTOR_DEBUG
390 : FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
391 : #endif
392 6 : }
393 :
394 : fd_sscluster_slot_t
395 0 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
396 0 : return selector->cluster_slot;
397 0 : }
|