Line data Source code
1 : #include "fd_crds.h"
2 : #include "fd_crds_contact_info.c"
3 : #include "../fd_gossip_types.h"
4 :
5 : #include "../../../ballet/sha256/fd_sha256.h"
6 : #include "../../../funk/fd_funk_base.h" /* no link dependency, only using hash */
7 :
8 : #include <string.h>
9 :
/* The contact info side table is created with CRDS_MAX_CONTACT_INFO
   entries, and indices into it are published in gossip update messages
   (see remove_contact_info).  It must therefore be sized identically to
   FD_CONTACT_INFO_TABLE_SIZE; presumably consumers of those updates size
   their own tables with that constant (TODO confirm). */
FD_STATIC_ASSERT( CRDS_MAX_CONTACT_INFO==FD_CONTACT_INFO_TABLE_SIZE,
                  "CRDS_MAX_CONTACT_INFO must match FD_CONTACT_INFO_TABLE_SIZE" );
12 :
/* fd_crds_key_t identifies the table slot a CRDS value upserts into.
   The slot is determined by:
     - the value type (tag),
     - the originator's 32-byte pubkey, and
     - for vote, epoch slots and duplicate shred values only, a
       per-type index.
   Which union member is meaningful depends on tag (see generate_key,
   lookup_hash and lookup_eq).  Unused union bytes and padding are never
   compared, so keys must not be compared with a whole-struct memcmp. */
struct fd_crds_key {
  uchar tag;
  uchar pubkey[ 32UL ];
  union {
    uchar  vote_index;
    uchar  epoch_slots_index;
    ushort duplicate_shred_index;
  };
};

typedef struct fd_crds_key fd_crds_key_t;
24 :
/* The CRDS at a high level is just a list of all the messages we have
   received over gossip.  These are called the CRDS values.  Values
   are not arbitrary, and must conform to a strictly typed schema of
   around 10 different messages. */

struct fd_crds_entry_private {
  /* The core operation of the CRDS is to "upsert" a value.  Basically,
     all of the message types are keyed by the originator's public key,
     and we only want to store the most recent message of each type.

     This key field is the key for the hash table (lookup_map). */
  fd_crds_key_t key;

  /* Type-specific state.  Only the member matching key.tag is used:
     node_instance for FD_GOSSIP_VALUE_NODE_INSTANCE values and
     contact_info for FD_GOSSIP_VALUE_CONTACT_INFO values (see
     crds_entry_init). */
  union {
    struct {
      /* This peer's slot in the contact info side table. */
      fd_crds_contact_info_entry_t * ci;
      long instance_creation_wallclock_nanos;
      uchar is_active;
      /* Index of this peer in the peer samplers.  It is
         SAMPLE_IDX_SENTINEL until the entry is added to the samplers
         after a successful insertion. */
      ulong sampler_idx;

      /* A list of "fresh" contact info entries is maintained, holding
         entries that have been refreshed/inserted in the last 60s in
         upsertion order (oldest first).

         fd_crds_advance periodically checks for and removes peers from
         this list if they exceed the threshold.  Peers removed in this
         loop are also re-scored in the peer sampler.  This is different
         from dropping the CRDS entry entirely, which also removes the
         entry from this list.  To avoid double-popping an entry we use
         in_list as a presence check prior to removing. */
      struct {
        ulong prev;
        ulong next;
        uchar in_list; /* 1 if in the fresh list, 0 otherwise */
      } fresh_dlist;

      /* The contact info side table has a separate size limit, so
         we maintain a separate evict list to make space for new
         entries. */
      struct {
        ulong prev;
        ulong next;
      } evict_dlist;

      /* TODO: stake-ordered treap/pq? */
    } contact_info;
    struct {
      ulong token;
    } node_instance;
  };

  /* When an originator creates a CRDS message, they attach their local
     wallclock time to it.  This time is used to determine when a
     message should be upserted.  If messages have the same key, the
     newer one (as created by the originator) is used.

     Messages encode wallclock in millis, firedancer converts
     them into nanos internally. */
  long wallclock_nanos;

  uchar value_bytes[ FD_GOSSIP_CRDS_MAX_SZ ];
  ushort value_sz;

  /* The value hash is the sha256 of the value_bytes.  It is used in
     bloom filter generation and as a tiebreaker when overrides_fast
     cannot decide between two values with the same key (it returns
     -1). */
  uchar value_hash[ 32UL ];
  ulong num_duplicates;
  /* Originator's stake; the evict treap is ordered by it and it selects
     the staked or unstaked expire list. */
  ulong stake;

  /* Free-list link used by crds_pool. */
  struct {
    ulong next;
  } pool;

  /* The CRDS needs to perform a variety of actions on the message table
     quickly, so there are various indexes woven through these values to
     support these actions.  They are ...

     lookup is used to enable the core map<key, value> functionality
     described for upserts defined by value->key. */
  struct {
    ulong next;
    ulong prev;
  } lookup;

  /* The table has a fixed size message capacity, and supports eviction
     so insertion never fails.  If the table is full and we wish to
     insert a new value, the "lowest priority" message is evicted to
     make room.  This is accomplished with a treap sorted by stake, so
     the lowest stake message is removed. */
  struct {
    ulong parent;
    ulong left;
    ulong right;
    ulong prio;
    ulong next;
    ulong prev;
  } evict;

  /* Values in the table expire after a pre-determined amount of time,
     so we also keep a linked list of values sorted by creation time.
     The time used here is our node's wallclock when we received the
     CRDS, not the originator's local wallclock, which they could skew
     to cause their values to live longer.

     There are actually two lists that reuse the same pointers here,
     and a value will be in exactly one of the lists.  One is for staked
     nodes, whose values expire after 48 hours, and one is for unstaked
     nodes, whose values expire after 15 seconds (or also after 48 hours
     while no staked node is known; see expire). */
  struct {
    long wallclock_nanos;
    ulong prev;
    ulong next;
  } expire;

  /* In order to load balance pull request messages across peers, each
     message has a mask value that is mask_bits long.  The pull request
     is only concerned with CRDS entries with a hash where the first
     mask_bits of the hash match the mask value.

     We need to be able to quickly iterate over all CRDS table entries
     matching a given mask.  To do this, we store the first 8 bytes of
     the value_hash (byte-swapped, see crds_entry_init) in a sorted
     treap. */
  struct {
    ulong hash_prefix;
    ulong parent;
    ulong left;
    ulong right;
    ulong next;
    ulong prev;
    ulong prio;
  } hash;
};
158 :
159 : #define POOL_NAME crds_pool
160 24 : #define POOL_T fd_crds_entry_t
161 19350 : #define POOL_NEXT pool.next
162 :
163 : #include "../../../util/tmpl/fd_pool.c"
164 :
165 : #define TREAP_NAME evict_treap
166 : #define TREAP_T fd_crds_entry_t
167 : #define TREAP_QUERY_T void * /* We don't use query ... */
168 : #define TREAP_CMP(q,e) (__extension__({ (void)(q); (void)(e); -1; })) /* which means we don't need to give a real
169 : implementation to cmp either */
170 44595 : #define TREAP_IDX_T ulong
171 30756 : #define TREAP_LT(e0,e1) ((e0)->stake<(e1)->stake)
172 21255 : #define TREAP_PARENT evict.parent
173 11793 : #define TREAP_LEFT evict.left
174 7062 : #define TREAP_RIGHT evict.right
175 26508 : #define TREAP_PRIO evict.prio
176 : #define TREAP_OPTIMIZE_ITERATION 1
177 7062 : #define TREAP_NEXT evict.next
178 7062 : #define TREAP_PREV evict.prev
179 :
180 : #include "../../../util/tmpl/fd_treap.c"
181 :
182 : #define DLIST_NAME staked_expire_dlist
183 : #define DLIST_ELE_T fd_crds_entry_t
184 150 : #define DLIST_PREV expire.prev
185 150 : #define DLIST_NEXT expire.next
186 :
187 : #include "../../../util/tmpl/fd_dlist.c"
188 :
189 : #define DLIST_NAME unstaked_expire_dlist
190 : #define DLIST_ELE_T fd_crds_entry_t
191 6912 : #define DLIST_PREV expire.prev
192 6912 : #define DLIST_NEXT expire.next
193 :
194 : #include "../../../util/tmpl/fd_dlist.c"
195 :
196 :
197 : #define DLIST_NAME crds_contact_info_fresh_list
198 : #define DLIST_ELE_T fd_crds_entry_t
199 150 : #define DLIST_PREV contact_info.fresh_dlist.prev
200 150 : #define DLIST_NEXT contact_info.fresh_dlist.next
201 : #include "../../../util/tmpl/fd_dlist.c"
202 :
203 : #define DLIST_NAME crds_contact_info_evict_dlist
204 : #define DLIST_ELE_T fd_crds_entry_t
205 150 : #define DLIST_PREV contact_info.evict_dlist.prev
206 150 : #define DLIST_NEXT contact_info.evict_dlist.next
207 : #include "../../../util/tmpl/fd_dlist.c"
208 :
209 : #define TREAP_NAME hash_treap
210 : #define TREAP_T fd_crds_entry_t
211 : #define TREAP_QUERY_T ulong
212 201 : #define TREAP_CMP(q,e) ((q>e->hash.hash_prefix)-(q<e->hash.hash_prefix))
213 58212 : #define TREAP_IDX_T ulong
214 : #define TREAP_OPTIMIZE_ITERATION 1
215 7062 : #define TREAP_NEXT hash.next
216 7062 : #define TREAP_PREV hash.prev
217 53979 : #define TREAP_LT(e0,e1) ((e0)->hash.hash_prefix<(e1)->hash.hash_prefix)
218 37116 : #define TREAP_PARENT hash.parent
219 18777 : #define TREAP_LEFT hash.left
220 9507 : #define TREAP_RIGHT hash.right
221 35526 : #define TREAP_PRIO hash.prio
222 : #include "../../../util/tmpl/fd_treap.c"
223 :
224 : static inline ulong
225 : lookup_hash( fd_crds_key_t const * key,
226 16578 : ulong seed ) {
227 16578 : ulong hash_fn = ((ulong)key->tag)<<16;
228 16578 : switch( key->tag ) {
229 0 : case FD_GOSSIP_VALUE_VOTE:
230 0 : hash_fn ^= key->vote_index;
231 0 : break;
232 0 : case FD_GOSSIP_VALUE_EPOCH_SLOTS:
233 0 : hash_fn ^= key->epoch_slots_index;
234 0 : break;
235 0 : case FD_GOSSIP_VALUE_DUPLICATE_SHRED:
236 0 : hash_fn ^= key->duplicate_shred_index;
237 0 : break;
238 16578 : default:
239 16578 : break;
240 16578 : }
241 16578 : return fd_funk_rec_key_hash1( key->pubkey, seed^hash_fn );
242 16578 : }
243 :
244 : static inline int
245 : lookup_eq( fd_crds_key_t const * key0,
246 10293 : fd_crds_key_t const * key1 ) {
247 10293 : if( FD_UNLIKELY( key0->tag!=key1->tag ) ) return 0;
248 10293 : if( FD_UNLIKELY( !!memcmp( key0->pubkey, key1->pubkey, 32UL ) ) ) return 0;
249 7062 : switch( key0->tag ) {
250 0 : case FD_GOSSIP_VALUE_VOTE:
251 0 : return key0->vote_index==key1->vote_index;
252 0 : case FD_GOSSIP_VALUE_EPOCH_SLOTS:
253 0 : return key0->epoch_slots_index==key1->epoch_slots_index;
254 0 : case FD_GOSSIP_VALUE_DUPLICATE_SHRED:
255 0 : return key0->duplicate_shred_index==key1->duplicate_shred_index;
256 7062 : default:
257 7062 : break;
258 7062 : }
259 7062 : return 1;
260 7062 : }
261 :
/* lookup_map: chained hash map from fd_crds_key_t to the entry holding
   that key, threaded through lookup.next/lookup.prev.  Hashing and
   equality are delegated to lookup_hash/lookup_eq, which only examine
   the key fields that are relevant for key.tag. */
