Line data Source code
1 : #include "fd_sspeer_selector.h"
2 : #include "../../../util/bits/fd_sat.h"
3 : #include "../../../util/log/fd_log.h"
4 :
5 : static int
6 : fd_sspeer_key_private_eq( fd_sspeer_key_t const * k0,
7 456 : fd_sspeer_key_t const * k1 ) {
8 456 : if( k0->is_url!=k1->is_url ) return 0;
9 420 : if( k0->is_url ) {
10 201 : return !strncmp( k0->url.hostname, k1->url.hostname, sizeof(k0->url.hostname) )
11 201 : && k0->url.resolved_addr.l==k1->url.resolved_addr.l;
12 201 : }
13 219 : return !memcmp( k0->pubkey, k1->pubkey, FD_PUBKEY_FOOTPRINT );
14 420 : }
15 :
16 : static ulong
17 : fd_sspeer_key_private_hash( fd_sspeer_key_t const * key,
18 1575 : ulong seed ) {
19 1575 : if( key->is_url ) {
20 : /* Use strnlen in case the string is not properly \0 terminated.
21 : Ideally, one would prefer sizeof(key->url.hostname) but that
22 : requires guaranteed zero-padding. */
23 798 : ulong h = fd_hash( seed, key->url.hostname, strnlen( key->url.hostname, sizeof(key->url.hostname) ) );
24 : /* fd_ip4_port_t is not a complete 64bit ulong, therefore compose
25 : the word from its parts to avoid random unused bytes. */
26 798 : ulong a = (ulong)key->url.resolved_addr.addr | ( ((ulong)key->url.resolved_addr.port) << 32 );
27 : /* Chaining "a" through fd_hash would give better avalanche
28 : properties, but it is probably overkill for a chain hash map. */
29 798 : return h ^ a;
30 798 : }
31 777 : return fd_hash( seed, key->pubkey, FD_PUBKEY_FOOTPRINT );
32 1575 : }
33 :
/* fd_sspeer_private_t is the per-peer element tracked by the selector.
   Each element lives in a shared pool and is linked simultaneously
   into a by-key chain map, a by-address chain multimap, and a
   score-ordered treap (see the template instantiations below). */
struct fd_sspeer_private {
  fd_sspeer_key_t key;   /* lookup key (URL or pubkey), keys the by-key map */
  fd_ip4_port_t addr;    /* peer address, keys the by-addr multimap */
  ulong full_slot;       /* advertised full snapshot slot, or FD_SSPEER_SLOT_UNKNOWN */
  ulong incr_slot;       /* advertised incremental snapshot slot, or FD_SSPEER_SLOT_UNKNOWN */
  uchar full_hash[ FD_HASH_FOOTPRINT ]; /* hash of the full snapshot (zeroed when unknown) */
  uchar incr_hash[ FD_HASH_FOOTPRINT ]; /* hash of the incremental snapshot (zeroed when unknown/stale) */
  ulong latency;         /* measured ping latency, or FD_SSPEER_LATENCY_UNKNOWN */
  ulong score;           /* cached fd_sspeer_selector_score result; treap ordering key */
  int valid;             /* non-zero iff full_slot is known (eligible for selection) */

  /* Intrusive links for the containers this element participates in. */
  struct {
    ulong next;          /* pool free-list link */
  } pool;

  struct {
    ulong next;
    ulong prev;          /* by-key chain map links */
  } map_by_key;

  struct {
    ulong next;
    ulong prev;          /* by-addr chain multimap links */
  } map_by_addr;

  struct {
    ulong parent;
    ulong left;
    ulong right;
    ulong prio;          /* randomized priority, seeded in _new */
  } score_treap;
};

typedef struct fd_sspeer_private fd_sspeer_private_t;
68 :
/* Element pool providing backing storage for all peers.  The free
   list is threaded through each element's pool.next field.  The pool
   is sized at 2*max_peers (see _new/_footprint) so that the rescore
   pass in fd_sspeer_selector_process_cluster_slot can hold a shadow
   copy of every peer at once. */
#define POOL_NAME peer_pool
#define POOL_T fd_sspeer_private_t
#define POOL_IDX_T ulong
#define POOL_NEXT pool.next
#include "../../../util/tmpl/fd_pool.c"

/* Chain map indexing peers by fd_sspeer_key_t (URL or pubkey), using
   the custom eq/hash helpers defined above. */
