Line data Source code
1 : #include "fd_sspeer_selector.h"
2 :
3 : struct fd_sspeer_private {
4 : fd_ip4_port_t addr;
5 : ulong full_slot;
6 : ulong incr_slot;
7 : uchar full_hash[ FD_HASH_FOOTPRINT ];
8 : uchar incr_hash[ FD_HASH_FOOTPRINT ];
9 : ulong latency;
10 : ulong score;
11 : int valid;
12 :
13 : struct {
14 : ulong next;
15 : } pool;
16 :
17 : struct {
18 : ulong next;
19 : ulong prev;
20 : } map;
21 :
22 : struct {
23 : ulong parent;
24 : ulong left;
25 : ulong right;
26 : ulong prio;
27 : } score_treap;
28 : };
29 :
30 : typedef struct fd_sspeer_private fd_sspeer_private_t;
31 :
32 : #define POOL_NAME peer_pool
33 9 : #define POOL_T fd_sspeer_private_t
34 : #define POOL_IDX_T ulong
35 196674 : #define POOL_NEXT pool.next
36 : #include "../../../util/tmpl/fd_pool.c"
37 :
38 : #define MAP_NAME peer_map
39 45 : #define MAP_KEY addr
40 0 : #define MAP_ELE_T fd_sspeer_private_t
41 : #define MAP_KEY_T fd_ip4_port_t
42 105 : #define MAP_PREV map.prev
43 204 : #define MAP_NEXT map.next
44 150 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
45 90 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
46 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
47 : #include "../../../util/tmpl/fd_map_chain.c"
48 :
49 72 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )
50 :
51 : #define TREAP_T fd_sspeer_private_t
52 : #define TREAP_NAME score_treap
53 : #define TREAP_QUERY_T void * /* We don't use query ... */
54 : #define TREAP_CMP(a,b) (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
55 : implementation to cmp either */
56 207 : #define TREAP_IDX_T ulong
57 72 : #define TREAP_LT COMPARE_WORSE
58 183 : #define TREAP_PARENT score_treap.parent
59 222 : #define TREAP_LEFT score_treap.left
60 99 : #define TREAP_RIGHT score_treap.right
61 87 : #define TREAP_PRIO score_treap.prio
62 : #include "../../../util/tmpl/fd_treap.c"
63 :
/* Fallback inputs to the score function when a peer has not reported a
   value yet.  Both are unsigned long so they combine with the ulong
   latency / slot arithmetic without any signed to unsigned
   conversion. */

#define DEFAULT_SLOTS_BEHIND (1000UL*1000UL)        /* 1,000,000 slots behind */
#define DEFAULT_PEER_LATENCY (100UL*1000UL*1000UL)  /* 100ms */

