Line data Source code
1 : #include "fd_sspeer_selector.h"
2 : #include "../../../util/bits/fd_sat.h"
3 : #include "../../../util/log/fd_log.h"
4 :
5 : struct fd_sspeer_private {
6 : fd_sspeer_key_t key;
7 : fd_ip4_port_t addr;
8 : ulong full_slot;
9 : ulong incr_slot;
10 : uchar full_hash[ FD_HASH_FOOTPRINT ];
11 : uchar incr_hash[ FD_HASH_FOOTPRINT ];
12 : ulong latency;
13 : ulong score;
14 : int valid;
15 :
16 : struct {
17 : ulong next;
18 : } pool;
19 :
20 : struct {
21 : ulong next;
22 : ulong prev;
23 : } map_by_key;
24 :
25 : struct {
26 : ulong next;
27 : ulong prev;
28 : } map_by_addr;
29 :
30 : struct {
31 : ulong parent;
32 : ulong left;
33 : ulong right;
34 : ulong prio;
35 : } score_treap;
36 : };
37 :
38 : typedef struct fd_sspeer_private fd_sspeer_private_t;
39 :
40 : #define POOL_NAME peer_pool
41 372 : #define POOL_T fd_sspeer_private_t
42 : #define POOL_IDX_T ulong
43 14356341 : #define POOL_NEXT pool.next
44 : #include "../../../util/tmpl/fd_pool.c"
45 :
46 : #define MAP_NAME peer_map_by_key
47 1611 : #define MAP_KEY key
48 813 : #define MAP_ELE_T fd_sspeer_private_t
49 : #define MAP_KEY_T fd_sspeer_key_t
50 1092 : #define MAP_PREV map_by_key.prev
51 2490 : #define MAP_NEXT map_by_key.next
52 681 : #define MAP_KEY_EQ(k0,k1) (fd_sspeer_key_eq(k0,k1))
53 2766 : #define MAP_KEY_HASH(key,seed) (fd_sspeer_key_hash(key,seed))
54 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
55 : #include "../../../util/tmpl/fd_map_chain.c"
56 :
57 : #define MAP_NAME peer_map_by_addr
58 1503 : #define MAP_KEY addr
59 786 : #define MAP_ELE_T fd_sspeer_private_t
60 : #define MAP_KEY_T fd_ip4_port_t
61 1122 : #define MAP_PREV map_by_addr.prev
62 2184 : #define MAP_NEXT map_by_addr.next
63 81 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
64 1629 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
65 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
66 : #define MAP_MULTI 1
67 : #include "../../../util/tmpl/fd_map_chain.c"
68 :
69 1347 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )
70 :
71 : #define TREAP_T fd_sspeer_private_t
72 : #define TREAP_NAME score_treap
73 : #define TREAP_QUERY_T void * /* We don't use query ... */
74 : #define TREAP_CMP(a,b) (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
75 : implementation to cmp either */
76 5028 : #define TREAP_IDX_T ulong
77 1347 : #define TREAP_LT COMPARE_WORSE
78 3939 : #define TREAP_PARENT score_treap.parent
79 3759 : #define TREAP_LEFT score_treap.left
80 2532 : #define TREAP_RIGHT score_treap.right
81 14356635 : #define TREAP_PRIO score_treap.prio
82 : #include "../../../util/tmpl/fd_treap.c"
83 :
/* Latency (in nanos) assumed for a peer that has not been pinged yet.
   Peers are pinged as soon as they are discovered, so this value is
   only used briefly.  100ms is a neutral choice: large enough that a
   peer with a measured latency wins, small enough that slot distance
   still separates peers that have not been pinged. */
#define DEFAULT_PEER_LATENCY         (100UL*1000UL*1000UL) /* 100ms */

/* Slot distance charged to a peer whose full slot is unknown. */
#define DEFAULT_SLOTS_BEHIND         (1000UL*1000UL) /* 1,000,000 slots behind */