#define MAP_NAME peer_map_by_key
#define MAP_KEY key
#define MAP_ELE_T fd_sspeer_private_t
#define MAP_KEY_T fd_sspeer_key_t
#define MAP_PREV map_by_key.prev
#define MAP_NEXT map_by_key.next
#define MAP_KEY_EQ(k0,k1) (fd_sspeer_key_private_eq(k0,k1))
#define MAP_KEY_HASH(key,seed) (fd_sspeer_key_private_hash(key,seed))
#define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
#include "../../../util/tmpl/fd_map_chain.c"

/* Chain multimap indexing peers by ip4:port.  MAP_MULTI allows
   several peers (distinct keys) to share the same address, which
   fd_sspeer_selector_update_on_ping and _remove_by_addr iterate
   over. */
#define MAP_NAME peer_map_by_addr
#define MAP_KEY addr
#define MAP_ELE_T fd_sspeer_private_t
#define MAP_KEY_T fd_ip4_port_t
#define MAP_PREV map_by_addr.prev
#define MAP_NEXT map_by_addr.next
#define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
#define MAP_KEY_HASH(key,seed) (seed^(key)->l)
#define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
#define MAP_MULTI 1
#include "../../../util/tmpl/fd_map_chain.c"

/* Treap ordered on peer score via TREAP_LT: lower score sorts
   earlier, so a forward iteration visits best-scored peers first. */
#define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )

#define TREAP_T fd_sspeer_private_t
#define TREAP_NAME score_treap
#define TREAP_QUERY_T void * /* We don't use query ... */
#define TREAP_CMP(a,b) (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
                                                                        implementation to cmp either */
#define TREAP_IDX_T ulong
#define TREAP_LT COMPARE_WORSE
#define TREAP_PARENT score_treap.parent
#define TREAP_LEFT score_treap.left
#define TREAP_RIGHT score_treap.right
#define TREAP_PRIO score_treap.prio
#include "../../../util/tmpl/fd_treap.c"

/* Slots-behind distance assumed for peers whose full slot is still
   unknown (see fd_sspeer_selector_score). */
#define DEFAULT_SLOTS_BEHIND (1000UL*1000UL) /* 1,000,000 slots behind */
/* Assumed latency (in nanos) for peers that have not been pinged yet.
   Pings are sent immediately on peer discovery, so this default is
   short-lived.  100ms is a neutral middle-ground: high enough that
   any peer with a measured latency is preferred, low enough that slot
   distance still meaningfully differentiates unpinged peers. */
#define DEFAULT_PEER_LATENCY (100UL*1000UL*1000UL) /* 100ms */
/* Score penalty applied per slot of distance behind the cluster. */
#define DEFAULT_SLOTS_BEHIND_PENALTY (1000UL)

/* Set to 1 to run score_treap_verify after every rescore pass. */
#define FD_SSPEER_SELECTOR_DEBUG 0
123 :
/* fd_sspeer_selector_private_t is the selector's top-level state.
   All container pointers reference memory carved out of the single
   region laid out in fd_sspeer_selector_new. */
struct fd_sspeer_selector_private {
  fd_sspeer_private_t * pool;          /* element pool, capacity 2*max_peers */
  peer_map_by_key_t * map_by_key;      /* peers indexed by key */
  peer_map_by_addr_t * map_by_addr;    /* peers indexed by address (multimap) */
  score_treap_t * score_treap;         /* active score-ordered treap */
  score_treap_t * shadow_score_treap;  /* scratch treap used during rescoring */
  ulong * peer_idx_list;               /* scratch pool-index list (max_peers entries) used during rescoring */
  fd_sscluster_slot_t cluster_slot;    /* latest accepted cluster full/incremental slot reference */
  int incremental_snapshot_fetch;      /* non-zero enables the incremental-aware advance rules */
  ulong max_peers;                     /* max elements allowed in the score treap */

  ulong magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
};
137 :
138 : FD_FN_CONST ulong
139 396 : fd_sspeer_selector_align( void ) {
140 396 : return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(),
141 396 : fd_ulong_max( peer_map_by_key_align(), fd_ulong_max( peer_map_by_addr_align(),
142 396 : fd_ulong_max( score_treap_align(), alignof(ulong) ) ) ) ) );
143 396 : }
144 :
/* Returns the memory footprint required for a selector supporting up
   to max_peers scored peers.  The append sequence here must stay in
   lockstep with the allocation sequence in fd_sspeer_selector_new. */