#define MAP_NAME               lookup_map
#define MAP_ELE_T              fd_crds_entry_t
#define MAP_KEY_T              fd_crds_key_t
#define MAP_KEY                key
#define MAP_IDX_T              ulong
#define MAP_NEXT               lookup.next
#define MAP_PREV               lookup.prev
#define MAP_KEY_HASH(k,s)      (lookup_hash( k, s ))
#define MAP_KEY_EQ(k0,k1)      (lookup_eq( k0, k1 ))
#define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1

#include "../../../util/tmpl/fd_map_chain.c"

/* Peer samplers (crds_samplers_*), included after the entry type and
   its containers are defined; presumably they depend on those
   definitions (TODO confirm). */
#include "fd_crds_peer_samplers.c"
276 :
/* A purged entry keeps only the 32-byte hash of a CRDS value, together
   with the time it was recorded.  It lives on one of two lists: the
   "purged" list or the "failed_inserts" list.  Entries are allocated
   from purged_pool. */
struct fd_crds_purged {
  uchar hash[ 32UL ];
  /* Free-list link used by purged_pool. */
  struct {
    ulong next;
  } pool;

  /* Similar to fd_crds_entry, we want the ability to query and iterate
     through values by hash[:8] to generate pull requests. */
  struct {
    ulong hash_prefix;
    ulong parent;
    ulong left;
    ulong right;
    ulong next;
    ulong prev;
    ulong prio;
  } treap;

  /* Similar to fd_crds_entry, we keep a linked list of purged values sorted
     by insertion time.  The time used here is our node's wallclock.

     There are actually two (mutually exclusive) lists that reuse the same
     pointers here: one for "purged" entries that expire after 60s and one
     for "failed_inserts" that expire after 20s. */
  struct {
    long wallclock_nanos;
    ulong next;
    ulong prev;
  } expire;
};
typedef struct fd_crds_purged fd_crds_purged_t;
308 :
309 : #define POOL_NAME purged_pool
310 24 : #define POOL_T fd_crds_purged_t
311 11520 : #define POOL_NEXT pool.next
312 :
313 : #include "../../../util/tmpl/fd_pool.c"
314 :
315 : #define TREAP_NAME purged_treap
316 : #define TREAP_T fd_crds_purged_t
317 : #define TREAP_QUERY_T ulong
318 24705 : #define TREAP_CMP(q,e) ((q>e->treap.hash_prefix)-(q<e->treap.hash_prefix))
319 19611 : #define TREAP_IDX_T ulong
320 : #define TREAP_OPTIMIZE_ITERATION 1
321 2304 : #define TREAP_NEXT treap.next
322 2304 : #define TREAP_PREV treap.prev
323 24528 : #define TREAP_LT(e0,e1) ((e0)->treap.hash_prefix<(e1)->treap.hash_prefix)
324 15003 : #define TREAP_PARENT treap.parent
325 31242 : #define TREAP_LEFT treap.left
326 27009 : #define TREAP_RIGHT treap.right
327 18033 : #define TREAP_PRIO treap.prio
328 : #include "../../../util/tmpl/fd_treap.c"
329 :
330 : #define DLIST_NAME failed_inserts_dlist
331 : #define DLIST_ELE_T fd_crds_purged_t
332 0 : #define DLIST_PREV expire.prev
333 0 : #define DLIST_NEXT expire.next
334 :
335 : #include "../../../util/tmpl/fd_dlist.c"
336 :
337 : #define DLIST_NAME purged_dlist
338 : #define DLIST_ELE_T fd_crds_purged_t
339 2304 : #define DLIST_PREV expire.prev
340 2304 : #define DLIST_NEXT expire.next
341 :
342 : #include "../../../util/tmpl/fd_dlist.c"
343 :
/* Top-level CRDS table state.  fd_crds_new places it at the start of
   the shared region and lays out every container below in the same
   region (see fd_crds_footprint). */
struct fd_crds_private {
  /* Output link used to publish gossip update messages. */
  fd_gossip_out_ctx_t * gossip_update;

  /* Hasher object; presumably passed to crds_entry_init to hash values
     (TODO confirm against callers). */
  fd_sha256_t sha256[1];

  /* Nonzero when a staked node is known.  expire() uses it to choose
     the unstaked lifetime.  It is initialized to 0 in fd_crds_new and
     set elsewhere. */
  int has_staked_node;

  fd_crds_entry_t * pool;

  evict_treap_t * evict_treap;
  staked_expire_dlist_t * staked_expire_dlist;
  unstaked_expire_dlist_t * unstaked_expire_dlist;
  hash_treap_t * hash_treap;
  lookup_map_t * lookup_map;

  /* Hashes of purged values and of failed inserts; both lists share
     one pool and one treap. */
  struct {
    fd_crds_purged_t * pool;
    purged_treap_t * treap;
    purged_dlist_t * purged_dlist;
    failed_inserts_dlist_t * failed_inserts_dlist;
  } purged;

  /* Contact info side table (CRDS_MAX_CONTACT_INFO entries) and the
     fresh/evict lists of contact info CRDS entries. */
  struct {
    fd_crds_contact_info_entry_t * pool;
    crds_contact_info_fresh_list_t * fresh_dlist;
    crds_contact_info_evict_dlist_t * evict_dlist;
  } contact_info;

  crds_samplers_t samplers[1];

  fd_crds_metrics_t metrics[1];

  /* FD_CRDS_MAGIC once fd_crds_new completes; checked by fd_crds_join. */
  ulong magic;
};
378 :
379 : FD_FN_CONST ulong
380 48 : fd_crds_align( void ) {
381 48 : return FD_CRDS_ALIGN;
382 48 : }
383 :
/* fd_crds_footprint returns the size in bytes of the region fd_crds_new
   needs for a table with the following capacities:
     - ele_max values,
     - purged_max purged/failed-insert hashes,
     - the fixed CRDS_MAX_CONTACT_INFO entry contact info side table.
   The append sequence below must stay in exactly the same order as the
   FD_SCRATCH_ALLOC_APPENDs in fd_crds_new, which FD_TESTs that the two
   agree. */