/* Set to 1 to verify the score treap after every rescore */
#define FD_SSPEER_SELECTOR_DEBUG 0
68 :
69 : struct fd_sspeer_selector_private {
70 : fd_sspeer_private_t * pool;
71 : peer_map_t * map;
72 : score_treap_t * score_treap;
73 : score_treap_t * shadow_score_treap;
74 : ulong * peer_idx_list;
75 : fd_sscluster_slot_t cluster_slot;
76 : int incremental_snapshot_fetch;
77 :
78 : ulong magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
79 : };
80 :
81 : FD_FN_CONST ulong
82 21 : fd_sspeer_selector_align( void ) {
83 21 : return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(), fd_ulong_max( score_treap_align(), alignof(ulong) ) ) );
84 21 : }
85 :
86 : FD_FN_CONST ulong
87 3 : fd_sspeer_selector_footprint( ulong max_peers ) {
88 3 : ulong l;
89 3 : l = FD_LAYOUT_INIT;
90 3 : l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
91 3 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
92 3 : l = FD_LAYOUT_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) ) );
93 3 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
94 3 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
95 3 : l = FD_LAYOUT_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
96 3 : return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
97 3 : }
98 :
99 : void *
100 : fd_sspeer_selector_new( void * shmem,
101 : ulong max_peers,
102 : int incremental_snapshot_fetch,
103 3 : ulong seed ) {
104 3 : if( FD_UNLIKELY( !shmem ) ) {
105 0 : FD_LOG_WARNING(( "NULL shmem" ));
106 0 : return NULL;
107 0 : }
108 :
109 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
110 0 : FD_LOG_WARNING(( "unaligned shmem" ));
111 0 : return NULL;
112 0 : }
113 :
114 3 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
115 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
116 0 : return NULL;
117 0 : }
118 :
119 3 : FD_SCRATCH_ALLOC_INIT( l, shmem );
120 3 : fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
121 3 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
122 3 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) ) );
123 3 : void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
124 3 : void * _shadow_score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
125 3 : void * _peer_idx_list = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
126 :
127 0 : selector->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
128 3 : selector->map = peer_map_join( peer_map_new( _map, peer_map_chain_cnt_est( 2UL*max_peers ), seed ) );
129 3 : selector->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) );
130 3 : selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
131 3 : selector->peer_idx_list = (ulong *)_peer_idx_list;
132 :
133 3 : selector->cluster_slot.full = 0UL;
134 3 : selector->cluster_slot.incremental = 0UL;
135 3 : selector->incremental_snapshot_fetch = incremental_snapshot_fetch;
136 :
137 3 : FD_COMPILER_MFENCE();
138 3 : FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
139 3 : FD_COMPILER_MFENCE();
140 :
141 3 : return (void *)selector;
142 3 : }
143 :
144 : fd_sspeer_selector_t *
145 3 : fd_sspeer_selector_join( void * shselector ) {
146 3 : if( FD_UNLIKELY( !shselector ) ) {
147 0 : FD_LOG_WARNING(( "NULL shselector" ));
148 0 : return NULL;
149 0 : }
150 :
151 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
152 0 : FD_LOG_WARNING(( "misaligned shselector" ));
153 0 : return NULL;
154 0 : }
155 :
156 3 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
157 :
158 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
159 0 : FD_LOG_WARNING(( "bad magic" ));
160 0 : return NULL;
161 0 : }
162 :
163 3 : return selector;
164 3 : }
165 :
166 : void *
167 3 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
168 3 : if( FD_UNLIKELY( !selector ) ) {
169 0 : FD_LOG_WARNING(( "NULL selector" ));
170 0 : return NULL;
171 0 : }
172 :
173 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
174 0 : FD_LOG_WARNING(( "misaligned selector" ));
175 0 : return NULL;
176 0 : }
177 :
178 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
179 0 : FD_LOG_WARNING(( "bad magic" ));
180 0 : return NULL;
181 0 : }
182 :
183 3 : selector->pool = peer_pool_leave( selector->pool );
184 3 : selector->map = peer_map_leave( selector->map );
185 3 : selector->score_treap = score_treap_leave( selector->score_treap );
186 3 : selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
187 :
188 3 : return (void *)selector;
189 3 : }
190 :
191 : void *
192 3 : fd_sspeer_selector_delete( void * shselector ) {
193 3 : if( FD_UNLIKELY( !shselector ) ) {
194 0 : FD_LOG_WARNING(( "NULL shselector" ));
195 0 : return NULL;
196 0 : }
197 :
198 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
199 0 : FD_LOG_WARNING(( "misaligned shselector" ));
200 0 : return NULL;
201 0 : }
202 :
203 3 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
204 :
205 3 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
206 0 : FD_LOG_WARNING(( "bad magic" ));
207 0 : return NULL;
208 0 : }
209 :
210 3 : selector->pool = peer_pool_delete( selector->pool );
211 3 : selector->map = peer_map_delete( selector->map );
212 3 : selector->score_treap = score_treap_delete( selector->score_treap );
213 3 : selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
214 :
215 3 : FD_COMPILER_MFENCE();
216 3 : FD_VOLATILE( selector->magic ) = 0UL;
217 3 : FD_COMPILER_MFENCE();
218 :
219 3 : return (void *)selector;
220 3 : }
221 :
222 : /* Calculates a score for a peer given its latency and its resolved
223 : full and incremental slots */
224 : ulong
225 : fd_sspeer_selector_score( fd_sspeer_selector_t * selector,
226 : ulong peer_latency,
227 : ulong full_slot,
228 45 : ulong incr_slot ) {
229 45 : static const ulong slots_behind_penalty = 1000UL;
230 45 : ulong slot = ULONG_MAX;
231 45 : ulong slots_behind = DEFAULT_SLOTS_BEHIND;
232 45 : peer_latency = peer_latency!=ULONG_MAX ? peer_latency : DEFAULT_PEER_LATENCY;
233 :
234 45 : if( FD_LIKELY( full_slot!=ULONG_MAX ) ) {
235 39 : if( FD_UNLIKELY( incr_slot==ULONG_MAX ) ) {
236 0 : slot = full_slot;
237 0 : slots_behind = selector->cluster_slot.full>slot ? selector->cluster_slot.full - slot : 0UL;
238 39 : } else {
239 39 : slot = incr_slot;
240 39 : slots_behind = selector->cluster_slot.incremental>slot ? selector->cluster_slot.incremental - slot : 0UL;
241 39 : }
242 39 : }
243 :
244 : /* TODO: come up with a better/more dynamic score function */
245 45 : return peer_latency + slots_behind_penalty*slots_behind;
246 45 : }
247 :
248 : /* Updates a peer's score with new values for latency and/or resolved
249 : full/incremental slots */
250 : static void
251 : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
252 : fd_sspeer_private_t * peer,
253 : ulong latency,
254 : ulong full_slot,
255 : ulong incr_slot,
256 : uchar const full_hash[ FD_HASH_FOOTPRINT ],
257 0 : uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
258 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
259 :
260 0 : ulong peer_latency = latency!=ULONG_MAX ? latency : peer->latency;
261 0 : peer->score = fd_sspeer_selector_score( selector, peer_latency, full_slot, incr_slot );
262 :
263 0 : peer->full_slot = full_slot!=ULONG_MAX ? full_slot : peer->full_slot;
264 0 : peer->incr_slot = incr_slot!=ULONG_MAX ? incr_slot : peer->incr_slot;
265 0 : if( FD_LIKELY( full_hash ) ) fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
266 0 : if( FD_LIKELY( incr_hash ) ) fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
267 :
268 0 : if( FD_LIKELY( latency!=ULONG_MAX ) ) {
269 0 : peer->latency = latency;
270 0 : }
271 :
272 0 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
273 0 : }
274 :
275 : ulong
276 : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
277 : fd_ip4_port_t addr,
278 : ulong latency,
279 : ulong full_slot,
280 : ulong incr_slot,
281 : uchar const full_hash[ FD_HASH_FOOTPRINT ],
282 21 : uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
283 21 : fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
284 21 : if( FD_LIKELY( peer ) ) {
285 0 : fd_sspeer_selector_update( selector, peer, latency, full_slot, incr_slot, full_hash, incr_hash );
286 21 : } else {
287 21 : if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) return ULONG_MAX;
288 :
289 21 : peer = peer_pool_ele_acquire( selector->pool );
290 21 : peer->addr = addr;
291 21 : peer->latency = latency;
292 21 : peer->score = fd_sspeer_selector_score( selector, latency, full_slot, incr_slot );
293 21 : peer->full_slot = full_slot;
294 21 : peer->incr_slot = incr_slot;
295 21 : if( FD_LIKELY( full_hash ) ) fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
296 21 : if( FD_LIKELY( incr_hash ) ) fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
297 21 : peer_map_ele_insert( selector->map, peer, selector->pool );
298 21 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
299 21 : }
300 21 : peer->valid = peer->latency!=ULONG_MAX && peer->full_slot!=ULONG_MAX;
301 21 : return peer->score;
302 21 : }
303 :
304 : void
305 : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
306 0 : fd_ip4_port_t addr ) {
307 0 : fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
308 0 : if( FD_UNLIKELY( !peer ) ) return;
309 :
310 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
311 0 : peer_map_ele_remove_fast( selector->map, peer, selector->pool );
312 0 : peer_pool_ele_release( selector->pool, peer );
313 0 : }
314 :
315 : fd_sspeer_t
316 : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
317 : int incremental,
318 24 : ulong base_slot ) {
319 24 : if( FD_UNLIKELY( incremental ) ) {
320 9 : FD_TEST( base_slot!=ULONG_MAX );
321 9 : }
322 :
323 24 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
324 30 : !score_treap_fwd_iter_done( iter );
325 30 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
326 30 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
327 30 : if( FD_LIKELY( peer->valid &&
328 30 : (!incremental ||
329 30 : (incremental && peer->full_slot==base_slot) ) ) ) {
330 24 : fd_sspeer_t best = {
331 24 : .addr = peer->addr,
332 24 : .full_slot = peer->full_slot,
333 24 : .incr_slot = peer->incr_slot,
334 24 : .score = peer->score,
335 24 : };
336 24 : fd_memcpy( best.full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
337 24 : fd_memcpy( best.incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
338 24 : return best;
339 24 : }
340 30 : }
341 :
342 0 : return (fd_sspeer_t){
343 0 : .addr = { .l=0UL },
344 0 : .full_slot = ULONG_MAX,
345 0 : .incr_slot = ULONG_MAX,
346 0 : .score = ULONG_MAX,
347 0 : .full_hash = {0},
348 0 : .incr_hash = {0},
349 0 : };
350 24 : }
351 :
352 : void
353 : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector,
354 : ulong full_slot,
355 9 : ulong incr_slot ) {
356 9 : if( full_slot==ULONG_MAX && incr_slot==ULONG_MAX ) return;
357 :
358 9 : FD_TEST( full_slot!=ULONG_MAX );
359 9 : if( FD_LIKELY( selector->incremental_snapshot_fetch ) ) {
360 : /* incremental slot is less than or equal to cluster incremental slot */
361 9 : if( FD_UNLIKELY( incr_slot!=ULONG_MAX && selector->cluster_slot.incremental!=ULONG_MAX && incr_slot<=selector->cluster_slot.incremental ) ) return;
362 : /* incremental slot is less than or equal to cluster full slot when cluster incremental slot does not exist */
363 9 : else if( FD_UNLIKELY( incr_slot!=ULONG_MAX && selector->cluster_slot.incremental==ULONG_MAX && incr_slot<=selector->cluster_slot.full ) ) return;
364 : /* full slot is less than cluster full slot when incremental slot does not exist */
365 9 : else if( FD_UNLIKELY( incr_slot==ULONG_MAX && full_slot<=selector->cluster_slot.full ) ) return;
366 9 : } else {
367 0 : if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
368 0 : }
369 :
370 9 : selector->cluster_slot.full = full_slot;
371 9 : selector->cluster_slot.incremental = incr_slot;
372 :
373 9 : if( FD_UNLIKELY( score_treap_ele_cnt( selector->score_treap )==0UL ) ) return;
374 :
375 : /* Rescore all peers
376 : TODO: make more performant, maybe make a treap rebalance API */
377 6 : ulong idx = 0UL;
378 6 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
379 30 : !score_treap_fwd_iter_done( iter );
380 24 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
381 24 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
382 24 : fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
383 24 : shadow_peer->latency = peer->latency;
384 24 : shadow_peer->full_slot = peer->full_slot;
385 24 : shadow_peer->incr_slot = peer->incr_slot;
386 24 : shadow_peer->addr = peer->addr;
387 24 : shadow_peer->score = fd_sspeer_selector_score( selector, shadow_peer->latency, shadow_peer->full_slot, shadow_peer->incr_slot );
388 24 : shadow_peer->valid = peer->valid;
389 24 : fd_memcpy( shadow_peer->full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
390 24 : fd_memcpy( shadow_peer->incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
391 24 : score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
392 24 : selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
393 24 : peer_map_ele_remove( selector->map, &peer->addr, NULL, selector->pool );
394 24 : peer_map_ele_insert( selector->map, shadow_peer, selector->pool );
395 24 : }
396 :
397 : /* clear score treap*/
398 30 : for( ulong i=0UL; i<idx; i++ ) {
399 24 : fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
400 24 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
401 24 : peer_pool_ele_release( selector->pool, peer );
402 24 : }
403 :
404 6 : score_treap_t * tmp = selector->score_treap;
405 6 : selector->score_treap = selector->shadow_score_treap;
406 6 : selector->shadow_score_treap = tmp;
407 :
408 : #if FD_SSPEER_SELECTOR_DEBUG
409 : FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
410 : #endif
411 6 : }
412 :
413 : fd_sscluster_slot_t
414 0 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
415 0 : return selector->cluster_slot;
416 0 : }
|