FD_FN_CONST ulong
fd_sspeer_selector_footprint( ulong max_peers ) {
  ulong l;
  l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
  /* Pool and maps are sized at 2*max_peers to leave room for the
     shadow copies made during rescoring. */
  l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
  l = FD_LAYOUT_APPEND( l, peer_map_by_key_align(), peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) ) );
  l = FD_LAYOUT_APPEND( l, peer_map_by_addr_align(), peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) ) );
  /* Two treaps: the active one and the shadow used while rescoring. */
  l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
  l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
  /* Scratch index list used by the rescore pass. */
  l = FD_LAYOUT_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
  return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
}
158 :
/* Formats a memory region as a peer selector.  shmem must be aligned
   to fd_sspeer_selector_align() and sized per
   fd_sspeer_selector_footprint( max_peers ).  incremental_snapshot_fetch
   selects the cluster-slot advance rules used later by
   fd_sspeer_selector_process_cluster_slot; seed randomizes map hashing
   and treap priorities.  Returns shmem on success, NULL on bad input
   (logs details). */
void *
fd_sspeer_selector_new( void * shmem,
                        ulong max_peers,
                        int incremental_snapshot_fetch,
                        ulong seed ) {
  if( FD_UNLIKELY( !shmem ) ) {
    FD_LOG_WARNING(( "NULL shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
    FD_LOG_WARNING(( "unaligned shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( max_peers < 1UL ) ) {
    FD_LOG_WARNING(( "max_peers must be at least 1" ));
    return NULL;
  }

  /* Carve the region; the sequence mirrors fd_sspeer_selector_footprint. */
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
  void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
  void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_key_align(), peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) ) );
  void * _multimap_by_addr = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_addr_align(), peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) ) );
  void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
  void * _shadow_score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
  void * _peer_idx_list = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );

  selector->pool = peer_pool_join( peer_pool_new( _pool, 2UL*max_peers ) );
  /* Seed treap priorities so the treap is balanced. */
  score_treap_seed( selector->pool, 2UL*max_peers, seed );
  selector->map_by_key = peer_map_by_key_join( peer_map_by_key_new( _map, peer_map_by_key_chain_cnt_est( 2UL*max_peers ), seed ) );
  selector->map_by_addr = peer_map_by_addr_join( peer_map_by_addr_new( _multimap_by_addr, peer_map_by_addr_chain_cnt_est( 2UL*max_peers ), seed ) );
  selector->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) );
  selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
  selector->peer_idx_list = (ulong *)_peer_idx_list;
  selector->max_peers = max_peers;

  /* No cluster reference yet: full starts at 0, incremental unknown. */
  selector->cluster_slot.full = 0UL;
  selector->cluster_slot.incremental = FD_SSPEER_SLOT_UNKNOWN;
  selector->incremental_snapshot_fetch = incremental_snapshot_fetch;

  /* Publish the magic last so concurrent observers never see a
     half-initialized region flagged as valid. */
  FD_COMPILER_MFENCE();
  FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
  FD_COMPILER_MFENCE();

  return (void *)selector;
}
208 :
209 : fd_sspeer_selector_t *
210 90 : fd_sspeer_selector_join( void * shselector ) {
211 90 : if( FD_UNLIKELY( !shselector ) ) {
212 0 : FD_LOG_WARNING(( "NULL shselector" ));
213 0 : return NULL;
214 0 : }
215 :
216 90 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
217 0 : FD_LOG_WARNING(( "misaligned shselector" ));
218 0 : return NULL;
219 0 : }
220 :
221 90 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
222 :
223 90 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
224 0 : FD_LOG_WARNING(( "bad magic" ));
225 0 : return NULL;
226 0 : }
227 :
228 90 : return selector;
229 90 : }
230 :
231 : void *
232 90 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
233 90 : if( FD_UNLIKELY( !selector ) ) {
234 0 : FD_LOG_WARNING(( "NULL selector" ));
235 0 : return NULL;
236 0 : }
237 :
238 90 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
239 0 : FD_LOG_WARNING(( "misaligned selector" ));
240 0 : return NULL;
241 0 : }
242 :
243 90 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
244 0 : FD_LOG_WARNING(( "bad magic" ));
245 0 : return NULL;
246 0 : }
247 :
248 90 : selector->pool = peer_pool_leave( selector->pool );
249 90 : selector->map_by_key = peer_map_by_key_leave( selector->map_by_key );
250 90 : selector->map_by_addr = peer_map_by_addr_leave( selector->map_by_addr );
251 90 : selector->score_treap = score_treap_leave( selector->score_treap );
252 90 : selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
253 :
254 90 : return (void *)selector;
255 90 : }
256 :
257 : void *
258 90 : fd_sspeer_selector_delete( void * shselector ) {
259 90 : if( FD_UNLIKELY( !shselector ) ) {
260 0 : FD_LOG_WARNING(( "NULL shselector" ));
261 0 : return NULL;
262 0 : }
263 :
264 90 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
265 0 : FD_LOG_WARNING(( "misaligned shselector" ));
266 0 : return NULL;
267 0 : }
268 :
269 90 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
270 :
271 90 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
272 0 : FD_LOG_WARNING(( "bad magic" ));
273 0 : return NULL;
274 0 : }
275 :
276 90 : selector->pool = peer_pool_delete( selector->pool );
277 90 : selector->map_by_key = peer_map_by_key_delete( selector->map_by_key );
278 90 : selector->map_by_addr = peer_map_by_addr_delete( selector->map_by_addr );
279 90 : selector->score_treap = score_treap_delete( selector->score_treap );
280 90 : selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
281 :
282 90 : FD_COMPILER_MFENCE();
283 90 : FD_VOLATILE( selector->magic ) = 0UL;
284 90 : FD_COMPILER_MFENCE();
285 :
286 90 : return (void *)selector;
287 90 : }
288 :
289 : /* Calculates a score for a peer given its latency and its resolved
290 : full and incremental slots */
291 : static ulong
292 : fd_sspeer_selector_score( fd_sspeer_selector_t const * selector,
293 : ulong peer_latency,
294 : ulong full_slot,
295 525 : ulong incr_slot ) {
296 525 : peer_latency = peer_latency!=FD_SSPEER_LATENCY_UNKNOWN ? peer_latency : DEFAULT_PEER_LATENCY;
297 :
298 525 : ulong slots_behind = DEFAULT_SLOTS_BEHIND;
299 :
300 525 : if( FD_LIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
301 507 : if( FD_LIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
302 507 : selector->cluster_slot.incremental!=FD_SSPEER_SLOT_UNKNOWN ) ) {
303 345 : slots_behind = selector->cluster_slot.incremental>incr_slot ? selector->cluster_slot.incremental - incr_slot : 0UL;
304 345 : } else {
305 : /* Either the peer has no incremental or the cluster has no
306 : incremental reference yet. Fall back to comparing full_slot
307 : against the cluster full slot. */
308 162 : slots_behind = selector->cluster_slot.full>full_slot ? selector->cluster_slot.full - full_slot : 0UL;
309 162 : }
310 507 : }
311 :
312 : /* Using saturating arithmetic to avoid overflow and cap at
313 : FD_SSPEER_SCORE_MAX. */
314 525 : ulong penalty = fd_ulong_sat_mul( DEFAULT_SLOTS_BEHIND_PENALTY, slots_behind );
315 525 : ulong score = fd_ulong_sat_add( peer_latency, penalty );
316 525 : return fd_ulong_min( score, FD_SSPEER_SCORE_MAX );
317 525 : }
318 :
319 : /* Validates slot arguments for both new and existing peers. Returns
320 : 0 on success, -1 on failure due to incr_slot<full_slot, and -2 on
321 : failure due to full_slot==UNKNOWN with incr_slot!=UNKNOWN. The
322 : caller passes in the effective (already-resolved) full_slot and
323 : incr_slot values. No log on failure (the caller is responsible
324 : for logging whenever needed).
325 :
326 : Two invariants are enforced:
327 : 1. When both slots are known, incr_slot must be >= full_slot.
328 : 2. An incremental slot requires a known full slot. */
329 : static int
330 : fd_sspeer_validate_slot_args( ulong full_slot,
331 426 : ulong incr_slot ) {
332 426 : if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
333 426 : full_slot!=FD_SSPEER_SLOT_UNKNOWN &&
334 426 : incr_slot<full_slot ) ) {
335 9 : return -1;
336 9 : }
337 :
338 417 : if( FD_UNLIKELY( full_slot==FD_SSPEER_SLOT_UNKNOWN &&
339 417 : incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
340 9 : return -2;
341 9 : }
342 :
343 408 : return 0;
344 417 : }
345 :
/* Updates a peer's score with new values for latency and/or resolved
   full/incremental slots.  Returns FD_SSPEER_UPDATE_SUCCESS on
   success, or the specific fd_sspeer_validate_slot_args error code
   on failure without modifying the peer or any data structure.

   Slot-based incremental clearing: when the caller provides
   incr_slot==UNKNOWN and full_slot!=UNKNOWN, the peer's existing
   incremental data is cleared if it is stale (peer->incr_slot <
   full_slot).  Otherwise, the existing incremental data is preserved. */