FD_FN_CONST ulong
fd_crds_footprint( ulong ele_max,
                   ulong purged_max ) {
  ulong l;
  l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, FD_CRDS_ALIGN,                          sizeof(fd_crds_t) );
  l = FD_LAYOUT_APPEND( l, crds_pool_align(),                      crds_pool_footprint( ele_max ) );
  l = FD_LAYOUT_APPEND( l, evict_treap_align(),                    evict_treap_footprint( ele_max ) );
  l = FD_LAYOUT_APPEND( l, staked_expire_dlist_align(),            staked_expire_dlist_footprint() );
  l = FD_LAYOUT_APPEND( l, unstaked_expire_dlist_align(),          unstaked_expire_dlist_footprint() );
  l = FD_LAYOUT_APPEND( l, hash_treap_align(),                     hash_treap_footprint( ele_max ) );
  l = FD_LAYOUT_APPEND( l, lookup_map_align(),                     lookup_map_footprint( ele_max ) );
  l = FD_LAYOUT_APPEND( l, purged_pool_align(),                    purged_pool_footprint( purged_max ) );
  l = FD_LAYOUT_APPEND( l, purged_treap_align(),                   purged_treap_footprint( purged_max ) );
  l = FD_LAYOUT_APPEND( l, purged_dlist_align(),                   purged_dlist_footprint() );
  l = FD_LAYOUT_APPEND( l, failed_inserts_dlist_align(),           failed_inserts_dlist_footprint() );
  l = FD_LAYOUT_APPEND( l, crds_contact_info_pool_align(),         crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
  l = FD_LAYOUT_APPEND( l, crds_contact_info_fresh_list_align(),   crds_contact_info_fresh_list_footprint() );
  l = FD_LAYOUT_APPEND( l, crds_contact_info_evict_dlist_align(),  crds_contact_info_evict_dlist_footprint() );
  return FD_LAYOUT_FINI( l, FD_CRDS_ALIGN );
}
405 :
/* fd_crds_new formats shmem as an empty CRDS table.

   Arguments:
     - shmem must be aligned to fd_crds_align() and span
       fd_crds_footprint( ele_max, purged_max ) bytes.
     - ele_max and purged_max must be powers of 2.
     - rng supplies the treap priority seeds and the lookup map seed.
     - gossip_update_out is the output link that update messages
       (e.g. contact info removals) are published to.

   Returns shmem on success.  On invalid arguments it logs a warning and
   returns NULL.  Failure to construct an internal container aborts via
   FD_TEST. */