/* Upper bound on the slot distance charged to a peer whose slots are
   known.  Deliberately smaller than DEFAULT_SLOTS_BEHIND, so a peer
   with an unknown slot is always penalized more than the worst peer
   with a known slot. */
#define MAX_SLOTS_BEHIND_CAP         (864000UL) /* ~2 epochs worth of slots */

/* Score penalty applied per slot of distance. */
#define DEFAULT_SLOTS_BEHIND_PENALTY (1000UL)

/* Set to 1 to verify the score treap after every shadow treap swap. */
#define FD_SSPEER_SELECTOR_DEBUG 0
97 :
98 : struct fd_sspeer_selector_private {
99 : fd_sspeer_private_t * pool;
100 : peer_map_by_key_t * map_by_key;
101 : peer_map_by_addr_t * map_by_addr;
102 : score_treap_t * score_treap;
103 : score_treap_t * shadow_score_treap;
104 : ulong * peer_idx_list;
105 : fd_sscluster_slot_t cluster_slot;
106 : ulong max_peers;
107 : int cluster_slot_dirty;
108 :
109 : ulong magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
110 : };
111 :
112 : FD_FN_CONST ulong
113 768 : fd_sspeer_selector_align( void ) {
114 768 : return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(),
115 768 : fd_ulong_max( peer_map_by_key_align(), fd_ulong_max( peer_map_by_addr_align(),
116 768 : fd_ulong_max( score_treap_align(), alignof(ulong) ) ) ) ) );
117 768 : }
118 :
119 : FD_FN_CONST ulong
120 54 : fd_sspeer_selector_footprint( ulong max_peers ) {
121 54 : ulong l;
122 54 : l = FD_LAYOUT_INIT;
123 54 : l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
124 54 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
125 54 : l = FD_LAYOUT_APPEND( l, peer_map_by_key_align(), peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) ) );
126 54 : l = FD_LAYOUT_APPEND( l, peer_map_by_addr_align(), peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) ) );
127 54 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
128 54 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
129 54 : l = FD_LAYOUT_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) ); /* pool indices for shadow treap swap */
130 54 : return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
131 54 : }
132 :
133 : void *
134 : fd_sspeer_selector_new( void * shmem,
135 : ulong max_peers,
136 129 : ulong seed ) {
137 129 : if( FD_UNLIKELY( !shmem ) ) {
138 0 : FD_LOG_WARNING(( "NULL shmem" ));
139 0 : return NULL;
140 0 : }
141 :
142 129 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
143 0 : FD_LOG_WARNING(( "unaligned shmem" ));
144 0 : return NULL;
145 0 : }
146 :
147 129 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
148 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
149 0 : return NULL;
150 0 : }
151 :
152 129 : FD_SCRATCH_ALLOC_INIT( l, shmem );
153 129 : fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
154 129 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
155 129 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_key_align(), peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) ) );
156 129 : void * _multimap_by_addr = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_addr_align(), peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) ) );
157 129 : void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
158 129 : void * _shadow_score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
159 129 : void * _peer_idx_list = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) ); /* pool indices for shadow treap swap */
160 :
161 129 : selector->pool = peer_pool_join( peer_pool_new( _pool, 2UL*max_peers ) );
162 : /* Seed treap priorities so the treap is balanced. */
163 129 : score_treap_seed( selector->pool, 2UL*max_peers, seed );
164 129 : selector->map_by_key = peer_map_by_key_join( peer_map_by_key_new( _map, peer_map_by_key_chain_cnt_est( 2UL*max_peers ), seed ) );
165 129 : selector->map_by_addr = peer_map_by_addr_join( peer_map_by_addr_new( _multimap_by_addr, peer_map_by_addr_chain_cnt_est( 2UL*max_peers ), seed ) );
166 129 : selector->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) );
167 129 : selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
168 129 : selector->peer_idx_list = (ulong *)_peer_idx_list;
169 129 : selector->max_peers = max_peers;
170 :
171 129 : selector->cluster_slot.full = 0UL;
172 129 : selector->cluster_slot.incremental = FD_SSPEER_SLOT_UNKNOWN;
173 129 : selector->cluster_slot_dirty = 0;
174 :
175 129 : FD_COMPILER_MFENCE();
176 129 : FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
177 129 : FD_COMPILER_MFENCE();
178 :
179 129 : return (void *)selector;
180 129 : }
181 :
182 : fd_sspeer_selector_t *
183 129 : fd_sspeer_selector_join( void * shselector ) {
184 129 : if( FD_UNLIKELY( !shselector ) ) {
185 0 : FD_LOG_WARNING(( "NULL shselector" ));
186 0 : return NULL;
187 0 : }
188 :
189 129 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
190 0 : FD_LOG_WARNING(( "misaligned shselector" ));
191 0 : return NULL;
192 0 : }
193 :
194 129 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
195 :
196 129 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
197 0 : FD_LOG_WARNING(( "bad magic" ));
198 0 : return NULL;
199 0 : }
200 :
201 129 : return selector;
202 129 : }
203 :
204 : void *
205 114 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
206 114 : if( FD_UNLIKELY( !selector ) ) {
207 0 : FD_LOG_WARNING(( "NULL selector" ));
208 0 : return NULL;
209 0 : }
210 :
211 114 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
212 0 : FD_LOG_WARNING(( "misaligned selector" ));
213 0 : return NULL;
214 0 : }
215 :
216 114 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
217 0 : FD_LOG_WARNING(( "bad magic" ));
218 0 : return NULL;
219 0 : }
220 :
221 114 : selector->pool = peer_pool_leave( selector->pool );
222 114 : selector->map_by_key = peer_map_by_key_leave( selector->map_by_key );
223 114 : selector->map_by_addr = peer_map_by_addr_leave( selector->map_by_addr );
224 114 : selector->score_treap = score_treap_leave( selector->score_treap );
225 114 : selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
226 :
227 114 : return (void *)selector;
228 114 : }
229 :
230 : void *
231 114 : fd_sspeer_selector_delete( void * shselector ) {
232 114 : if( FD_UNLIKELY( !shselector ) ) {
233 0 : FD_LOG_WARNING(( "NULL shselector" ));
234 0 : return NULL;
235 0 : }
236 :
237 114 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
238 0 : FD_LOG_WARNING(( "misaligned shselector" ));
239 0 : return NULL;
240 0 : }
241 :
242 114 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
243 :
244 114 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
245 0 : FD_LOG_WARNING(( "bad magic" ));
246 0 : return NULL;
247 0 : }
248 :
249 114 : selector->pool = peer_pool_delete( selector->pool );
250 114 : selector->map_by_key = peer_map_by_key_delete( selector->map_by_key );
251 114 : selector->map_by_addr = peer_map_by_addr_delete( selector->map_by_addr );
252 114 : selector->score_treap = score_treap_delete( selector->score_treap );
253 114 : selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
254 :
255 114 : FD_COMPILER_MFENCE();
256 114 : FD_VOLATILE( selector->magic ) = 0UL;
257 114 : FD_COMPILER_MFENCE();
258 :
259 114 : return (void *)selector;
260 114 : }
261 :
262 : /* Calculates a score for a peer given its latency and its resolved
263 : full and incremental slots */
264 : static ulong
265 : fd_sspeer_selector_score( fd_sspeer_selector_t const * selector,
266 : ulong peer_latency,
267 : ulong full_slot,
268 906 : ulong incr_slot ) {
269 906 : peer_latency = peer_latency!=FD_SSPEER_LATENCY_UNKNOWN ? peer_latency : DEFAULT_PEER_LATENCY;
270 :
271 906 : ulong slots_behind = DEFAULT_SLOTS_BEHIND;
272 :
273 906 : if( FD_LIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
274 873 : if( FD_LIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
275 873 : selector->cluster_slot.incremental!=FD_SSPEER_SLOT_UNKNOWN ) ) {
276 321 : slots_behind = selector->cluster_slot.incremental>incr_slot ? selector->cluster_slot.incremental - incr_slot : 0UL;
277 552 : } else {
278 : /* Either the peer has no incremental or the cluster has no
279 : incremental reference yet. Fall back to comparing full_slot
280 : against the cluster full slot. */
281 552 : slots_behind = selector->cluster_slot.full>full_slot ? selector->cluster_slot.full - full_slot : 0UL;
282 552 : }
283 873 : slots_behind = fd_ulong_min( slots_behind, MAX_SLOTS_BEHIND_CAP );
284 873 : }
285 :
286 : /* Using saturating arithmetic to avoid overflow and cap at
287 : FD_SSPEER_SCORE_MAX. */
288 906 : ulong penalty = fd_ulong_sat_mul( DEFAULT_SLOTS_BEHIND_PENALTY, slots_behind );
289 906 : ulong score = fd_ulong_sat_add( peer_latency, penalty );
290 906 : return fd_ulong_min( score, FD_SSPEER_SCORE_MAX );
291 906 : }
292 :
293 : /* Validates slot arguments for both new and existing peers. Returns
294 : 0 on success, -1 on failure due to incr_slot<full_slot, and -2 on
295 : failure due to full_slot==UNKNOWN with incr_slot!=UNKNOWN. The
296 : caller passes in the effective (already-resolved) full_slot and
297 : incr_slot values. No log on failure (the caller is responsible
298 : for logging whenever needed).
299 :
300 : Two invariants are enforced:
301 : 1. When both slots are known, incr_slot must be >= full_slot.
302 : 2. An incremental slot requires a known full slot. */
303 : static int
304 : fd_sspeer_validate_slot_args( ulong full_slot,
305 660 : ulong incr_slot ) {
306 660 : if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
307 660 : full_slot!=FD_SSPEER_SLOT_UNKNOWN &&
308 660 : incr_slot<full_slot ) ) {
309 6 : return -1;
310 6 : }
311 :
312 654 : if( FD_UNLIKELY( full_slot==FD_SSPEER_SLOT_UNKNOWN &&
313 654 : incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
314 9 : return -2;
315 9 : }
316 :
317 645 : return 0;
318 654 : }
319 :
320 : /* Updates a peer's score with new values for latency and/or resolved
321 : full/incremental slots. Returns FD_SSPEER_UPDATE_SUCCESS on
322 : success, or the specific fd_sspeer_validate_slot_args error code
323 : on failure without modifying the peer or any data structure.
324 :
325 : Slot-based incremental clearing: when the caller provides
326 : incr_slot==UNKNOWN and full_slot!=UNKNOWN, the peer's existing
327 : incremental data is cleared if it is stale (peer->incr_slot <
328 : full_slot). Otherwise, the existing incremental data is preserved. */
329 : static int
330 : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
331 : fd_sspeer_private_t * peer,
332 : ulong latency,
333 : ulong full_slot,
334 : ulong incr_slot,
335 : uchar const full_hash[ FD_HASH_FOOTPRINT ],
336 96 : uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
337 96 : ulong peer_latency = latency!=FD_SSPEER_LATENCY_UNKNOWN ? latency : peer->latency;
338 96 : ulong peer_full_slot = full_slot!=FD_SSPEER_SLOT_UNKNOWN ? full_slot : peer->full_slot;
339 :
340 96 : ulong peer_incr_slot;
341 96 : int clear_incr = 0;
342 96 : if( incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) {
343 36 : peer_incr_slot = incr_slot;
344 60 : } else if( full_slot!=FD_SSPEER_SLOT_UNKNOWN &&
345 60 : peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
346 60 : peer->incr_slot<full_slot ) {
347 : /* The caller is providing a new full_slot that has advanced past
348 : the peer's existing incremental, the incremental is stale. */
349 12 : peer_incr_slot = FD_SSPEER_SLOT_UNKNOWN;
350 12 : clear_incr = 1;
351 48 : } else {
352 48 : peer_incr_slot = peer->incr_slot;
353 48 : }
354 :
355 96 : int validate_err = fd_sspeer_validate_slot_args( peer_full_slot, peer_incr_slot );
356 96 : if( FD_UNLIKELY( validate_err ) ) return validate_err;
357 :
358 90 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
359 :
360 90 : peer->score = fd_sspeer_selector_score( selector, peer_latency, peer_full_slot, peer_incr_slot );
361 :
362 90 : peer->latency = peer_latency;
363 90 : peer->full_slot = peer_full_slot;
364 90 : peer->incr_slot = peer_incr_slot;
365 90 : if( FD_LIKELY( full_hash ) ) {
366 36 : fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
367 36 : }
368 90 : if( FD_UNLIKELY( clear_incr ) ) {
369 12 : fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT );
370 78 : } else if( FD_LIKELY( incr_hash ) ) {
371 15 : fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
372 15 : }
373 :
374 90 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
375 90 : return FD_SSPEER_UPDATE_SUCCESS;
376 96 : }
377 :
378 : ulong
379 : fd_sspeer_selector_update_on_ping( fd_sspeer_selector_t * selector,
380 : fd_ip4_port_t addr,
381 21 : ulong latency ) {
382 21 : ulong ele_idx = peer_map_by_addr_idx_query_const( selector->map_by_addr, &addr, ULONG_MAX, selector->pool );
383 21 : ulong cnt = 0UL;
384 45 : for(;;) {
385 45 : if( FD_UNLIKELY( ele_idx==ULONG_MAX ) ) break;
386 24 : fd_sspeer_private_t * peer = selector->pool + ele_idx;
387 : /* Update cannot fail here: slots are FD_SSPEER_SLOT_UNKNOWN and
388 : hashes are NULL, so fd_sspeer_validate_slot_args always
389 : returns FD_SSPEER_UPDATE_SUCCESS (no clear_incr trigger,
390 : no incr<full violation). */
391 24 : int update_status = fd_sspeer_selector_update( selector, peer, latency,
392 24 : FD_SSPEER_SLOT_UNKNOWN, FD_SSPEER_SLOT_UNKNOWN,
393 24 : NULL, NULL );
394 24 : if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) {
395 : /* A warning is a tradeoff between crashing with FD_LOG_CRIT and
396 : potentially missing the log altogether with FD_LOG_DEBUG. */
397 0 : if( peer->key.is_url ) {
398 0 : FD_LOG_WARNING(( "unexpected selector update returned %d for peer %s " FD_IP4_ADDR_FMT ":%hu",
399 0 : update_status, peer->key.url.hostname,
400 0 : FD_IP4_ADDR_FMT_ARGS( peer->key.url.resolved_addr.addr ), fd_ushort_bswap( peer->key.url.resolved_addr.port ) ));
401 0 : } else {
402 0 : FD_BASE58_ENCODE_32_BYTES( peer->key.pubkey->uc, peer_pubkey_b58 );
403 0 : FD_LOG_WARNING(( "unexpected selector update returned %d for peer %s " FD_IP4_ADDR_FMT ":%hu",
404 0 : update_status, peer_pubkey_b58,
405 0 : FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ) ));
406 0 : }
407 0 : }
408 24 : ele_idx = peer_map_by_addr_idx_next_const( ele_idx, ULONG_MAX, selector->pool );
409 24 : cnt++;
410 24 : }
411 21 : return cnt;
412 21 : }
413 :
414 : ulong
415 : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
416 : fd_sspeer_key_t const * key,
417 : fd_ip4_port_t addr,
418 : ulong latency,
419 : ulong full_slot,
420 : ulong incr_slot,
421 : uchar const full_hash[ FD_HASH_FOOTPRINT ],
422 675 : uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
423 675 : if( FD_UNLIKELY( key==NULL ) ) return FD_SSPEER_SCORE_INVALID;
424 : /* A peer without a valid address cannot be added to the selector.
425 : For an existing peer changing from a valid address to 0, it is
426 : the caller's responsibility to remove them. */
427 666 : if( FD_UNLIKELY( !addr.l ) ) return FD_SSPEER_SCORE_INVALID;
428 :
429 : /* Reject implausible slot values. FD_SSPEER_SLOT_UNKNOWN is allowed. */
430 654 : if( FD_UNLIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN && full_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return FD_SSPEER_SCORE_INVALID;
431 648 : if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN && incr_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return FD_SSPEER_SCORE_INVALID;
432 :
433 645 : fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
434 645 : if( FD_LIKELY( peer ) ) {
435 72 : ulong old_full = peer->full_slot;
436 72 : ulong old_incr = peer->incr_slot;
437 72 : int update_status = fd_sspeer_selector_update( selector, peer, latency, full_slot, incr_slot, full_hash, incr_hash );
438 72 : if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) return FD_SSPEER_SCORE_INVALID;
439 66 : if( FD_UNLIKELY( peer->full_slot!=old_full || peer->incr_slot!=old_incr ) ) selector->cluster_slot_dirty = 1;
440 : /* Update the addr map after the selector update so that the peer
441 : is not mutated when the update fails. */
442 66 : if( FD_UNLIKELY( peer->addr.l!=addr.l ) ) {
443 6 : peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
444 6 : peer->addr = addr;
445 6 : peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
446 6 : }
447 573 : } else {
448 573 : if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) {
449 0 : FD_LOG_WARNING(( "peer selector pool exhausted" ));
450 0 : return FD_SSPEER_SCORE_INVALID;
451 0 : }
452 573 : if( FD_UNLIKELY( score_treap_ele_cnt(selector->score_treap)>=selector->max_peers ) ) {
453 9 : FD_LOG_WARNING(( "peer selector at max capacity" ));
454 9 : return FD_SSPEER_SCORE_INVALID;
455 9 : }
456 :
457 564 : if( FD_UNLIKELY( fd_sspeer_validate_slot_args( full_slot, incr_slot ) ) ) {
458 9 : return FD_SSPEER_SCORE_INVALID;
459 9 : }
460 :
461 555 : peer = peer_pool_ele_acquire( selector->pool );
462 555 : peer->key = *key;
463 555 : peer->addr = addr;
464 555 : peer->latency = latency;
465 555 : peer->score = fd_sspeer_selector_score( selector, latency, full_slot, incr_slot );
466 555 : peer->full_slot = full_slot;
467 555 : peer->incr_slot = incr_slot;
468 555 : if( FD_LIKELY( full_hash ) ) fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
469 543 : else fd_memset( peer->full_hash, 0, FD_HASH_FOOTPRINT );
470 : /* full_hash and incr_hash are treated independently here. */
471 555 : if( FD_LIKELY( incr_hash ) ) fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
472 543 : else fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT );
473 555 : peer_map_by_key_ele_insert( selector->map_by_key, peer, selector->pool );
474 555 : peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
475 555 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
476 555 : if( FD_LIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN || incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) selector->cluster_slot_dirty = 1;
477 555 : }
478 621 : peer->valid = peer->full_slot!=FD_SSPEER_SLOT_UNKNOWN;
479 621 : return peer->score;
480 645 : }
481 :
482 : void
483 : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
484 516 : fd_sspeer_key_t const * key ) {
485 516 : if( FD_UNLIKELY( key==NULL ) ) return;
486 510 : fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
487 510 : if( FD_UNLIKELY( peer==NULL ) ) return;
488 495 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
489 495 : peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
490 495 : peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
491 495 : peer_pool_ele_release( selector->pool, peer );
492 495 : selector->cluster_slot_dirty = 1;
493 495 : }
494 :
495 : void
496 : fd_sspeer_selector_remove_by_addr( fd_sspeer_selector_t * selector,
497 48 : fd_ip4_port_t addr ) {
498 105 : for(;;) {
499 105 : fd_sspeer_private_t * peer = peer_map_by_addr_ele_remove( selector->map_by_addr, &addr, NULL, selector->pool );
500 105 : if( FD_UNLIKELY( peer==NULL ) ) break;
501 57 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
502 57 : peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
503 57 : peer_pool_ele_release( selector->pool, peer );
504 57 : selector->cluster_slot_dirty = 1;
505 57 : }
506 48 : }
507 :
508 : fd_sspeer_t
509 : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
510 : int incremental,
511 303 : ulong base_slot ) {
512 303 : if( FD_UNLIKELY( incremental && base_slot==FD_SSPEER_SLOT_UNKNOWN ) ) {
513 3 : FD_LOG_WARNING(( "incremental selection requires a valid base_slot" ));
514 3 : return (fd_sspeer_t){
515 3 : .key = { .is_url = 0 },
516 3 : .addr = { .l=0UL },
517 3 : .full_slot = FD_SSPEER_SLOT_UNKNOWN,
518 3 : .incr_slot = FD_SSPEER_SLOT_UNKNOWN,
519 3 : .score = FD_SSPEER_SCORE_INVALID,
520 3 : .full_hash = {0},
521 3 : .incr_hash = {0},
522 3 : };
523 3 : }
524 :
525 300 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
526 351 : !score_treap_fwd_iter_done( iter );
527 306 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
528 306 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
529 : /* For full selection (!incremental), any valid peer is eligible.
530 : For incremental selection, the peer must serve the same base full
531 : snapshot and must actually offer an incremental snapshot. */
532 306 : if( FD_LIKELY( peer->valid &&
533 306 : (!incremental ||
534 306 : (peer->full_slot==base_slot && peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN) ) ) ) {
535 255 : fd_sspeer_t best = {
536 255 : .key = peer->key,
537 255 : .addr = peer->addr,
538 255 : .full_slot = peer->full_slot,
539 255 : .incr_slot = peer->incr_slot,
540 255 : .score = peer->score,
541 255 : };
542 255 : fd_memcpy( best.full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
543 255 : fd_memcpy( best.incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
544 255 : return best;
545 255 : }
546 306 : }
547 :
548 45 : return (fd_sspeer_t){
549 45 : .key = { .is_url = 0 },
550 45 : .addr = { .l=0UL },
551 45 : .full_slot = FD_SSPEER_SLOT_UNKNOWN,
552 45 : .incr_slot = FD_SSPEER_SLOT_UNKNOWN,
553 45 : .score = FD_SSPEER_SCORE_INVALID,
554 45 : .full_hash = {0},
555 45 : .incr_hash = {0},
556 45 : };
557 300 : }
558 :
559 : void
560 177 : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector ) {
561 177 : if( FD_LIKELY( !selector->cluster_slot_dirty ) ) return;
562 165 : selector->cluster_slot_dirty = 0;
563 :
564 : /* Compute max full_slot and incr_slot across all tracked peers. */
565 165 : ulong max_full = 0UL;
566 165 : ulong max_incr = 0UL;
567 165 : ulong full_cnt = 0UL;
568 165 : ulong incr_cnt = 0UL;
569 165 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
570 531 : !score_treap_fwd_iter_done( iter );
571 366 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
572 366 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
573 366 : if( FD_LIKELY( peer->full_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
574 360 : max_full = fd_ulong_max( max_full, peer->full_slot );
575 360 : full_cnt++;
576 360 : }
577 366 : if( FD_LIKELY( peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
578 264 : max_incr = fd_ulong_max( max_incr, peer->incr_slot );
579 264 : incr_cnt++;
580 264 : }
581 366 : }
582 :
583 : /* If no known incr_slot among tracked peers, treat as unknown. */
584 165 : if( FD_UNLIKELY( incr_cnt==0UL ) ) {
585 42 : max_incr = FD_SSPEER_SLOT_UNKNOWN;
586 42 : }
587 :
588 : /* If no known full_slot among tracked peers, reset so that a
589 : subsequent add() is not scored against a stale cluster slot. */
590 165 : if( FD_UNLIKELY( full_cnt==0UL ) ) {
591 27 : selector->cluster_slot.full = 0UL;
592 27 : selector->cluster_slot.incremental = FD_SSPEER_SLOT_UNKNOWN;
593 27 : return;
594 27 : }
595 :
596 : /* The full and incremental maxima are computed from different peer
597 : subsets, so max_incr < max_full is possible. Clamp to maintain
598 : the invariant that incremental >= full. */
599 138 : if( FD_UNLIKELY( max_incr!=FD_SSPEER_SLOT_UNKNOWN &&
600 138 : max_incr<max_full ) ) {
601 3 : max_incr = max_full;
602 3 : }
603 :
604 : /* If neither max changed, no rescore needed. */
605 138 : if( FD_LIKELY( max_full==selector->cluster_slot.full &&
606 138 : max_incr==selector->cluster_slot.incremental ) ) return;
607 :
608 114 : selector->cluster_slot.full = max_full;
609 114 : selector->cluster_slot.incremental = max_incr;
610 :
611 : /* Rescore all peers using the shadow treap swap mechanism. */
612 114 : ulong idx = 0UL;
613 114 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
614 375 : !score_treap_fwd_iter_done( iter );
615 261 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
616 261 : fd_sspeer_private_t * peer = score_treap_fwd_iter_ele( iter, selector->pool );
617 261 : fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
618 261 : shadow_peer->latency = peer->latency;
619 261 : shadow_peer->full_slot = peer->full_slot;
620 261 : shadow_peer->incr_slot = peer->incr_slot;
621 261 : shadow_peer->addr = peer->addr;
622 261 : shadow_peer->key = peer->key;
623 261 : shadow_peer->score = fd_sspeer_selector_score( selector, shadow_peer->latency, shadow_peer->full_slot, shadow_peer->incr_slot );
624 261 : shadow_peer->valid = peer->valid;
625 261 : fd_memcpy( shadow_peer->full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
626 261 : fd_memcpy( shadow_peer->incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
627 261 : score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
628 261 : selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
629 261 : peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
630 261 : peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
631 261 : peer_map_by_key_ele_insert( selector->map_by_key, shadow_peer, selector->pool );
632 261 : peer_map_by_addr_ele_insert( selector->map_by_addr, shadow_peer, selector->pool );
633 261 : }
634 :
635 : /* clear score treap */
636 375 : for( ulong i=0UL; i<idx; i++ ) {
637 261 : fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
638 261 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
639 261 : peer_pool_ele_release( selector->pool, peer );
640 261 : }
641 :
642 114 : score_treap_t * tmp = selector->score_treap;
643 114 : selector->score_treap = selector->shadow_score_treap;
644 114 : selector->shadow_score_treap = tmp;
645 :
646 : #if FD_SSPEER_SELECTOR_DEBUG
647 : FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
648 : #endif
649 114 : }
650 :
651 : fd_sscluster_slot_t
652 114 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
653 114 : return selector->cluster_slot;
654 114 : }
655 :
656 : ulong
657 231 : fd_sspeer_selector_peer_map_by_key_ele_cnt( fd_sspeer_selector_t * selector ) {
658 231 : ulong cnt = 0UL;
659 231 : for( peer_map_by_key_iter_t iter = peer_map_by_key_iter_init( selector->map_by_key, selector->pool );
660 828 : !peer_map_by_key_iter_done( iter, selector->map_by_key, selector->pool );
661 597 : iter = peer_map_by_key_iter_next( iter, selector->map_by_key, selector->pool ) ) {
662 597 : cnt++;
663 597 : }
664 231 : return cnt;
665 231 : }
666 :
667 : ulong
668 168 : fd_sspeer_selector_peer_map_by_addr_ele_cnt( fd_sspeer_selector_t * selector ) {
669 168 : ulong cnt = 0UL;
670 168 : for( peer_map_by_addr_iter_t iter = peer_map_by_addr_iter_init( selector->map_by_addr, selector->pool );
671 558 : !peer_map_by_addr_iter_done( iter, selector->map_by_addr, selector->pool );
672 390 : iter = peer_map_by_addr_iter_next( iter, selector->map_by_addr, selector->pool ) ) {
673 390 : cnt++;
674 390 : }
675 168 : return cnt;
676 168 : }
|