static int
fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
                           fd_sspeer_private_t * peer,
                           ulong latency,
                           ulong full_slot,
                           ulong incr_slot,
                           uchar const full_hash[ FD_HASH_FOOTPRINT ],
                           uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
  /* UNKNOWN arguments mean "keep the peer's current value". */
  ulong peer_latency = latency!=FD_SSPEER_LATENCY_UNKNOWN ? latency : peer->latency;
  ulong peer_full_slot = full_slot!=FD_SSPEER_SLOT_UNKNOWN ? full_slot : peer->full_slot;

  ulong peer_incr_slot;
  int clear_incr = 0;
  if( incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) {
    peer_incr_slot = incr_slot;
  } else if( full_slot!=FD_SSPEER_SLOT_UNKNOWN &&
             peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
             peer->incr_slot<full_slot ) {
    /* The caller is providing a new full_slot that has advanced past
       the peer's existing incremental — the incremental is stale. */
    peer_incr_slot = FD_SSPEER_SLOT_UNKNOWN;
    clear_incr = 1;
  } else {
    peer_incr_slot = peer->incr_slot;
  }

  /* Validate before mutating anything so a failed update leaves the
     peer and every container untouched. */
  int validate_err = fd_sspeer_validate_slot_args( peer_full_slot, peer_incr_slot );
  if( FD_UNLIKELY( validate_err ) ) return validate_err;

  /* The treap is ordered on score: remove the element, rewrite its
     fields (including score), then re-insert to keep ordering valid. */
  score_treap_ele_remove( selector->score_treap, peer, selector->pool );

  peer->score = fd_sspeer_selector_score( selector, peer_latency, peer_full_slot, peer_incr_slot );

  peer->latency = peer_latency;
  peer->full_slot = peer_full_slot;
  peer->incr_slot = peer_incr_slot;
  /* NULL hashes mean "keep the peer's current hash". */
  if( FD_LIKELY( full_hash ) ) {
    fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
  }
  if( FD_UNLIKELY( clear_incr ) ) {
    /* Stale incremental: zero the hash alongside the cleared slot. */
    fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT );
  } else if( FD_LIKELY( incr_hash ) ) {
    fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
  }

  score_treap_ele_insert( selector->score_treap, peer, selector->pool );
  return FD_SSPEER_UPDATE_SUCCESS;
}
403 :
404 : int
405 : fd_sspeer_selector_update_on_resolve( fd_sspeer_selector_t * selector,
406 : fd_sspeer_key_t const * key,
407 : ulong full_slot,
408 : ulong incr_slot,
409 : uchar const full_hash[ FD_HASH_FOOTPRINT ],
410 42 : uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
411 42 : if( FD_UNLIKELY( key==NULL ) ) return FD_SSPEER_UPDATE_ERR_NULL_KEY;
412 39 : fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
413 39 : if( FD_UNLIKELY( peer==NULL ) ) return FD_SSPEER_UPDATE_ERR_NOT_FOUND;
414 30 : int update_status = fd_sspeer_selector_update( selector, peer, FD_SSPEER_LATENCY_UNKNOWN, full_slot, incr_slot, full_hash, incr_hash );
415 30 : if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) return FD_SSPEER_UPDATE_ERR_INVALID_ARG;
416 24 : peer->valid = peer->full_slot!=FD_SSPEER_SLOT_UNKNOWN;
417 24 : return FD_SSPEER_UPDATE_SUCCESS;
418 30 : }
419 :
420 : ulong
421 : fd_sspeer_selector_update_on_ping( fd_sspeer_selector_t * selector,
422 : fd_ip4_port_t addr,
423 21 : ulong latency ) {
424 21 : ulong ele_idx = peer_map_by_addr_idx_query_const( selector->map_by_addr, &addr, ULONG_MAX, selector->pool );
425 21 : ulong cnt = 0UL;
426 45 : for(;;) {
427 45 : if( FD_UNLIKELY( ele_idx==ULONG_MAX ) ) break;
428 24 : fd_sspeer_private_t * peer = selector->pool + ele_idx;
429 : /* Update cannot fail here: slots are FD_SSPEER_SLOT_UNKNOWN and
430 : hashes are NULL, so fd_sspeer_validate_slot_args always
431 : returns FD_SSPEER_UPDATE_SUCCESS (no clear_incr trigger,
432 : no incr<full violation). */
433 24 : int update_status = fd_sspeer_selector_update( selector, peer, latency,
434 24 : FD_SSPEER_SLOT_UNKNOWN, FD_SSPEER_SLOT_UNKNOWN,
435 24 : NULL, NULL );
436 24 : if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) {
437 : /* A warning is a tradeoff between crashing with FD_LOG_CRIT and
438 : potentially missing the log altogether with FD_LOG_DEBUG. */
439 0 : if( peer->key.is_url ) {
440 0 : FD_LOG_WARNING(( "unexpected selector update returned %d for peer %s " FD_IP4_ADDR_FMT ":%hu",
441 0 : update_status, peer->key.url.hostname,
442 0 : FD_IP4_ADDR_FMT_ARGS( peer->key.url.resolved_addr.addr ), fd_ushort_bswap( peer->key.url.resolved_addr.port ) ));
443 0 : } else {
444 0 : FD_BASE58_ENCODE_32_BYTES( peer->key.pubkey->uc, peer_pubkey_b58 );
445 0 : FD_LOG_WARNING(( "unexpected selector update returned %d for peer %s " FD_IP4_ADDR_FMT ":%hu",
446 0 : update_status, peer_pubkey_b58,
447 0 : FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ) ));
448 0 : }
449 0 : }
450 24 : ele_idx = peer_map_by_addr_idx_next_const( ele_idx, ULONG_MAX, selector->pool );
451 24 : cnt++;
452 24 : }
453 21 : return cnt;
454 21 : }
455 :
/* Adds a peer identified by key at addr, or updates the existing peer
   if the key is already present.  Returns the peer's resulting score
   on success, FD_SSPEER_SCORE_INVALID on failure (NULL key, zero
   addr, invalid slot arguments, or selector at capacity).  On
   failure, no data structure is modified. */