void *
fd_crds_new( void *                    shmem,
             fd_rng_t *                rng,
             ulong                     ele_max,
             ulong                     purged_max,
             fd_gossip_out_ctx_t *     gossip_update_out ) {
  if( FD_UNLIKELY( !shmem ) ) {
    FD_LOG_WARNING(( "NULL shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_crds_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_pow2( ele_max ) ) ) {
    FD_LOG_WARNING(( "ele_max must be a power of 2" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_pow2( purged_max ) ) ) {
    FD_LOG_WARNING(( "purged_max must be a power of 2" ));
    return NULL;
  }

  if( FD_UNLIKELY( !rng ) ) {
    FD_LOG_WARNING(( "NULL rng" ));
    return NULL;
  }

  if( FD_UNLIKELY( !gossip_update_out ) ) {
    FD_LOG_WARNING(( "NULL gossip_out" ));
    return NULL;
  }

  /* Carve the region in the same order as fd_crds_footprint; the
     FD_TEST below verifies both agree. */
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_crds_t * crds              = FD_SCRATCH_ALLOC_APPEND( l, FD_CRDS_ALIGN,                         sizeof(fd_crds_t) );
  void * _pool                  = FD_SCRATCH_ALLOC_APPEND( l, crds_pool_align(),                     crds_pool_footprint( ele_max ) );
  void * _evict_treap           = FD_SCRATCH_ALLOC_APPEND( l, evict_treap_align(),                   evict_treap_footprint( ele_max ) );
  void * _staked_expire_dlist   = FD_SCRATCH_ALLOC_APPEND( l, staked_expire_dlist_align(),           staked_expire_dlist_footprint() );
  void * _unstaked_expire_dlist = FD_SCRATCH_ALLOC_APPEND( l, unstaked_expire_dlist_align(),         unstaked_expire_dlist_footprint() );
  void * _hash_treap            = FD_SCRATCH_ALLOC_APPEND( l, hash_treap_align(),                    hash_treap_footprint( ele_max ) );
  void * _lookup_map            = FD_SCRATCH_ALLOC_APPEND( l, lookup_map_align(),                    lookup_map_footprint( ele_max ) );
  void * _purged_pool           = FD_SCRATCH_ALLOC_APPEND( l, purged_pool_align(),                   purged_pool_footprint( purged_max ) );
  void * _purged_treap          = FD_SCRATCH_ALLOC_APPEND( l, purged_treap_align(),                  purged_treap_footprint( purged_max ) );
  void * _purged_dlist          = FD_SCRATCH_ALLOC_APPEND( l, purged_dlist_align(),                  purged_dlist_footprint() );
  void * _failed_inserts_dlist  = FD_SCRATCH_ALLOC_APPEND( l, failed_inserts_dlist_align(),          failed_inserts_dlist_footprint() );
  void * _ci_pool               = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_pool_align(),        crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
  void * _ci_dlist              = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_fresh_list_align(),  crds_contact_info_fresh_list_footprint() );
  void * _ci_evict_dlist        = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() );
  FD_TEST( FD_SCRATCH_ALLOC_FINI( l, FD_CRDS_ALIGN ) == (ulong)shmem + fd_crds_footprint( ele_max, purged_max ) );

  /* Note: the order of the fd_rng_ulong draws below determines which
     seed each container gets. */
  crds->pool = crds_pool_join( crds_pool_new( _pool, ele_max ) );
  FD_TEST( crds->pool );

  crds->evict_treap = evict_treap_join( evict_treap_new( _evict_treap, ele_max ) );
  FD_TEST( crds->evict_treap );
  evict_treap_seed( crds->pool, ele_max, fd_rng_ulong( rng ) );

  crds->staked_expire_dlist = staked_expire_dlist_join( staked_expire_dlist_new( _staked_expire_dlist ) );
  FD_TEST( crds->staked_expire_dlist );

  crds->unstaked_expire_dlist = unstaked_expire_dlist_join( unstaked_expire_dlist_new( _unstaked_expire_dlist ) );
  FD_TEST( crds->unstaked_expire_dlist );

  crds->hash_treap = hash_treap_join( hash_treap_new( _hash_treap, ele_max ) );
  FD_TEST( crds->hash_treap );
  hash_treap_seed( crds->pool, ele_max, fd_rng_ulong( rng ) );

  crds->lookup_map = lookup_map_join( lookup_map_new( _lookup_map, ele_max, fd_rng_ulong( rng ) ) );
  FD_TEST( crds->lookup_map );

  crds->purged.pool = purged_pool_join( purged_pool_new( _purged_pool, purged_max ) );
  FD_TEST( crds->purged.pool );

  crds->purged.treap = purged_treap_join( purged_treap_new( _purged_treap, purged_max ) );
  FD_TEST( crds->purged.treap );
  purged_treap_seed( crds->purged.pool, purged_max, fd_rng_ulong( rng ) );

  crds->purged.purged_dlist = purged_dlist_join( purged_dlist_new( _purged_dlist ) );
  FD_TEST( crds->purged.purged_dlist );

  crds->purged.failed_inserts_dlist = failed_inserts_dlist_join( failed_inserts_dlist_new( _failed_inserts_dlist ) );
  FD_TEST( crds->purged.failed_inserts_dlist );

  crds->contact_info.pool = crds_contact_info_pool_join( crds_contact_info_pool_new( _ci_pool, CRDS_MAX_CONTACT_INFO ) );
  FD_TEST( crds->contact_info.pool );

  crds->contact_info.fresh_dlist = crds_contact_info_fresh_list_join( crds_contact_info_fresh_list_new( _ci_dlist ) );
  FD_TEST( crds->contact_info.fresh_dlist );

  crds->contact_info.evict_dlist = crds_contact_info_evict_dlist_join( crds_contact_info_evict_dlist_new( _ci_evict_dlist ) );
  FD_TEST( crds->contact_info.evict_dlist );

  FD_TEST( fd_sha256_join( fd_sha256_new( crds->sha256 ) ) );

  crds_samplers_new( crds->samplers );

  memset( crds->metrics, 0, sizeof(fd_crds_metrics_t) );

  crds->gossip_update = gossip_update_out;
  crds->has_staked_node = 0;

  /* The compiler fences keep the magic store ordered after all of the
     initialization above. */
  FD_COMPILER_MFENCE();
  FD_VOLATILE( crds->magic ) = FD_CRDS_MAGIC;
  FD_COMPILER_MFENCE();

  return (void *)crds;
}
516 :
517 : fd_crds_t *
518 12 : fd_crds_join( void * shcrds ) {
519 12 : if( FD_UNLIKELY( !shcrds ) ) {
520 0 : FD_LOG_WARNING(( "NULL shcrds" ));
521 0 : return NULL;
522 0 : }
523 :
524 12 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shcrds, fd_crds_align() ) ) ) {
525 0 : FD_LOG_WARNING(( "misaligned shcrds" ));
526 0 : return NULL;
527 0 : }
528 :
529 12 : fd_crds_t * crds = (fd_crds_t *)shcrds;
530 :
531 12 : if( FD_UNLIKELY( crds->magic!=FD_CRDS_MAGIC ) ) {
532 0 : FD_LOG_WARNING(( "bad magic" ));
533 0 : return NULL;
534 0 : }
535 :
536 12 : return crds;
537 12 : }
538 :
539 : fd_crds_metrics_t const *
540 0 : fd_crds_metrics( fd_crds_t const * crds ) {
541 0 : return crds->metrics;
542 0 : }
543 :
544 : static inline void
545 : remove_contact_info( fd_crds_t * crds,
546 : fd_crds_entry_t * ci,
547 : long now,
548 0 : fd_stem_context_t * stem ) {
549 0 : if( FD_UNLIKELY( !stem ) ) return;
550 0 : fd_gossip_update_message_t * msg = fd_gossip_out_get_chunk( crds->gossip_update );
551 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
552 0 : msg->wallclock_nanos = now;
553 0 : fd_memcpy( msg->origin_pubkey, ci->key.pubkey, 32UL );
554 0 : msg->contact_info_remove.idx = crds_contact_info_pool_idx( crds->contact_info.pool, ci->contact_info.ci );
555 0 : fd_gossip_tx_publish_chunk( crds->gossip_update, stem, (ulong)msg->tag, FD_GOSSIP_UPDATE_SZ_CONTACT_INFO_REMOVE, now );
556 :
557 0 : if( FD_LIKELY( ci->stake ) ) crds->metrics->peer_staked_cnt--;
558 0 : else crds->metrics->peer_unstaked_cnt--;
559 :
560 0 : crds->metrics->peer_visible_stake -= ci->stake;
561 :
562 0 : if( FD_LIKELY( !!ci->contact_info.fresh_dlist.in_list ) ) {
563 0 : crds_contact_info_fresh_list_ele_remove( crds->contact_info.fresh_dlist, ci, crds->pool );
564 0 : }
565 0 : crds_contact_info_evict_dlist_ele_remove( crds->contact_info.evict_dlist, ci, crds->pool );
566 0 : crds_contact_info_pool_ele_release( crds->contact_info.pool, ci->contact_info.ci );
567 :
568 : /* FIXME: If the peer is in any active set bucket, it is NOT removed
569 : here. If the peer is re-inserted into the CRDS table in the future,
570 : it is added back into the bucket's sampler. This means a peer can
571 : be sampled in a bucket (at least) twice during
572 : fd_active_set_rotate. */
573 0 : crds_samplers_rem_peer( crds->samplers, ci );
574 0 : }
575 :
576 : ulong
577 0 : fd_crds_len( fd_crds_t const * crds ) {
578 0 : return crds_pool_used( crds->pool );
579 0 : }
580 :
581 : ulong
582 0 : fd_crds_purged_len( fd_crds_t const * crds ) {
583 0 : return purged_pool_used( crds->purged.pool );
584 0 : }
585 :
586 : void
587 : fd_crds_release( fd_crds_t * crds,
588 2304 : fd_crds_entry_t * value ) {
589 2304 : crds_pool_ele_release( crds->pool, value );
590 2304 : crds->metrics->count[ value->key.tag ]--;
591 2304 : }
592 :
593 : static inline void
594 : expire( fd_crds_t * crds,
595 : long now,
596 0 : fd_stem_context_t * stem ){
597 0 : static const long SLOT_DURATION_NANOS = 400L*1000L*1000L;
598 0 : static const long STAKED_EXPIRE_DURATION_NANOS = 432000L*SLOT_DURATION_NANOS;
599 0 : static const long UNSTAKED_EXPIRE_DURATION_NANOS = 15L*1000L*1000L*1000L;
600 :
601 0 : while( !staked_expire_dlist_is_empty( crds->staked_expire_dlist, crds->pool ) ) {
602 0 : fd_crds_entry_t * head = staked_expire_dlist_ele_peek_head( crds->staked_expire_dlist, crds->pool );
603 :
604 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-STAKED_EXPIRE_DURATION_NANOS ) ) break;
605 :
606 0 : staked_expire_dlist_ele_pop_head( crds->staked_expire_dlist, crds->pool );
607 0 : hash_treap_ele_remove( crds->hash_treap, head, crds->pool );
608 0 : lookup_map_ele_remove( crds->lookup_map, &head->key, NULL, crds->pool );
609 0 : evict_treap_ele_remove( crds->evict_treap, head, crds->pool );
610 :
611 0 : if( FD_UNLIKELY( head->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) remove_contact_info( crds, head, now, stem );
612 0 : fd_crds_release( crds, head );
613 :
614 0 : crds->metrics->expired_cnt++;
615 0 : }
616 :
617 0 : long unstaked_expire_duration_nanos = fd_long_if( crds->has_staked_node,
618 0 : UNSTAKED_EXPIRE_DURATION_NANOS,
619 0 : STAKED_EXPIRE_DURATION_NANOS );
620 :
621 0 : while( !unstaked_expire_dlist_is_empty( crds->unstaked_expire_dlist, crds->pool ) ) {
622 0 : fd_crds_entry_t * head = unstaked_expire_dlist_ele_peek_head( crds->unstaked_expire_dlist, crds->pool );
623 :
624 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-unstaked_expire_duration_nanos ) ) break;
625 :
626 0 : unstaked_expire_dlist_ele_pop_head( crds->unstaked_expire_dlist, crds->pool );
627 0 : hash_treap_ele_remove( crds->hash_treap, head, crds->pool );
628 0 : lookup_map_ele_remove( crds->lookup_map, &head->key, NULL, crds->pool );
629 0 : evict_treap_ele_remove( crds->evict_treap, head, crds->pool );
630 :
631 0 : if( FD_UNLIKELY( head->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) remove_contact_info( crds, head, now, stem );
632 0 : fd_crds_release( crds, head );
633 :
634 0 : crds->metrics->expired_cnt++;
635 0 : }
636 :
637 0 : while( !purged_dlist_is_empty( crds->purged.purged_dlist, crds->purged.pool ) ) {
638 0 : fd_crds_purged_t * head = purged_dlist_ele_peek_head( crds->purged.purged_dlist, crds->purged.pool );
639 :
640 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-60L*1000L*1000L*1000L ) ) break;
641 :
642 0 : purged_dlist_ele_pop_head( crds->purged.purged_dlist, crds->purged.pool );
643 0 : purged_treap_ele_remove( crds->purged.treap, head, crds->purged.pool );
644 0 : purged_pool_ele_release( crds->purged.pool, head );
645 :
646 0 : crds->metrics->purged_cnt--;
647 0 : crds->metrics->purged_expired_cnt++;
648 0 : }
649 :
650 0 : while( !failed_inserts_dlist_is_empty( crds->purged.failed_inserts_dlist, crds->purged.pool ) ) {
651 0 : fd_crds_purged_t * head = failed_inserts_dlist_ele_peek_head( crds->purged.failed_inserts_dlist, crds->purged.pool );
652 :
653 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-20L*1000L*1000L*1000L ) ) break;
654 :
655 0 : failed_inserts_dlist_ele_pop_head( crds->purged.failed_inserts_dlist, crds->purged.pool );
656 0 : purged_treap_ele_remove( crds->purged.treap, head, crds->purged.pool );
657 0 : purged_pool_ele_release( crds->purged.pool, head );
658 :
659 0 : crds->metrics->purged_cnt--;
660 0 : crds->metrics->purged_expired_cnt++;
661 0 : }
662 0 : }
663 :
664 : void
665 : unfresh( fd_crds_t * crds,
666 0 : long now ) {
667 0 : while( !crds_contact_info_fresh_list_is_empty( crds->contact_info.fresh_dlist, crds->pool ) ) {
668 0 : fd_crds_entry_t * head = crds_contact_info_fresh_list_ele_peek_head( crds->contact_info.fresh_dlist, crds->pool );
669 :
670 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-60L*1000L*1000L*1000L ) ) break;
671 :
672 0 : head = crds_contact_info_fresh_list_ele_pop_head( crds->contact_info.fresh_dlist, crds->pool );
673 0 : head->contact_info.fresh_dlist.in_list = 0;
674 0 : crds_samplers_upd_peer_at_idx( crds->samplers, head, head->contact_info.sampler_idx, now );
675 0 : }
676 0 : }
677 :
678 : void
679 : fd_crds_advance( fd_crds_t * crds,
680 : long now,
681 0 : fd_stem_context_t * stem ) {
682 0 : expire( crds, now, stem );
683 0 : unfresh( crds, now );
684 0 : }
685 :
686 : fd_crds_entry_t *
687 : fd_crds_acquire( fd_crds_t * crds,
688 : long now,
689 4758 : fd_stem_context_t * stem ) {
690 4758 : if( FD_UNLIKELY( !crds_pool_free( crds->pool ) ) ) {
691 0 : evict_treap_fwd_iter_t head = evict_treap_fwd_iter_init( crds->evict_treap, crds->pool );
692 0 : FD_TEST( !evict_treap_fwd_iter_done( head ) );
693 0 : fd_crds_entry_t * evict = evict_treap_fwd_iter_ele( head, crds->pool );
694 :
695 0 : if( FD_LIKELY( !evict->stake ) ) unstaked_expire_dlist_ele_remove( crds->unstaked_expire_dlist, evict, crds->pool );
696 0 : else staked_expire_dlist_ele_remove( crds->staked_expire_dlist, evict, crds->pool );
697 :
698 0 : hash_treap_ele_remove( crds->hash_treap, evict, crds->pool );
699 0 : lookup_map_ele_remove( crds->lookup_map, &evict->key, NULL, crds->pool );
700 0 : evict_treap_ele_remove( crds->evict_treap, evict, crds->pool );
701 0 : if( FD_UNLIKELY( evict->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) remove_contact_info( crds, evict, now, stem );
702 :
703 0 : crds->metrics->evicted_cnt++;
704 :
705 0 : return evict;
706 4758 : } else {
707 4758 : return crds_pool_ele_acquire( crds->pool );
708 4758 : }
709 4758 : }
710 :
711 : int
712 0 : fd_crds_has_staked_node( fd_crds_t const * crds ) {
713 0 : return crds->has_staked_node;
714 0 : }
715 :
716 : static inline void
717 : generate_key( fd_gossip_view_crds_value_t const * view,
718 : uchar const * payload,
719 9366 : fd_crds_key_t * out_key ) {
720 9366 : out_key->tag = view->tag;
721 9366 : fd_memcpy( out_key->pubkey, payload+view->pubkey_off, 32UL );
722 :
723 9366 : switch( out_key->tag ) {
724 0 : case FD_GOSSIP_VALUE_VOTE:
725 0 : out_key->vote_index = view->vote->index;
726 0 : break;
727 0 : case FD_GOSSIP_VALUE_EPOCH_SLOTS:
728 0 : out_key->epoch_slots_index = view->epoch_slots->index;
729 0 : break;
730 0 : case FD_GOSSIP_VALUE_DUPLICATE_SHRED:
731 0 : out_key->duplicate_shred_index = view->duplicate_shred->index;
732 0 : break;
733 9366 : default:
734 9366 : break;
735 9366 : }
736 9366 : }
737 :
738 : void
739 : fd_crds_generate_hash( fd_sha256_t * sha,
740 : uchar const * crds_value,
741 : ulong crds_value_sz,
742 4758 : uchar out_hash[ static 32UL ] ){
743 4758 : fd_sha256_init( sha );
744 4758 : fd_sha256_append( sha, crds_value, crds_value_sz );
745 4758 : fd_sha256_fini( sha, out_hash );
746 4758 : }
747 :
748 : static inline void
749 : crds_entry_init( fd_gossip_view_crds_value_t const * view,
750 : fd_sha256_t * sha,
751 : uchar const * payload,
752 : ulong stake,
753 4758 : fd_crds_entry_t * out_value ) {
754 : /* Construct key */
755 4758 : fd_crds_key_t * key = &out_value->key;
756 4758 : generate_key( view, payload, key );
757 :
758 4758 : out_value->wallclock_nanos = view->wallclock_nanos;
759 4758 : out_value->stake = stake;
760 :
761 4758 : fd_crds_generate_hash( sha, payload+view->value_off, view->length, out_value->value_hash );
762 4758 : out_value->hash.hash_prefix = fd_ulong_bswap( fd_ulong_load_8( out_value->value_hash ) );
763 :
764 4758 : if( FD_UNLIKELY( view->tag==FD_GOSSIP_VALUE_NODE_INSTANCE ) ) {
765 0 : out_value->node_instance.token = view->node_instance->token;
766 4758 : } else if( FD_UNLIKELY( key->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
767 150 : out_value->contact_info.instance_creation_wallclock_nanos = view->ci_view->contact_info->instance_creation_wallclock_nanos;
768 : /* Contact Info entry will be added to sampler upon successful insertion */
769 150 : out_value->contact_info.sampler_idx = SAMPLE_IDX_SENTINEL;
770 150 : }
771 4758 : }
772 :
773 : static inline void
774 : purged_init( fd_crds_purged_t * purged,
775 : uchar const * hash,
776 : ulong hash_prefix,
777 2304 : long now ) {
778 2304 : fd_memcpy( purged->hash, hash, 32UL );
779 2304 : purged->treap.hash_prefix = hash_prefix;
780 2304 : purged->expire.wallclock_nanos = now;
781 2304 : }
782 :
783 : void
784 : insert_purged( fd_crds_t * crds,
785 : uchar const * hash,
786 2304 : long now ) {
787 2304 : ulong hash_prefix = fd_ulong_bswap( fd_ulong_load_8( hash ) );
788 2304 : if( purged_treap_ele_query( crds->purged.treap, hash_prefix, crds->purged.pool ) ) {
789 0 : return;
790 0 : }
791 2304 : fd_crds_purged_t * purged;
792 2304 : if( FD_UNLIKELY( !purged_pool_free( crds->purged.pool ) ) ) {
793 0 : purged = purged_dlist_ele_pop_head( crds->purged.purged_dlist, crds->purged.pool );
794 0 : purged_treap_ele_remove( crds->purged.treap, purged, crds->purged.pool );
795 0 : if( FD_LIKELY( crds->metrics ) ) {
796 0 : crds->metrics->purged_evicted_cnt++;
797 0 : }
798 2304 : } else {
799 2304 : purged = purged_pool_ele_acquire( crds->purged.pool );
800 2304 : if( FD_LIKELY( crds->metrics ) ) {
801 2304 : crds->metrics->purged_cnt++;
802 2304 : }
803 2304 : }
804 2304 : purged_init( purged, hash, hash_prefix, now );
805 2304 : purged_treap_ele_insert( crds->purged.treap, purged, crds->purged.pool );
806 2304 : purged_dlist_ele_push_tail( crds->purged.purged_dlist, purged, crds->purged.pool );
807 2304 : }
808 :
809 : /* overrides_fast
810 : - returns 1 if candidate overrides existing (incumbent) CRDS value
811 : - returns 0 if candidate does not override existing CRDS value
812 : - return -1 if further checks are needed (e.g. hash comparison) */
813 : int
814 : overrides_fast( fd_crds_entry_t const * incumbent,
815 : fd_gossip_view_crds_value_t const * candidate,
816 2304 : uchar const * payload ){
817 2304 : long existing_wc = incumbent->wallclock_nanos;
818 2304 : long candidate_wc = candidate->wallclock_nanos;
819 2304 : long existing_ci_onset = incumbent->contact_info.instance_creation_wallclock_nanos;
820 2304 : long candidate_ci_onset = candidate->ci_view->contact_info->instance_creation_wallclock_nanos;
821 :
822 2304 : switch( candidate->tag ) {
823 0 : case FD_GOSSIP_VALUE_CONTACT_INFO:
824 0 : if( FD_UNLIKELY( candidate_ci_onset>existing_ci_onset ) ) return 1;
825 0 : else if( FD_UNLIKELY( candidate_ci_onset<existing_ci_onset ) ) return 0;
826 0 : else if( FD_UNLIKELY( candidate_wc>existing_wc ) ) return 1;
827 0 : else if( FD_UNLIKELY( candidate_wc<existing_wc ) ) return 0;
828 0 : break;
829 0 : case FD_GOSSIP_VALUE_NODE_INSTANCE:
830 0 : if( FD_LIKELY( candidate->node_instance->token==incumbent->node_instance.token ) ) break;
831 0 : else if( FD_LIKELY( memcmp( payload+candidate->pubkey_off, incumbent->key.pubkey, 32UL ) ) ) break;
832 0 : else if( FD_UNLIKELY( candidate_wc>existing_wc ) ) return 1;
833 0 : else if( FD_UNLIKELY( candidate_wc<existing_wc ) ) return 0;
834 0 : else if( candidate->node_instance->token<incumbent->node_instance.token ) return 0;;
835 0 : break;
836 2304 : default:
837 2304 : break;
838 2304 : }
839 :
840 2304 : if( FD_UNLIKELY( candidate_wc>existing_wc ) ) return 1;
841 0 : else if( FD_UNLIKELY( candidate_wc<existing_wc ) ) return 0;
842 0 : return -1;
843 2304 : }
844 :
845 :
846 : void
847 : fd_crds_insert_failed_insert( fd_crds_t * crds,
848 : uchar const * hash,
849 0 : long now ) {
850 0 : ulong hash_prefix = fd_ulong_bswap( fd_ulong_load_8( hash ) );
851 0 : if( purged_treap_ele_query( crds->purged.treap, hash_prefix, crds->purged.pool ) ) {
852 0 : return;
853 0 : }
854 0 : fd_crds_purged_t * failed;
855 0 : if( FD_UNLIKELY( !purged_pool_free( crds->purged.pool ) ) ) {
856 0 : failed = failed_inserts_dlist_ele_pop_head( crds->purged.failed_inserts_dlist, crds->purged.pool );
857 0 : purged_treap_ele_remove( crds->purged.treap, failed, crds->purged.pool );
858 0 : if( FD_LIKELY( crds->metrics ) ) {
859 0 : crds->metrics->purged_evicted_cnt++;
860 0 : }
861 0 : } else {
862 0 : failed = purged_pool_ele_acquire( crds->purged.pool );
863 0 : if( FD_LIKELY( crds->metrics ) ) {
864 0 : crds->metrics->purged_cnt++;
865 0 : }
866 0 : }
867 0 : purged_init( failed, hash, hash_prefix, now );
868 0 : purged_treap_ele_insert( crds->purged.treap, failed, crds->purged.pool );
869 0 : failed_inserts_dlist_ele_push_tail( crds->purged.failed_inserts_dlist, failed, crds->purged.pool );
870 0 : }
871 :
872 : int
873 : fd_crds_checks_fast( fd_crds_t * crds,
874 : fd_gossip_view_crds_value_t const * candidate,
875 : uchar const * payload,
876 4608 : uchar from_push_msg ) {
877 4608 : fd_crds_key_t candidate_key;
878 4608 : generate_key( candidate, payload, &candidate_key );
879 4608 : fd_crds_entry_t * incumbent = lookup_map_ele_query( crds->lookup_map, &candidate_key, NULL, crds->pool );
880 :
881 4608 : if( FD_UNLIKELY( !incumbent ) ) return FD_CRDS_UPSERT_CHECK_UPSERTS;
882 :
883 2304 : if( FD_UNLIKELY( fd_ulong_load_8( incumbent->value_bytes )==fd_ulong_load_8( payload+candidate->value_off ) ) ) {
884 : /* We have a duplicate, so we return the number of duplicates */
885 0 : return (int)(++incumbent->num_duplicates);
886 0 : }
887 2304 : int overrides = overrides_fast( incumbent, candidate, payload );
888 2304 : if( FD_LIKELY( overrides==1 ) ) return FD_CRDS_UPSERT_CHECK_UPSERTS;
889 :
890 0 : uchar cand_hash[ 32UL ];
891 0 : fd_crds_generate_hash( crds->sha256, payload+candidate->value_off, candidate->length, cand_hash );
892 :
893 0 : if( FD_UNLIKELY( overrides==-1 ) ) {
894 : /* Tiebreaker case, we compare hash values */
895 0 : int res = memcmp( cand_hash, incumbent->value_hash, 32UL );
896 0 : if( FD_UNLIKELY( !res ) ) {
897 : /* Hashes match, so we treat this as a duplicate */
898 0 : return (int)(++incumbent->num_duplicates);
899 0 : } else if( res>0 ) {
900 : /* Candidate hash is greater than incumbent hash, so we treat
901 : this as an upsert */
902 0 : return FD_CRDS_UPSERT_CHECK_UPSERTS;
903 0 : }
904 0 : }
905 :
906 0 : from_push_msg ? insert_purged( crds, cand_hash, candidate->wallclock_nanos ) :
907 0 : fd_crds_insert_failed_insert( crds, cand_hash, candidate->wallclock_nanos );
908 :
909 0 : return FD_CRDS_UPSERT_CHECK_FAILS;
910 0 : }
911 :
912 : static inline void
913 : publish_update_msg( fd_crds_t * crds,
914 : fd_crds_entry_t * entry,
915 : fd_gossip_view_crds_value_t const * entry_view,
916 : uchar const * payload,
917 : long now,
918 4758 : fd_stem_context_t * stem ) {
919 4758 : if( FD_UNLIKELY( !stem ) ) return;
920 0 : if( FD_LIKELY( entry->key.tag!=FD_GOSSIP_VALUE_CONTACT_INFO &&
921 0 : entry->key.tag!=FD_GOSSIP_VALUE_LOWEST_SLOT &&
922 0 : entry->key.tag!=FD_GOSSIP_VALUE_VOTE &&
923 0 : entry->key.tag!=FD_GOSSIP_VALUE_DUPLICATE_SHRED &&
924 0 : entry->key.tag!=FD_GOSSIP_VALUE_INC_SNAPSHOT_HASHES ) ) {
925 0 : return;
926 0 : }
927 :
928 0 : fd_gossip_update_message_t * msg = fd_gossip_out_get_chunk( crds->gossip_update );
929 0 : msg->wallclock_nanos = now;
930 0 : fd_memcpy( msg->origin_pubkey, entry->key.pubkey, 32UL );
931 0 : ulong sz;
932 0 : switch( entry->key.tag ) {
933 0 : case FD_GOSSIP_VALUE_CONTACT_INFO:
934 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_CONTACT_INFO;
935 0 : *msg->contact_info.contact_info = *entry->contact_info.ci->contact_info;
936 0 : msg->contact_info.idx = crds_contact_info_pool_idx( crds->contact_info.pool, entry->contact_info.ci );
937 0 : sz = FD_GOSSIP_UPDATE_SZ_CONTACT_INFO;
938 0 : break;
939 0 : case FD_GOSSIP_VALUE_LOWEST_SLOT:
940 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_LOWEST_SLOT;
941 0 : sz = FD_GOSSIP_UPDATE_SZ_LOWEST_SLOT;
942 0 : msg->lowest_slot = entry_view->lowest_slot;
943 0 : break;
944 0 : case FD_GOSSIP_VALUE_VOTE:
945 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_VOTE;
946 : /* TODO: dynamic sizing */
947 0 : sz = FD_GOSSIP_UPDATE_SZ_VOTE;
948 0 : fd_crds_key_t lookup_ci;
949 0 : lookup_ci.tag = FD_GOSSIP_VALUE_CONTACT_INFO;
950 0 : fd_memcpy( &lookup_ci.pubkey, entry->key.pubkey, sizeof(fd_pubkey_t) );
951 0 : fd_crds_entry_t * ci = lookup_map_ele_query( crds->lookup_map, &lookup_ci, NULL, crds->pool );
952 :
953 0 : if( FD_LIKELY( ci && ci->key.tag == FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
954 0 : msg->vote.socket = ci->contact_info.ci->contact_info->sockets[ FD_CONTACT_INFO_SOCKET_GOSSIP ];
955 0 : } else {
956 0 : msg->vote.socket = (fd_ip4_port_t){ 0 };
957 0 : }
958 :
959 0 : msg->vote.vote_tower_index = entry->key.vote_index;
960 0 : msg->vote.txn_sz = entry_view->vote->txn_sz;
961 0 : fd_memcpy( msg->vote.txn, payload+entry_view->vote->txn_off, entry_view->vote->txn_sz );
962 0 : break;
963 0 : case FD_GOSSIP_VALUE_DUPLICATE_SHRED:
964 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_DUPLICATE_SHRED;
965 : /* TODO: dynamic sizing */
966 0 : sz = FD_GOSSIP_UPDATE_SZ_DUPLICATE_SHRED;
967 0 : {
968 0 : fd_gossip_view_duplicate_shred_t const * ds = entry_view->duplicate_shred;
969 0 : fd_gossip_duplicate_shred_t * ds_msg = &msg->duplicate_shred;
970 :
971 0 : ds_msg->index = ds->index;
972 0 : ds_msg->slot = ds->slot;
973 0 : ds_msg->num_chunks = ds->num_chunks;
974 0 : ds_msg->chunk_index = ds->chunk_index;
975 0 : ds_msg->wallclock = entry->wallclock_nanos;
976 0 : ds_msg->chunk_len = ds->chunk_len;
977 0 : fd_memcpy( ds_msg->chunk, payload+ds->chunk_off, ds->chunk_len );
978 0 : }
979 0 : break;
980 0 : case FD_GOSSIP_VALUE_INC_SNAPSHOT_HASHES:
981 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES;
982 : /* TODO: dynamic sizing */
983 0 : sz = FD_GOSSIP_UPDATE_SZ_SNAPSHOT_HASHES;
984 0 : {
985 0 : fd_gossip_view_snapshot_hashes_t const * sh = entry_view->snapshot_hashes;
986 0 : fd_gossip_snapshot_hashes_t * sh_msg = &msg->snapshot_hashes;
987 :
988 0 : sh_msg->incremental_len = sh->inc_len;
989 0 : fd_memcpy( sh_msg->full, payload+sh->full_off, sizeof(fd_gossip_snapshot_hash_pair_t) );
990 0 : fd_memcpy( sh_msg->incremental, payload+sh->inc_off, sh->inc_len*sizeof(fd_gossip_snapshot_hash_pair_t) );
991 0 : }
992 0 : break;
993 0 : default:
994 0 : FD_LOG_ERR(( "impossible" ));
995 0 : }
996 0 : fd_gossip_tx_publish_chunk( crds->gossip_update,
997 0 : stem,
998 0 : (ulong)msg->tag,
999 0 : sz,
1000 0 : now );
1001 0 : }
1002 :
1003 : fd_crds_entry_t const *
1004 : fd_crds_insert( fd_crds_t * crds,
1005 : fd_gossip_view_crds_value_t const * candidate_view,
1006 : uchar const * payload,
1007 : ulong origin_stake,
1008 : uchar is_from_me,
1009 : long now ,
1010 4758 : fd_stem_context_t * stem ) {
1011 : /* Update table count metrics at the end to avoid early return
1012 : handling */
1013 4758 : fd_crds_entry_t * candidate = fd_crds_acquire( crds, now, stem );
1014 4758 : crds_entry_init( candidate_view, crds->sha256, payload, origin_stake, candidate );
1015 :
1016 4758 : crds->metrics->count[ candidate->key.tag ]++;
1017 :
1018 4758 : fd_crds_entry_t * incumbent = lookup_map_ele_query( crds->lookup_map, &candidate->key, NULL, crds->pool );
1019 4758 : uchar is_replacing = incumbent!=NULL;
1020 4758 : if( FD_LIKELY( is_replacing ) ) {
1021 2304 : insert_purged( crds, incumbent->value_hash, now );
1022 :
1023 2304 : if( FD_UNLIKELY( incumbent->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
1024 0 : if( FD_LIKELY( !!incumbent->contact_info.fresh_dlist.in_list ) ) crds_contact_info_fresh_list_ele_remove( crds->contact_info.fresh_dlist, incumbent, crds->pool );
1025 0 : crds_contact_info_evict_dlist_ele_remove( crds->contact_info.evict_dlist, incumbent, crds->pool );
1026 0 : candidate->contact_info.ci = incumbent->contact_info.ci;
1027 :
1028 : /* is_active is user controlled (specifically by ping_tracker),
1029 : and is used in sampler score calculations. So we inherit the
1030 : incumbent's setting. */
1031 0 : candidate->contact_info.is_active = incumbent->contact_info.is_active;
1032 0 : if( FD_LIKELY( !is_from_me ) ) {
1033 0 : if( FD_UNLIKELY( candidate->stake!=incumbent->stake ) ) {
1034 : /* Perform a rescore here (expensive) */
1035 0 : crds_samplers_upd_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx, now );
1036 0 : } else {
1037 0 : crds_samplers_swap_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx );
1038 0 : }
1039 0 : }
1040 :
1041 0 : if( FD_LIKELY( incumbent->stake ) ) crds->metrics->peer_staked_cnt--;
1042 0 : else crds->metrics->peer_unstaked_cnt--;
1043 0 : crds->metrics->peer_visible_stake -= incumbent->stake;
1044 0 : }
1045 :
1046 2304 : if( FD_LIKELY( incumbent->stake ) ) {
1047 0 : staked_expire_dlist_ele_remove( crds->staked_expire_dlist, incumbent, crds->pool );
1048 2304 : } else {
1049 2304 : unstaked_expire_dlist_ele_remove( crds->unstaked_expire_dlist, incumbent, crds->pool );
1050 2304 : }
1051 2304 : evict_treap_ele_remove( crds->evict_treap, incumbent, crds->pool );
1052 2304 : hash_treap_ele_remove( crds->hash_treap, incumbent, crds->pool );
1053 2304 : lookup_map_ele_remove( crds->lookup_map, &incumbent->key, NULL, crds->pool );
1054 2304 : fd_crds_release( crds, incumbent );
1055 2454 : } else if( candidate->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) {
1056 150 : if( FD_UNLIKELY( !crds_contact_info_pool_free( crds->contact_info.pool ) ) ) {
1057 0 : fd_crds_entry_t * evict = crds_contact_info_evict_dlist_ele_peek_head( crds->contact_info.evict_dlist, crds->pool );
1058 0 : remove_contact_info( crds, evict, now, stem );
1059 0 : if( FD_LIKELY( evict->stake ) ) {
1060 0 : staked_expire_dlist_ele_remove( crds->staked_expire_dlist, evict, crds->pool );
1061 0 : } else {
1062 0 : unstaked_expire_dlist_ele_remove( crds->unstaked_expire_dlist, evict, crds->pool );
1063 0 : }
1064 0 : evict_treap_ele_remove( crds->evict_treap, evict, crds->pool );
1065 0 : hash_treap_ele_remove( crds->hash_treap, evict, crds->pool );
1066 0 : lookup_map_ele_remove( crds->lookup_map, &evict->key, NULL, crds->pool );
1067 0 : fd_crds_release( crds, evict );
1068 0 : crds->metrics->peer_evicted_cnt++;
1069 0 : crds->metrics->evicted_cnt++;
1070 0 : }
1071 :
1072 150 : candidate->contact_info.ci = crds_contact_info_pool_ele_acquire( crds->contact_info.pool );
1073 150 : }
1074 :
1075 4758 : candidate->num_duplicates = 0UL;
1076 4758 : candidate->expire.wallclock_nanos = now;
1077 4758 : candidate->value_sz = candidate_view->length;
1078 4758 : fd_memcpy( candidate->value_bytes, payload+candidate_view->value_off, candidate_view->length );
1079 :
1080 4758 : crds->has_staked_node |= candidate->stake ? 1 : 0;
1081 :
1082 4758 : evict_treap_ele_insert( crds->evict_treap, candidate, crds->pool );
1083 4758 : if( FD_LIKELY( candidate->stake ) ) {
1084 150 : staked_expire_dlist_ele_push_tail( crds->staked_expire_dlist, candidate, crds->pool );
1085 4608 : } else {
1086 4608 : unstaked_expire_dlist_ele_push_tail( crds->unstaked_expire_dlist, candidate, crds->pool );
1087 4608 : }
1088 4758 : hash_treap_ele_insert( crds->hash_treap, candidate, crds->pool );
1089 4758 : lookup_map_ele_insert( crds->lookup_map, candidate, crds->pool );
1090 :
1091 4758 : if( FD_UNLIKELY( candidate->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
1092 150 : fd_memcpy( candidate->contact_info.ci->contact_info, candidate_view->ci_view->contact_info, sizeof(fd_contact_info_t) );
1093 : /* Default to active, since we filter inactive entries prior to insertion */
1094 150 : candidate->contact_info.is_active = 1;
1095 :
1096 150 : crds_contact_info_evict_dlist_ele_push_tail( crds->contact_info.evict_dlist, candidate, crds->pool );
1097 :
1098 150 : if( FD_LIKELY( !is_from_me ) ){
1099 150 : crds_contact_info_fresh_list_ele_push_tail( crds->contact_info.fresh_dlist, candidate, crds->pool );
1100 150 : candidate->contact_info.fresh_dlist.in_list = 1;
1101 150 : } else {
1102 0 : candidate->contact_info.fresh_dlist.in_list = 0;
1103 0 : }
1104 :
1105 150 : if( FD_UNLIKELY( !is_replacing && !is_from_me ) ) {
1106 150 : crds_samplers_add_peer( crds->samplers, candidate, now);
1107 150 : }
1108 :
1109 150 : if( FD_LIKELY( candidate->stake ) ) crds->metrics->peer_staked_cnt++;
1110 0 : else crds->metrics->peer_unstaked_cnt++;
1111 150 : crds->metrics->peer_visible_stake += candidate->stake;
1112 150 : }
1113 :
1114 4758 : publish_update_msg( crds, candidate, candidate_view, payload, now, stem );
1115 4758 : return candidate;
1116 4758 : }
1117 :
1118 : void
1119 : fd_crds_entry_value( fd_crds_entry_t const * entry,
1120 : uchar const ** value_bytes,
1121 0 : ulong * value_sz ) {
1122 0 : *value_bytes = entry->value_bytes;
1123 0 : *value_sz = entry->value_sz;
1124 0 : }
1125 :
1126 : uchar const *
1127 2403 : fd_crds_entry_hash( fd_crds_entry_t const * entry ) {
1128 2403 : return entry->value_hash;
1129 2403 : }
1130 :
1131 : inline static void
1132 : make_contact_info_key( uchar const * pubkey,
1133 150 : fd_crds_key_t * key_out ) {
1134 150 : key_out->tag = FD_GOSSIP_VALUE_CONTACT_INFO;
1135 150 : fd_memcpy( key_out->pubkey, pubkey, 32UL );
1136 150 : }
1137 :
1138 : int
1139 0 : fd_crds_entry_is_contact_info( fd_crds_entry_t const * entry ) {
1140 0 : return entry->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO;
1141 0 : }
1142 :
1143 : fd_contact_info_t *
1144 3 : fd_crds_entry_contact_info( fd_crds_entry_t const * entry ) {
1145 3 : return entry->contact_info.ci->contact_info;
1146 3 : }
1147 :
1148 :
1149 : fd_contact_info_t const *
1150 : fd_crds_contact_info_lookup( fd_crds_t const * crds,
1151 0 : uchar const * pubkey ) {
1152 :
1153 0 : fd_crds_key_t key[1];
1154 0 : make_contact_info_key( pubkey, key );
1155 0 : fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
1156 0 : if( FD_UNLIKELY( !peer_ci ) ) {
1157 0 : return NULL;
1158 0 : }
1159 :
1160 0 : return peer_ci->contact_info.ci->contact_info;
1161 0 : }
1162 :
1163 : ulong
1164 3 : fd_crds_peer_count( fd_crds_t const * crds ){
1165 3 : return crds_contact_info_pool_used( crds->contact_info.pool );
1166 3 : }
1167 :
1168 : static inline void
1169 : set_peer_active_status( fd_crds_t * crds,
1170 : uchar const * peer_pubkey,
1171 : uchar status,
1172 150 : long now ) {
1173 :
1174 150 : fd_crds_key_t key[1];
1175 150 : make_contact_info_key( peer_pubkey, key );
1176 :
1177 150 : fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
1178 : /* TODO: error handling? This technically should never hit */
1179 150 : if( FD_UNLIKELY( !peer_ci ) ) return;
1180 150 : uchar old_status = peer_ci->contact_info.is_active;
1181 150 : peer_ci->contact_info.is_active = status;
1182 :
1183 150 : if( FD_UNLIKELY( old_status!=status ) ) {
1184 : /* Trigger sampler update */
1185 0 : crds_samplers_upd_peer_at_idx( crds->samplers,
1186 0 : peer_ci,
1187 0 : peer_ci->contact_info.sampler_idx,
1188 0 : now );
1189 0 : }
1190 150 : }
1191 : void
1192 : fd_crds_peer_active( fd_crds_t * crds,
1193 : uchar const * peer_pubkey,
1194 150 : long now ) {
1195 150 : set_peer_active_status( crds, peer_pubkey, 1 /* active */, now );
1196 150 : }
1197 :
1198 : void
1199 : fd_crds_peer_inactive( fd_crds_t * crds,
1200 : uchar const * peer_pubkey,
1201 0 : long now ) {
1202 0 : set_peer_active_status( crds, peer_pubkey, 0 /* inactive */, now );
1203 0 : }
1204 :
1205 : fd_contact_info_t const *
1206 : fd_crds_peer_sample( fd_crds_t const * crds,
1207 0 : fd_rng_t * rng ) {
1208 0 : ulong idx = wpeer_sampler_sample( crds->samplers->pr_sampler,
1209 0 : rng,
1210 0 : crds->samplers->ele_cnt );
1211 0 : if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL;
1212 0 : return fd_crds_entry_contact_info( crds->samplers->ele[idx] );
1213 0 : }
1214 :
1215 : fd_contact_info_t const *
1216 : fd_crds_bucket_sample_and_remove( fd_crds_t * crds,
1217 : fd_rng_t * rng,
1218 3 : ulong bucket ) {
1219 3 : ulong idx = wpeer_sampler_sample( &crds->samplers->bucket_samplers[bucket],
1220 3 : rng,
1221 3 : crds->samplers->ele_cnt );
1222 3 : if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL;
1223 : /* Disable peer to prevent future sampling until added back with
1224 : fd_crds_bucket_add */
1225 3 : wpeer_sampler_disable( &crds->samplers->bucket_samplers[bucket],
1226 3 : idx,
1227 3 : crds->samplers->ele_cnt );
1228 :
1229 3 : return fd_crds_entry_contact_info( crds->samplers->ele[idx] );
1230 3 : }
1231 :
1232 : void
1233 : fd_crds_bucket_add( fd_crds_t * crds,
1234 : ulong bucket,
1235 0 : uchar const * pubkey ) {
1236 0 : fd_crds_key_t key[1];
1237 0 : make_contact_info_key( pubkey, key );
1238 0 : fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
1239 0 : if( FD_UNLIKELY( !peer_ci ) ) {
1240 0 : FD_LOG_DEBUG(( "Sample peer not found in CRDS. Likely dropped." ));
1241 0 : return;
1242 0 : }
1243 0 : wpeer_sampler_t * bucket_sampler = &crds->samplers->bucket_samplers[bucket];
1244 0 : wpeer_sampler_enable( bucket_sampler,
1245 0 : peer_ci->contact_info.sampler_idx,
1246 0 : crds->samplers->ele_cnt );
1247 :
1248 0 : ulong score = wpeer_sampler_bucket_score( peer_ci, bucket );
1249 0 : wpeer_sampler_upd( bucket_sampler,
1250 0 : score,
1251 0 : peer_ci->contact_info.sampler_idx,
1252 0 : crds->samplers->ele_cnt );
1253 0 : }
1254 :
1255 : static void
1256 : generate_masks( ulong mask,
1257 : uint mask_bits,
1258 : ulong * start_mask,
1259 33 : ulong * end_mask ) {
1260 : /* agave defines the mask as a ulong with the top mask_bits bits
1261 : set to the desired prefix and all other bits set to 1. */
1262 33 : FD_TEST( mask_bits<64U );
1263 33 : ulong range = fd_ulong_mask( 0U, (int)(63U-mask_bits) );
1264 33 : *start_mask = mask & ~range;
1265 33 : *end_mask = mask | range;
1266 33 : }
1267 :
1268 : struct fd_crds_mask_iter_private {
1269 : ulong idx;
1270 : ulong end_hash;
1271 : };
1272 :
1273 : fd_crds_mask_iter_t *
1274 : fd_crds_mask_iter_init( fd_crds_t const * crds,
1275 : ulong mask,
1276 : uint mask_bits,
1277 15 : uchar iter_mem[ static 16UL ] ) {
1278 15 : ulong start_hash, end_hash;
1279 15 : generate_masks( mask, mask_bits, &start_hash, &end_hash );
1280 :
1281 15 : fd_crds_mask_iter_t * it = (fd_crds_mask_iter_t *)iter_mem;
1282 15 : it->end_hash = end_hash;
1283 15 : it->idx = hash_treap_idx_ge( crds->hash_treap, start_hash, crds->pool );
1284 15 : return it;
1285 15 : }
1286 :
1287 : fd_crds_mask_iter_t *
1288 2403 : fd_crds_mask_iter_next( fd_crds_mask_iter_t * it, fd_crds_t const * crds ) {
1289 2403 : fd_crds_entry_t const * val = hash_treap_ele_fast_const( it->idx, crds->pool );
1290 2403 : it->idx = val->hash.next;
1291 2403 : return it;
1292 2403 : }
1293 :
1294 : int
1295 2418 : fd_crds_mask_iter_done( fd_crds_mask_iter_t * it, fd_crds_t const * crds ) {
1296 2418 : fd_crds_entry_t const * val = hash_treap_ele_fast_const( it->idx, crds->pool );
1297 2418 : return hash_treap_idx_is_null( it->idx ) ||
1298 2418 : (it->end_hash < val->hash.hash_prefix);
1299 2418 : }
1300 :
1301 : fd_crds_entry_t const *
1302 2403 : fd_crds_mask_iter_entry( fd_crds_mask_iter_t * it, fd_crds_t const * crds ){
1303 2403 : return hash_treap_ele_fast_const( it->idx, crds->pool );
1304 2403 : }
1305 :
1306 : fd_crds_mask_iter_t *
1307 : fd_crds_purged_mask_iter_init( fd_crds_t const * crds,
1308 : ulong mask,
1309 : uint mask_bits,
1310 18 : uchar iter_mem[ static 16UL ] ){
1311 18 : ulong start_hash, end_hash;
1312 18 : generate_masks( mask, mask_bits, &start_hash, &end_hash );
1313 :
1314 18 : fd_crds_mask_iter_t * it = (fd_crds_mask_iter_t *)iter_mem;
1315 18 : it->end_hash = end_hash;
1316 18 : it->idx = purged_treap_idx_ge( crds->purged.treap, start_hash, crds->purged.pool );
1317 18 : return it;
1318 18 : }
1319 :
1320 : fd_crds_mask_iter_t *
1321 : fd_crds_purged_mask_iter_next( fd_crds_mask_iter_t * it,
1322 2403 : fd_crds_t const * crds ){
1323 2403 : fd_crds_purged_t const * val = purged_treap_ele_fast_const( it->idx, crds->purged.pool );
1324 2403 : it->idx = val->treap.next;
1325 2403 : return it;
1326 2403 : }
1327 :
1328 : int
1329 : fd_crds_purged_mask_iter_done( fd_crds_mask_iter_t * it,
1330 2421 : fd_crds_t const * crds ){
1331 2421 : fd_crds_purged_t const * val = purged_treap_ele_fast_const( it->idx, crds->purged.pool );
1332 2421 : return purged_treap_idx_is_null( it->idx ) ||
1333 2421 : (it->end_hash < val->treap.hash_prefix);
1334 2421 : }
1335 :
1336 : /* fd_crds_purged_mask_iter_hash returns the hash of the current
1337 : entry in the purged mask iterator. */
1338 : uchar const *
1339 : fd_crds_purged_mask_iter_hash( fd_crds_mask_iter_t * it,
1340 2403 : fd_crds_t const * crds ){
1341 2403 : fd_crds_purged_t const * val = purged_treap_ele_fast_const( it->idx, crds->purged.pool );
1342 2403 : return val->hash;
1343 2403 : }
|