ulong
fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
                        fd_sspeer_key_t const * key,
                        fd_ip4_port_t addr,
                        ulong latency,
                        ulong full_slot,
                        ulong incr_slot,
                        uchar const full_hash[ FD_HASH_FOOTPRINT ],
                        uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
  if( FD_UNLIKELY( key==NULL ) ) return FD_SSPEER_SCORE_INVALID;
  /* A peer without a valid address cannot be added to the selector.
     For an existing peer changing from a valid address to 0, it is
     the caller's responsibility to remove them. */
  if( FD_UNLIKELY( !addr.l ) ) return FD_SSPEER_SCORE_INVALID;

  fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
  if( FD_LIKELY( peer ) ) {
    /* Known key: delegate to the common update path (validates and
       rescores; leaves the peer untouched on failure). */
    int update_status = fd_sspeer_selector_update( selector, peer, latency, full_slot, incr_slot, full_hash, incr_hash );
    if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) return FD_SSPEER_SCORE_INVALID;
    /* Update the addr map after the selector update so that the peer
       is not mutated when the update fails. */
    if( FD_UNLIKELY( peer->addr.l!=addr.l ) ) {
      peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
      peer->addr = addr;
      peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
    }
  } else {
    /* New key: check capacity before validating so no partial insert
       can occur. */
    if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) {
      FD_LOG_WARNING(( "peer selector pool exhausted" ));
      return FD_SSPEER_SCORE_INVALID;
    }
    if( FD_UNLIKELY( score_treap_ele_cnt(selector->score_treap)>=selector->max_peers ) ) {
      FD_LOG_WARNING(( "peer selector at max capacity" ));
      return FD_SSPEER_SCORE_INVALID;
    }

    if( FD_UNLIKELY( fd_sspeer_validate_slot_args( full_slot, incr_slot ) ) ) {
      return FD_SSPEER_SCORE_INVALID;
    }

    /* Populate the element fully before linking it into the maps and
       the score treap. */
    peer = peer_pool_ele_acquire( selector->pool );
    peer->key = *key;
    peer->addr = addr;
    peer->latency = latency;
    peer->score = fd_sspeer_selector_score( selector, latency, full_slot, incr_slot );
    peer->full_slot = full_slot;
    peer->incr_slot = incr_slot;
    if( FD_LIKELY( full_hash ) ) fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
    else                         fd_memset( peer->full_hash, 0, FD_HASH_FOOTPRINT );
    /* full_hash and incr_hash are treated independently here. */
    if( FD_LIKELY( incr_hash ) ) fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
    else                         fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT );
    peer_map_by_key_ele_insert( selector->map_by_key, peer, selector->pool );
    peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
    score_treap_ele_insert( selector->score_treap, peer, selector->pool );
  }
  /* A peer is selectable only once its full slot is known. */
  peer->valid = peer->full_slot!=FD_SSPEER_SLOT_UNKNOWN;
  return peer->score;
}
515 :
516 : void
517 : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
518 303 : fd_sspeer_key_t const * key ) {
519 303 : if( FD_UNLIKELY( key==NULL ) ) return;
520 297 : fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
521 297 : if( FD_UNLIKELY( peer==NULL ) ) return;
522 282 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
523 282 : peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
524 282 : peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
525 282 : peer_pool_ele_release( selector->pool, peer );
526 282 : }
527 :
528 : void
529 : fd_sspeer_selector_remove_by_addr( fd_sspeer_selector_t * selector,
530 24 : fd_ip4_port_t addr ) {
531 60 : for(;;) {
532 60 : fd_sspeer_private_t * peer = peer_map_by_addr_ele_remove( selector->map_by_addr, &addr, NULL, selector->pool );
533 60 : if( FD_UNLIKELY( peer==NULL ) ) break;
534 36 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
535 36 : peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
536 36 : peer_pool_ele_release( selector->pool, peer );
537 36 : }
538 24 : }
539 :
540 : fd_sspeer_t
541 : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
542 : int incremental,
543 282 : ulong base_slot ) {
544 282 : if( FD_UNLIKELY( incremental && base_slot==FD_SSPEER_SLOT_UNKNOWN ) ) {
545 3 : FD_LOG_WARNING(( "incremental selection requires a valid base_slot" ));
546 3 : return (fd_sspeer_t){
547 3 : .addr = { .l=0UL },
548 3 : .full_slot = FD_SSPEER_SLOT_UNKNOWN,
549 3 : .incr_slot = FD_SSPEER_SLOT_UNKNOWN,
550 3 : .score = FD_SSPEER_SCORE_INVALID,
551 3 : .full_hash = {0},
552 3 : .incr_hash = {0},
553 3 : };
554 3 : }
555 :
556 279 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
557 336 : !score_treap_fwd_iter_done( iter );
558 297 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
559 297 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
560 : /* For full selection (!incremental), any valid peer is eligible.
561 : For incremental selection, the peer must serve the same base full
562 : snapshot and must actually offer an incremental snapshot. */
563 297 : if( FD_LIKELY( peer->valid &&
564 297 : (!incremental ||
565 297 : (peer->full_slot==base_slot && peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN) ) ) ) {
566 240 : fd_sspeer_t best = {
567 240 : .addr = peer->addr,
568 240 : .full_slot = peer->full_slot,
569 240 : .incr_slot = peer->incr_slot,
570 240 : .score = peer->score,
571 240 : };
572 240 : fd_memcpy( best.full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
573 240 : fd_memcpy( best.incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
574 240 : return best;
575 240 : }
576 297 : }
577 :
578 39 : return (fd_sspeer_t){
579 39 : .addr = { .l=0UL },
580 39 : .full_slot = FD_SSPEER_SLOT_UNKNOWN,
581 39 : .incr_slot = FD_SSPEER_SLOT_UNKNOWN,
582 39 : .score = FD_SSPEER_SCORE_INVALID,
583 39 : .full_hash = {0},
584 39 : .incr_hash = {0},
585 39 : };
586 279 : }
587 :
/* Advances the selector's cluster slot reference and, when it
   changes, rescores every tracked peer (scores depend on the cluster
   reference through fd_sspeer_selector_score).  Updates that would
   regress or fail to advance the reference are ignored. */
void
fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector,
                                         ulong full_slot,
                                         ulong incr_slot ) {
  if( FD_UNLIKELY( full_slot==FD_SSPEER_SLOT_UNKNOWN ) ) return;

  /* Reject cluster slot updates where the incremental slot is before
     the full slot.  Both must be known for the check to apply.  Genesis
     (full_slot=0, incr_slot=0) is supported. */
  if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN && incr_slot<full_slot ) ) return;

  if( FD_LIKELY( selector->incremental_snapshot_fetch ) ) {
    /* The full slot must never regress, regardless of incr_slot. */
    if( FD_UNLIKELY( full_slot<selector->cluster_slot.full ) ) return;

    /* Reject updates that do not advance the cluster slot.
       incr_slot     | stored incr   | reject when
       --------------|---------------|--------------------------------
       valid         | valid         | incr_slot < stored.incremental
                     |               | OR (incr_slot == stored.incremental
                     |               |     AND full_slot <= stored.full)
       valid         | _SLOT_UNKNOWN | incr_slot < stored.full
                     |               | (strict: genesis accepted)
       _SLOT_UNKNOWN | valid         | full_slot <= stored.full
       _SLOT_UNKNOWN | _SLOT_UNKNOWN | full_slot <= stored.full */
    if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
      if( FD_UNLIKELY( selector->cluster_slot.incremental!=FD_SSPEER_SLOT_UNKNOWN ) ) {
        if( FD_UNLIKELY( ( incr_slot<selector->cluster_slot.incremental ||
                           ( incr_slot==selector->cluster_slot.incremental &&
                             full_slot<=selector->cluster_slot.full ) ) ) ) return;
      } else {
        if( FD_UNLIKELY( incr_slot<selector->cluster_slot.full ) ) return;
      }
    } else if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;

  } else {
    /* Without incremental fetch, only a strictly advancing full slot
       is accepted. */
    if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
  }

  /* Commit the new reference. */
  selector->cluster_slot.full = full_slot;
  if( FD_LIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
    selector->cluster_slot.incremental = incr_slot;
  } else if( FD_UNLIKELY( selector->cluster_slot.incremental!=FD_SSPEER_SLOT_UNKNOWN &&
                          selector->cluster_slot.incremental<full_slot ) ) {
    /* The full slot advanced past the incremental slot, so the
       incremental reference is stale and must be invalidated. */
    selector->cluster_slot.incremental = FD_SSPEER_SLOT_UNKNOWN;
  }

  if( FD_UNLIKELY( score_treap_ele_cnt( selector->score_treap )==0UL ) ) return;

  /* Rescore all peers
     TODO: make more performant, maybe make a treap rebalance API */
  ulong idx = 0UL;
  for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
       !score_treap_fwd_iter_done( iter );
       iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
    /* Do not remove the peer from the treap while the iterator is
       running.  Removing from peer_map(s) here is ok.  Instead, copy
       each peer into a freshly-acquired shadow element with a
       recomputed score, insert the shadow into the shadow treap, and
       remember the original's pool index for later release.  The
       pool's 2*max_peers capacity guarantees the acquire succeeds. */
    fd_sspeer_private_t * peer = score_treap_fwd_iter_ele( iter, selector->pool );
    fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
    shadow_peer->latency = peer->latency;
    shadow_peer->full_slot = peer->full_slot;
    shadow_peer->incr_slot = peer->incr_slot;
    shadow_peer->addr = peer->addr;
    shadow_peer->key = peer->key;
    shadow_peer->score = fd_sspeer_selector_score( selector, shadow_peer->latency, shadow_peer->full_slot, shadow_peer->incr_slot );
    shadow_peer->valid = peer->valid;
    fd_memcpy( shadow_peer->full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
    fd_memcpy( shadow_peer->incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
    score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
    selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
    /* Repoint the maps from the original element to its shadow. */
    peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
    peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
    peer_map_by_key_ele_insert( selector->map_by_key, shadow_peer, selector->pool );
    peer_map_by_addr_ele_insert( selector->map_by_addr, shadow_peer, selector->pool );
  }

  /* clear score treap */
  for( ulong i=0UL; i<idx; i++ ) {
    fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
    score_treap_ele_remove( selector->score_treap, peer, selector->pool );
    peer_pool_ele_release( selector->pool, peer );
  }

  /* Swap: the shadow treap (now holding all rescored peers) becomes
     the active treap; the emptied active treap becomes the shadow. */
  score_treap_t * tmp = selector->score_treap;
  selector->score_treap = selector->shadow_score_treap;
  selector->shadow_score_treap = tmp;

#if FD_SSPEER_SELECTOR_DEBUG
  FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
#endif
}
681 :
682 : fd_sscluster_slot_t
683 111 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
684 111 : return selector->cluster_slot;
685 111 : }
686 :
687 : ulong
688 180 : fd_sspeer_selector_peer_map_by_key_ele_cnt( fd_sspeer_selector_t * selector ) {
689 180 : ulong cnt = 0UL;
690 180 : for( peer_map_by_key_iter_t iter = peer_map_by_key_iter_init( selector->map_by_key, selector->pool );
691 777 : !peer_map_by_key_iter_done( iter, selector->map_by_key, selector->pool );
692 597 : iter = peer_map_by_key_iter_next( iter, selector->map_by_key, selector->pool ) ) {
693 597 : cnt++;
694 597 : }
695 180 : return cnt;
696 180 : }
697 :
698 : ulong
699 156 : fd_sspeer_selector_peer_map_by_addr_ele_cnt( fd_sspeer_selector_t * selector ) {
700 156 : ulong cnt = 0UL;
701 156 : for( peer_map_by_addr_iter_t iter = peer_map_by_addr_iter_init( selector->map_by_addr, selector->pool );
702 546 : !peer_map_by_addr_iter_done( iter, selector->map_by_addr, selector->pool );
703 390 : iter = peer_map_by_addr_iter_next( iter, selector->map_by_addr, selector->pool ) ) {
704 390 : cnt++;
705 390 : }
706 156 : return cnt;
707 156 : }
|