Line data Source code
1 : #include "fd_crds.h"
2 : #include "fd_crds_contact_info.c"
3 : #include "../fd_gossip_types.h"
4 :
5 : #include "../../../ballet/sha256/fd_sha256.h"
6 : #include "../../../funk/fd_funk_base.h" /* no link dependency, only using hash */
7 :
8 : #include <string.h>
9 :
/* The contact info side table is sized by CRDS_MAX_CONTACT_INFO (see
   fd_crds_footprint / fd_crds_new); it must agree with
   FD_CONTACT_INFO_TABLE_SIZE, which is defined outside this file. */
FD_STATIC_ASSERT( CRDS_MAX_CONTACT_INFO==FD_CONTACT_INFO_TABLE_SIZE,
                  "CRDS_MAX_CONTACT_INFO must match FD_CONTACT_INFO_TABLE_SIZE" );

/* fd_crds_key identifies a CRDS value for upsert purposes: the value
   tag, the originator's 32-byte pubkey and, for the tags that allow
   several values per originator (vote, epoch slots, duplicate shred),
   a per-type index.  Only the union member matching tag is written by
   generate_key and read by lookup_hash / lookup_eq; for every other
   tag the union is ignored.  Those helpers compare fields one by one,
   so struct padding never takes part in hashing or equality. */
struct fd_crds_key {
  uchar tag;
  uchar pubkey[ 32UL ];
  union {
    uchar  vote_index;
    uchar  epoch_slots_index;
    ushort duplicate_shred_index;
  };
};

typedef struct fd_crds_key fd_crds_key_t;
24 :
/* The CRDS at a high level is just a list of all the messages we have
   received over gossip.  These are called the CRDS values.  Values
   are not arbitrary, and must conform to a strictly typed schema of
   around 10 different messages.

   Each fd_crds_entry_t (the typedef is declared outside this file,
   presumably in fd_crds.h) holds one value plus the intrusive links
   that thread it through every index of the table. */
struct fd_crds_entry_private {
  /* The core operation of the CRDS is to "upsert" a value.  Basically,
     all of the message types are keyed by the originator's public key,
     and we only want to store the most recent message of each type.

     This key field is the key for the hash table (lookup_map). */
  fd_crds_key_t key;

  /* Type-specific metadata.  crds_entry_init fills contact_info only
     for FD_GOSSIP_VALUE_CONTACT_INFO values and node_instance only for
     FD_GOSSIP_VALUE_NODE_INSTANCE values; other members are stale. */
  union {
    struct {
      /* Slot in the contact info side table (crds->contact_info.pool). */
      fd_crds_contact_info_entry_t * ci;
      long instance_creation_wallclock_nanos;
      uchar is_active;
      /* Index of this peer in the peer samplers; SAMPLE_IDX_SENTINEL
         until the entry is added to the samplers. */
      ulong sampler_idx;

      /* A list of "fresh" contact info entries is maintained, holding
         entries that have been refreshed/inserted in the last 60s in
         upsertion order (oldest first).

         fd_crds_advance periodically checks for and removes peers from
         this list if they exceed the threshold.  Peers removed in this
         loop are also re-scored in the peer sampler.  This is different
         from dropping the CRDS entry entirely, which also removes the
         entry from this list.  To avoid double-popping an entry we use
         in_list as a presence check prior to removing. */
      struct {
        ulong prev;
        ulong next;
        uchar in_list; /* 1 if in the fresh list, 0 otherwise */
      } fresh_dlist;

      /* The contact info side table has a separate size limit, so
         we maintain a separate evict list to make space for new
         entries. */
      struct {
        ulong prev;
        ulong next;
      } evict_dlist;

      /* TODO: stake-ordered treap/pq? */
    } contact_info;
    struct {
      ulong token;
    } node_instance;
  };

  /* When an originator creates a CRDS message, they attach their local
     wallclock time to it.  This time is used to determine when a
     message should be upserted.  If messages have the same key, the
     newer one (as created by the originator) is used.

     Messages encode wallclock in millis; Firedancer converts
     them into nanos internally. */
  long wallclock_nanos;

  uchar value_bytes[ FD_GOSSIP_CRDS_MAX_SZ ];
  ushort value_sz;

  /* The value hash is the sha256 of the serialized value.  It is used
     in bloom filter generation and as a tiebreaker when a fast
     override check (overrides_fast) returns -1 (undetermined). */
  uchar value_hash[ 32UL ];
  ulong num_duplicates;
  ulong stake;

  /* Free-list link for crds_pool. */
  struct {
    ulong next;
  } pool;

  /* The CRDS needs to perform a variety of actions on the message table
     quickly, so there are various indexes woven through these values to
     support these actions.  They are ...

     lookup is used to enable the core map<key, value> functionality
     described for upserts defined by value->key. */
  struct {
    ulong next;
    ulong prev;
  } lookup;

  /* The table has a fixed size message capacity, and supports eviction
     so insertion never fails.  If the table is full and we wish to
     insert a new value, the "lowest priority" message is evicted to
     make room.  This is accomplished with a treap sorted by stake, so
     the lowest stake message is removed (see fd_crds_acquire). */
  struct {
    ulong parent;
    ulong left;
    ulong right;
    ulong prio;
    ulong next;
    ulong prev;
  } evict;

  /* Values in the table expire after a pre-determined amount of time,
     so we also keep a linked list of values sorted by creation time.
     The time used here is our node's wallclock when we received the
     CRDS, not the originator's local wallclock, which they could skew
     to cause their values to live longer.

     There are actually two lists that reuse the same pointers here,
     and a value will be in exactly one of the lists.  One is for staked
     nodes, whose values expire after 48 hours, and one is for unstaked
     nodes, whose values expire after 15 seconds (expire() only applies
     the 15 second timeout once has_staked_node is set; until then it
     uses the staked timeout for both lists). */
  struct {
    long wallclock_nanos;
    ulong prev;
    ulong next;
  } expire;

  /* Finally, a core operation on the CRDS is to query for values by
     hash, to respond to pull requests.  This is done with a treap
     sorted by hash, which is just the first 8 bytes of value_hash. */
  struct {
    ulong hash;
    ulong parent;
    ulong left;
    ulong right;
    ulong next;
    ulong prev;
    ulong prio;
  } hash;
};
153 :
/* crds_pool: fixed-capacity allocator of fd_crds_entry_t (ele_max
   elements, see fd_crds_new), free-list linked through pool.next. */
#define POOL_NAME crds_pool
#define POOL_T    fd_crds_entry_t
#define POOL_NEXT pool.next

#include "../../../util/tmpl/fd_pool.c"

/* evict_treap: every live entry ordered by stake (TREAP_LT), so a
   forward iteration starts at the lowest-stake entry, which is what
   fd_crds_acquire evicts when the pool is full.  The treap is never
   queried by key, so TREAP_QUERY_T is a dummy and TREAP_CMP ignores
   its arguments; no real comparison is needed. */
#define TREAP_NAME      evict_treap
#define TREAP_T         fd_crds_entry_t
#define TREAP_QUERY_T   void *
#define TREAP_CMP(q,e)  (__extension__({ (void)(q); (void)(e); -1; }))
#define TREAP_IDX_T     ulong
#define TREAP_LT(e0,e1) ((e0)->stake<(e1)->stake)
#define TREAP_PARENT    evict.parent
#define TREAP_LEFT      evict.left
#define TREAP_RIGHT     evict.right
#define TREAP_PRIO      evict.prio
#define TREAP_OPTIMIZE_ITERATION 1
#define TREAP_NEXT      evict.next
#define TREAP_PREV      evict.prev

#include "../../../util/tmpl/fd_treap.c"

/* staked_expire_dlist / unstaked_expire_dlist: receive-time ordered
   expiry lists.  Both share the expire.prev/next links because an
   entry is in exactly one of them. */
#define DLIST_NAME  staked_expire_dlist
#define DLIST_ELE_T fd_crds_entry_t
#define DLIST_PREV  expire.prev
#define DLIST_NEXT  expire.next

#include "../../../util/tmpl/fd_dlist.c"

#define DLIST_NAME  unstaked_expire_dlist
#define DLIST_ELE_T fd_crds_entry_t
#define DLIST_PREV  expire.prev
#define DLIST_NEXT  expire.next

#include "../../../util/tmpl/fd_dlist.c"


/* crds_contact_info_fresh_list: contact info entries refreshed within
   the last 60s, oldest first (drained by unfresh). */
#define DLIST_NAME  crds_contact_info_fresh_list
#define DLIST_ELE_T fd_crds_entry_t
#define DLIST_PREV  contact_info.fresh_dlist.prev
#define DLIST_NEXT  contact_info.fresh_dlist.next
#include "../../../util/tmpl/fd_dlist.c"

/* crds_contact_info_evict_dlist: eviction order for the separately
   sized contact info side table. */
#define DLIST_NAME  crds_contact_info_evict_dlist
#define DLIST_ELE_T fd_crds_entry_t
#define DLIST_PREV  contact_info.evict_dlist.prev
#define DLIST_NEXT  contact_info.evict_dlist.next
#include "../../../util/tmpl/fd_dlist.c"

/* hash_treap: live entries ordered by hash.hash (the first 8 bytes of
   value_hash), queryable by that ulong prefix. */
#define TREAP_NAME      hash_treap
#define TREAP_T         fd_crds_entry_t
#define TREAP_QUERY_T   ulong
#define TREAP_CMP(q,e)  ((q>e->hash.hash)-(q<e->hash.hash))
#define TREAP_IDX_T     ulong
#define TREAP_OPTIMIZE_ITERATION 1
#define TREAP_NEXT      hash.next
#define TREAP_PREV      hash.prev
#define TREAP_LT(e0,e1) ((e0)->hash.hash<(e1)->hash.hash)

#define TREAP_PARENT hash.parent
#define TREAP_LEFT   hash.left
#define TREAP_RIGHT  hash.right
#define TREAP_PRIO   hash.prio

#include "../../../util/tmpl/fd_treap.c"
220 :
221 : static inline ulong
222 : lookup_hash( fd_crds_key_t const * key,
223 450 : ulong seed ) {
224 450 : ulong hash_fn = ((ulong)key->tag)<<16;
225 450 : switch( key->tag ) {
226 0 : case FD_GOSSIP_VALUE_VOTE:
227 0 : hash_fn ^= key->vote_index;
228 0 : break;
229 0 : case FD_GOSSIP_VALUE_EPOCH_SLOTS:
230 0 : hash_fn ^= key->epoch_slots_index;
231 0 : break;
232 0 : case FD_GOSSIP_VALUE_DUPLICATE_SHRED:
233 0 : hash_fn ^= key->duplicate_shred_index;
234 0 : break;
235 450 : default:
236 450 : break;
237 450 : }
238 450 : return fd_funk_rec_key_hash1( key->pubkey, hash_fn, seed );
239 450 : }
240 :
241 : static inline int
242 : lookup_eq( fd_crds_key_t const * key0,
243 153 : fd_crds_key_t const * key1 ) {
244 153 : if( FD_UNLIKELY( key0->tag!=key1->tag ) ) return 0;
245 153 : if( FD_UNLIKELY( !!memcmp( key0->pubkey, key1->pubkey, 32UL ) ) ) return 0;
246 150 : switch( key0->tag ) {
247 0 : case FD_GOSSIP_VALUE_VOTE:
248 0 : return key0->vote_index==key1->vote_index;
249 0 : case FD_GOSSIP_VALUE_EPOCH_SLOTS:
250 0 : return key0->epoch_slots_index==key1->epoch_slots_index;
251 0 : case FD_GOSSIP_VALUE_DUPLICATE_SHRED:
252 0 : return key0->duplicate_shred_index==key1->duplicate_shred_index;
253 150 : default:
254 150 : break;
255 150 : }
256 150 : return 1;
257 150 : }
258 :
/* lookup_map: chained hash map from fd_crds_key_t to the live entry
   holding it, using lookup_hash / lookup_eq above.  Random-access
   removal is enabled because entries are removed by element
   (expire, fd_crds_acquire) rather than by iteration. */
#define MAP_NAME          lookup_map
#define MAP_ELE_T         fd_crds_entry_t
#define MAP_KEY_T         fd_crds_key_t
#define MAP_KEY           key
#define MAP_IDX_T         ulong
#define MAP_NEXT          lookup.next
#define MAP_PREV          lookup.prev
#define MAP_KEY_HASH(k,s) (lookup_hash( k, s ))
#define MAP_KEY_EQ(k0,k1) (lookup_eq( k0, k1 ))
#define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1

#include "../../../util/tmpl/fd_map_chain.c"

/* Peer samplers (crds_samplers_t, crds_samplers_*) operate on
   fd_crds_entry_t, so they are included after the entry type and its
   indexes are defined. */
#include "fd_crds_peer_samplers.c"
273 :
/* fd_crds_purged records the hash of a value that was dropped from the
   table (purged) or that failed to insert, so that pull requests can
   advertise it.  The hash is the first member; the purged_treap macros
   read its first 8 bytes as a ulong. */
struct fd_crds_purged {
  uchar hash[ 32UL ];
  struct {
    ulong next;
  } pool;

  /* Similar to fd_crds_entry, we want the ability to query and iterate
     through values by hash[:8] to generate pull requests. */
  struct {
    ulong parent;
    ulong left;
    ulong right;
    ulong next;
    ulong prev;
    ulong prio;
  } treap;

  /* Similar to fd_crds_entry, we keep a linked list of purged values
     sorted by insertion time.  The time used here is our node's
     wallclock.

     There are actually two (mutually exclusive) lists that reuse the
     same pointers here: one for "purged" entries that expire in 60s and
     one for "failed_inserts" that expire after 20s. */
  struct {
    long wallclock_nanos;
    ulong next;
    ulong prev;
  } expire;
};
typedef struct fd_crds_purged fd_crds_purged_t;
304 :
/* purged_pool: purged_max fd_crds_purged_t records shared by the purged
   and failed-insert lists. */
#define POOL_NAME purged_pool
#define POOL_T    fd_crds_purged_t
#define POOL_NEXT pool.next

#include "../../../util/tmpl/fd_pool.c"

/* purged_treap: purged/failed hashes ordered by their first 8 bytes.
   NOTE(review): the comparators type-pun hash through (ulong *).  That
   is aligned here (hash is the first member of a struct with ulong
   members), but it is a strict-aliasing cast; fd_ulong_load_8 is the
   portable spelling used elsewhere in this file. */
#define TREAP_NAME      purged_treap
#define TREAP_T         fd_crds_purged_t
#define TREAP_QUERY_T   ulong
#define TREAP_CMP(q,e)  ((q>*(ulong *)(e->hash))-(q<*(ulong *)(e->hash)))
#define TREAP_OPTIMIZE_ITERATION 1
#define TREAP_NEXT      treap.next
#define TREAP_PREV      treap.prev
#define TREAP_LT(e0,e1) (*(ulong *)((e0)->hash)<*(ulong *)((e1)->hash))

#define TREAP_PARENT treap.parent
#define TREAP_LEFT   treap.left
#define TREAP_RIGHT  treap.right
#define TREAP_PRIO   treap.prio

#include "../../../util/tmpl/fd_treap.c"

/* failed_inserts_dlist (20s TTL) and purged_dlist (60s TTL): insertion
   ordered lists sharing the expire.prev/next links; each record is in
   exactly one of them. */
#define DLIST_NAME  failed_inserts_dlist
#define DLIST_ELE_T fd_crds_purged_t
#define DLIST_PREV  expire.prev
#define DLIST_NEXT  expire.next

#include "../../../util/tmpl/fd_dlist.c"

#define DLIST_NAME  purged_dlist
#define DLIST_ELE_T fd_crds_purged_t
#define DLIST_PREV  expire.prev
#define DLIST_NEXT  expire.next

#include "../../../util/tmpl/fd_dlist.c"
340 :
/* fd_crds_private is the table header.  It lives at the start of the
   shared memory region laid out by fd_crds_footprint / fd_crds_new; the
   pointers below are joins to objects carved from the same region. */
struct fd_crds_private {
  /* Output link for gossip update messages (e.g. contact info
     removals published by remove_contact_info). */
  fd_gossip_out_ctx_t * gossip_update;

  /* Scratch hasher used to compute value_hash. */
  fd_sha256_t sha256[1];

  /* Initialized to 0 by fd_crds_new.  While 0, expire() applies the
     staked timeout to unstaked values too. */
  int has_staked_node;

  fd_crds_entry_t * pool;

  evict_treap_t *           evict_treap;
  staked_expire_dlist_t *   staked_expire_dlist;
  unstaked_expire_dlist_t * unstaked_expire_dlist;
  hash_treap_t *            hash_treap;
  lookup_map_t *            lookup_map;

  /* Hashes of purged values and failed inserts, sharing one pool. */
  struct {
    fd_crds_purged_t *       pool;
    purged_treap_t *         treap;
    purged_dlist_t *         purged_dlist;
    failed_inserts_dlist_t * failed_inserts_dlist;
  } purged;

  /* Contact info side table (CRDS_MAX_CONTACT_INFO entries). */
  struct {
    fd_crds_contact_info_entry_t *    pool;
    crds_contact_info_fresh_list_t *  fresh_dlist;
    crds_contact_info_evict_dlist_t * evict_dlist;
  } contact_info;

  crds_samplers_t samplers[1];

  fd_crds_metrics_t metrics[1];

  /* FD_CRDS_MAGIC once fd_crds_new completes; checked by fd_crds_join. */
  ulong magic;
};
375 :
376 : FD_FN_CONST ulong
377 9 : fd_crds_align( void ) {
378 9 : return FD_CRDS_ALIGN;
379 9 : }
380 :
/* fd_crds_footprint returns the number of bytes of shared memory
   needed for a CRDS table holding up to ele_max values and purged_max
   purged/failed-insert hashes.  The append order below MUST match the
   FD_SCRATCH_ALLOC_APPEND sequence in fd_crds_new exactly.  No
   argument validation is done here; fd_crds_new rejects zero or
   non-power-of-2 ele_max / purged_max. */
FD_FN_CONST ulong
fd_crds_footprint( ulong ele_max,
                   ulong purged_max ) {
  ulong l;
  l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, FD_CRDS_ALIGN,                        sizeof(fd_crds_t) );
  l = FD_LAYOUT_APPEND( l, crds_pool_align(),                    crds_pool_footprint( ele_max ) );
  l = FD_LAYOUT_APPEND( l, evict_treap_align(),                  evict_treap_footprint( ele_max ) );
  l = FD_LAYOUT_APPEND( l, staked_expire_dlist_align(),          staked_expire_dlist_footprint() );
  l = FD_LAYOUT_APPEND( l, unstaked_expire_dlist_align(),        unstaked_expire_dlist_footprint() );
  l = FD_LAYOUT_APPEND( l, hash_treap_align(),                   hash_treap_footprint( ele_max ) );
  l = FD_LAYOUT_APPEND( l, lookup_map_align(),                   lookup_map_footprint( ele_max ) );
  l = FD_LAYOUT_APPEND( l, purged_pool_align(),                  purged_pool_footprint( purged_max ) );
  l = FD_LAYOUT_APPEND( l, purged_treap_align(),                 purged_treap_footprint( purged_max ) );
  l = FD_LAYOUT_APPEND( l, purged_dlist_align(),                 purged_dlist_footprint() );
  l = FD_LAYOUT_APPEND( l, failed_inserts_dlist_align(),         failed_inserts_dlist_footprint() );
  /* The contact info side table has a fixed size independent of ele_max. */
  l = FD_LAYOUT_APPEND( l, crds_contact_info_pool_align(),       crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
  l = FD_LAYOUT_APPEND( l, crds_contact_info_fresh_list_align(), crds_contact_info_fresh_list_footprint() );
  l = FD_LAYOUT_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() );
  return FD_LAYOUT_FINI( l, FD_CRDS_ALIGN );
}
402 :
/* fd_crds_new formats shmem (fd_crds_footprint( ele_max, purged_max )
   bytes, aligned to fd_crds_align()) as an empty CRDS table.

   ele_max and purged_max must be non-zero powers of 2; rng and
   gossip_update_out must be non-NULL.  On invalid arguments it logs a
   warning and returns NULL.  Construction of each sub-object is
   FD_TEST-checked.

   rng is drawn from exactly four times, in this order: the evict treap
   priority seed, the hash treap priority seed, the lookup map seed and
   the purged treap priority seed.  Keep this order if editing, since
   it determines the resulting table state for a given rng.

   Returns shmem (as the fd_crds_t header) with magic set last; join
   with fd_crds_join. */
void *
fd_crds_new( void *                    shmem,
             fd_rng_t *                rng,
             ulong                     ele_max,
             ulong                     purged_max,
             fd_gossip_out_ctx_t *     gossip_update_out ) {
  if( FD_UNLIKELY( !shmem ) ) {
    FD_LOG_WARNING(( "NULL shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_crds_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !ele_max || !fd_ulong_is_pow2( ele_max ) ) ) {
    FD_LOG_WARNING(( "ele_max must be a power of 2" ));
    return NULL;
  }

  if( FD_UNLIKELY( !purged_max || !fd_ulong_is_pow2( purged_max ) ) ) {
    FD_LOG_WARNING(( "purged_max must be a power of 2" ));
    return NULL;
  }

  if( FD_UNLIKELY( !rng ) ) {
    FD_LOG_WARNING(( "NULL rng" ));
    return NULL;
  }

  if( FD_UNLIKELY( !gossip_update_out ) ) {
    FD_LOG_WARNING(( "NULL gossip_out" ));
    return NULL;
  }

  /* Carve the region; must mirror fd_crds_footprint exactly. */
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_crds_t * crds              = FD_SCRATCH_ALLOC_APPEND( l, FD_CRDS_ALIGN,                         sizeof(fd_crds_t) );
  void * _pool                  = FD_SCRATCH_ALLOC_APPEND( l, crds_pool_align(),                     crds_pool_footprint( ele_max ) );
  void * _evict_treap           = FD_SCRATCH_ALLOC_APPEND( l, evict_treap_align(),                   evict_treap_footprint( ele_max ) );
  void * _staked_expire_dlist   = FD_SCRATCH_ALLOC_APPEND( l, staked_expire_dlist_align(),           staked_expire_dlist_footprint() );
  void * _unstaked_expire_dlist = FD_SCRATCH_ALLOC_APPEND( l, unstaked_expire_dlist_align(),         unstaked_expire_dlist_footprint() );
  void * _hash_treap            = FD_SCRATCH_ALLOC_APPEND( l, hash_treap_align(),                    hash_treap_footprint( ele_max ) );
  void * _lookup_map            = FD_SCRATCH_ALLOC_APPEND( l, lookup_map_align(),                    lookup_map_footprint( ele_max ) );
  void * _purged_pool           = FD_SCRATCH_ALLOC_APPEND( l, purged_pool_align(),                   purged_pool_footprint( purged_max ) );
  void * _purged_treap          = FD_SCRATCH_ALLOC_APPEND( l, purged_treap_align(),                  purged_treap_footprint( purged_max ) );
  void * _purged_dlist          = FD_SCRATCH_ALLOC_APPEND( l, purged_dlist_align(),                  purged_dlist_footprint() );
  void * _failed_inserts_dlist  = FD_SCRATCH_ALLOC_APPEND( l, failed_inserts_dlist_align(),          failed_inserts_dlist_footprint() );
  void * _ci_pool               = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_pool_align(),        crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
  void * _ci_dlist              = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_fresh_list_align(),  crds_contact_info_fresh_list_footprint() );
  void * _ci_evict_dlist        = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() );

  crds->pool = crds_pool_join( crds_pool_new( _pool, ele_max ) );
  FD_TEST( crds->pool );

  /* Both entry treaps keep their priorities in the pool elements
     themselves (evict.prio / hash.prio), so they are seeded on the pool
     after it is created.  rng draw #1. */
  crds->evict_treap = evict_treap_join( evict_treap_new( _evict_treap, ele_max ) );
  FD_TEST( crds->evict_treap );
  evict_treap_seed( crds->pool, ele_max, fd_rng_ulong( rng ) );

  crds->staked_expire_dlist = staked_expire_dlist_join( staked_expire_dlist_new( _staked_expire_dlist ) );
  FD_TEST( crds->staked_expire_dlist );

  crds->unstaked_expire_dlist = unstaked_expire_dlist_join( unstaked_expire_dlist_new( _unstaked_expire_dlist ) );
  FD_TEST( crds->unstaked_expire_dlist );

  /* rng draw #2. */
  crds->hash_treap = hash_treap_join( hash_treap_new( _hash_treap, ele_max ) );
  FD_TEST( crds->hash_treap );
  hash_treap_seed( crds->pool, ele_max, fd_rng_ulong( rng ) );

  /* rng draw #3: lookup map hash seed. */
  crds->lookup_map = lookup_map_join( lookup_map_new( _lookup_map, ele_max, fd_rng_ulong( rng ) ) );
  FD_TEST( crds->lookup_map );

  crds->purged.pool = purged_pool_join( purged_pool_new( _purged_pool, purged_max ) );
  FD_TEST( crds->purged.pool );

  /* rng draw #4. */
  crds->purged.treap = purged_treap_join( purged_treap_new( _purged_treap, purged_max ) );
  FD_TEST( crds->purged.treap );
  purged_treap_seed( crds->purged.pool, purged_max, fd_rng_ulong( rng ) );

  crds->purged.purged_dlist = purged_dlist_join( purged_dlist_new( _purged_dlist ) );
  FD_TEST( crds->purged.purged_dlist );

  crds->purged.failed_inserts_dlist = failed_inserts_dlist_join( failed_inserts_dlist_new( _failed_inserts_dlist ) );
  FD_TEST( crds->purged.failed_inserts_dlist );

  crds->contact_info.pool = crds_contact_info_pool_join( crds_contact_info_pool_new( _ci_pool, CRDS_MAX_CONTACT_INFO ) );
  FD_TEST( crds->contact_info.pool );

  crds->contact_info.fresh_dlist = crds_contact_info_fresh_list_join( crds_contact_info_fresh_list_new( _ci_dlist ) );
  FD_TEST( crds->contact_info.fresh_dlist );

  crds->contact_info.evict_dlist = crds_contact_info_evict_dlist_join( crds_contact_info_evict_dlist_new( _ci_evict_dlist ) );
  FD_TEST( crds->contact_info.evict_dlist );

  FD_TEST( fd_sha256_join( fd_sha256_new( crds->sha256 ) ) );

  crds_samplers_new( crds->samplers );

  memset( crds->metrics, 0, sizeof(fd_crds_metrics_t) );

  crds->gossip_update   = gossip_update_out;
  crds->has_staked_node = 0;

  /* Publish magic last so a concurrent fd_crds_join never sees a
     partially initialized header. */
  FD_COMPILER_MFENCE();
  FD_VOLATILE( crds->magic ) = FD_CRDS_MAGIC;
  FD_COMPILER_MFENCE();

  return (void *)crds;
}
512 :
513 : fd_crds_t *
514 3 : fd_crds_join( void * shcrds ) {
515 3 : if( FD_UNLIKELY( !shcrds ) ) {
516 0 : FD_LOG_WARNING(( "NULL shcrds" ));
517 0 : return NULL;
518 0 : }
519 :
520 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shcrds, fd_crds_align() ) ) ) {
521 0 : FD_LOG_WARNING(( "misaligned shcrds" ));
522 0 : return NULL;
523 0 : }
524 :
525 3 : fd_crds_t * crds = (fd_crds_t *)shcrds;
526 :
527 3 : if( FD_UNLIKELY( crds->magic!=FD_CRDS_MAGIC ) ) {
528 0 : FD_LOG_WARNING(( "bad magic" ));
529 0 : return NULL;
530 0 : }
531 :
532 3 : return crds;
533 3 : }
534 :
535 : fd_crds_metrics_t const *
536 0 : fd_crds_metrics( fd_crds_t const * crds ) {
537 0 : return crds->metrics;
538 0 : }
539 :
540 : static inline void
541 : remove_contact_info( fd_crds_t * crds,
542 : fd_crds_entry_t * ci,
543 : long now,
544 0 : fd_stem_context_t * stem ) {
545 0 : if( FD_UNLIKELY( !stem ) ) return;
546 0 : fd_gossip_update_message_t * msg = fd_gossip_out_get_chunk( crds->gossip_update );
547 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
548 0 : msg->wallclock_nanos = now;
549 0 : fd_memcpy( msg->origin_pubkey, ci->key.pubkey, 32UL );
550 0 : msg->contact_info_remove.idx = crds_contact_info_pool_idx( crds->contact_info.pool, ci->contact_info.ci );
551 0 : fd_gossip_tx_publish_chunk( crds->gossip_update, stem, (ulong)msg->tag, FD_GOSSIP_UPDATE_SZ_CONTACT_INFO_REMOVE, now );
552 :
553 0 : if( FD_LIKELY( ci->stake ) ) crds->metrics->peer_staked_cnt--;
554 0 : else crds->metrics->peer_unstaked_cnt--;
555 :
556 0 : crds->metrics->peer_visible_stake -= ci->stake;
557 :
558 0 : if( FD_LIKELY( !!ci->contact_info.fresh_dlist.in_list ) ) {
559 0 : crds_contact_info_fresh_list_ele_remove( crds->contact_info.fresh_dlist, ci, crds->pool );
560 0 : }
561 0 : crds_contact_info_evict_dlist_ele_remove( crds->contact_info.evict_dlist, ci, crds->pool );
562 0 : crds_contact_info_pool_ele_release( crds->contact_info.pool, ci->contact_info.ci );
563 :
564 : /* FIXME: If the peer is in any active set bucket, it is NOT removed
565 : here. If the peer is re-inserted into the CRDS table in the future,
566 : it is added back into the bucket's sampler. This means a peer can
567 : be sampled in a bucket (at least) twice during
568 : fd_active_set_rotate. */
569 0 : crds_samplers_rem_peer( crds->samplers, ci );
570 0 : }
571 :
572 : ulong
573 0 : fd_crds_len( fd_crds_t const * crds ) {
574 0 : return crds_pool_used( crds->pool );
575 0 : }
576 :
577 : ulong
578 0 : fd_crds_purged_len( fd_crds_t const * crds ) {
579 0 : return purged_pool_used( crds->purged.pool );
580 0 : }
581 :
582 : void
583 : fd_crds_release( fd_crds_t * crds,
584 0 : fd_crds_entry_t * value ) {
585 0 : crds_pool_ele_release( crds->pool, value );
586 0 : crds->metrics->count[ value->key.tag ]--;
587 0 : }
588 :
589 : static inline void
590 : expire( fd_crds_t * crds,
591 : long now,
592 0 : fd_stem_context_t * stem ){
593 0 : static const long SLOT_DURATION_NANOS = 400L*1000L*1000L;
594 0 : static const long STAKED_EXPIRE_DURATION_NANOS = 432000L*SLOT_DURATION_NANOS;
595 0 : static const long UNSTAKED_EXPIRE_DURATION_NANOS = 15L*1000L*1000L*1000L;
596 :
597 0 : while( !staked_expire_dlist_is_empty( crds->staked_expire_dlist, crds->pool ) ) {
598 0 : fd_crds_entry_t * head = staked_expire_dlist_ele_peek_head( crds->staked_expire_dlist, crds->pool );
599 :
600 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-STAKED_EXPIRE_DURATION_NANOS ) ) break;
601 :
602 0 : staked_expire_dlist_ele_pop_head( crds->staked_expire_dlist, crds->pool );
603 0 : hash_treap_ele_remove( crds->hash_treap, head, crds->pool );
604 0 : lookup_map_ele_remove( crds->lookup_map, &head->key, NULL, crds->pool );
605 0 : evict_treap_ele_remove( crds->evict_treap, head, crds->pool );
606 :
607 0 : if( FD_UNLIKELY( head->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) remove_contact_info( crds, head, now, stem );
608 0 : fd_crds_release( crds, head );
609 :
610 0 : crds->metrics->expired_cnt++;
611 0 : }
612 :
613 0 : long unstaked_expire_duration_nanos = fd_long_if( crds->has_staked_node,
614 0 : UNSTAKED_EXPIRE_DURATION_NANOS,
615 0 : STAKED_EXPIRE_DURATION_NANOS );
616 :
617 0 : while( !unstaked_expire_dlist_is_empty( crds->unstaked_expire_dlist, crds->pool ) ) {
618 0 : fd_crds_entry_t * head = unstaked_expire_dlist_ele_peek_head( crds->unstaked_expire_dlist, crds->pool );
619 :
620 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-unstaked_expire_duration_nanos ) ) break;
621 :
622 0 : unstaked_expire_dlist_ele_pop_head( crds->unstaked_expire_dlist, crds->pool );
623 0 : hash_treap_ele_remove( crds->hash_treap, head, crds->pool );
624 0 : lookup_map_ele_remove( crds->lookup_map, &head->key, NULL, crds->pool );
625 0 : evict_treap_ele_remove( crds->evict_treap, head, crds->pool );
626 :
627 0 : if( FD_UNLIKELY( head->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) remove_contact_info( crds, head, now, stem );
628 0 : fd_crds_release( crds, head );
629 :
630 0 : crds->metrics->expired_cnt++;
631 0 : }
632 :
633 0 : while( !purged_dlist_is_empty( crds->purged.purged_dlist, crds->purged.pool ) ) {
634 0 : fd_crds_purged_t * head = purged_dlist_ele_peek_head( crds->purged.purged_dlist, crds->purged.pool );
635 :
636 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-60L*1000L*1000L*1000L ) ) break;
637 :
638 0 : purged_dlist_ele_pop_head( crds->purged.purged_dlist, crds->purged.pool );
639 0 : purged_treap_ele_remove( crds->purged.treap, head, crds->purged.pool );
640 0 : purged_pool_ele_release( crds->purged.pool, head );
641 :
642 0 : crds->metrics->purged_cnt--;
643 0 : crds->metrics->purged_expired_cnt++;
644 0 : }
645 :
646 0 : while( !failed_inserts_dlist_is_empty( crds->purged.failed_inserts_dlist, crds->purged.pool ) ) {
647 0 : fd_crds_purged_t * head = failed_inserts_dlist_ele_peek_head( crds->purged.failed_inserts_dlist, crds->purged.pool );
648 :
649 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-20L*1000L*1000L*1000L ) ) break;
650 :
651 0 : failed_inserts_dlist_ele_pop_head( crds->purged.failed_inserts_dlist, crds->purged.pool );
652 0 : purged_treap_ele_remove( crds->purged.treap, head, crds->purged.pool );
653 0 : purged_pool_ele_release( crds->purged.pool, head );
654 :
655 0 : crds->metrics->purged_cnt--;
656 0 : crds->metrics->purged_expired_cnt++;
657 0 : }
658 0 : }
659 :
660 : void
661 : unfresh( fd_crds_t * crds,
662 0 : long now ) {
663 0 : while( !crds_contact_info_fresh_list_is_empty( crds->contact_info.fresh_dlist, crds->pool ) ) {
664 0 : fd_crds_entry_t * head = crds_contact_info_fresh_list_ele_peek_head( crds->contact_info.fresh_dlist, crds->pool );
665 :
666 0 : if( FD_LIKELY( head->expire.wallclock_nanos>now-60L*1000L*1000L*1000L ) ) break;
667 :
668 0 : head = crds_contact_info_fresh_list_ele_pop_head( crds->contact_info.fresh_dlist, crds->pool );
669 0 : head->contact_info.fresh_dlist.in_list = 0;
670 0 : crds_samplers_upd_peer_at_idx( crds->samplers, head, head->contact_info.sampler_idx, now );
671 0 : }
672 0 : }
673 :
674 : void
675 : fd_crds_advance( fd_crds_t * crds,
676 : long now,
677 0 : fd_stem_context_t * stem ) {
678 0 : expire( crds, now, stem );
679 0 : unfresh( crds, now );
680 0 : }
681 :
682 : fd_crds_entry_t *
683 : fd_crds_acquire( fd_crds_t * crds,
684 : long now,
685 150 : fd_stem_context_t * stem ) {
686 150 : if( FD_UNLIKELY( !crds_pool_free( crds->pool ) ) ) {
687 0 : evict_treap_fwd_iter_t head = evict_treap_fwd_iter_init( crds->evict_treap, crds->pool );
688 0 : FD_TEST( !evict_treap_fwd_iter_done( head ) );
689 0 : fd_crds_entry_t * evict = evict_treap_fwd_iter_ele( head, crds->pool );
690 :
691 0 : if( FD_LIKELY( !evict->stake ) ) unstaked_expire_dlist_ele_remove( crds->unstaked_expire_dlist, evict, crds->pool );
692 0 : else staked_expire_dlist_ele_remove( crds->staked_expire_dlist, evict, crds->pool );
693 :
694 0 : hash_treap_ele_remove( crds->hash_treap, evict, crds->pool );
695 0 : lookup_map_ele_remove( crds->lookup_map, &evict->key, NULL, crds->pool );
696 0 : if( FD_UNLIKELY( evict->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) remove_contact_info( crds, evict, now, stem );
697 :
698 0 : crds->metrics->evicted_cnt++;
699 :
700 0 : return evict;
701 150 : } else {
702 150 : return crds_pool_ele_acquire( crds->pool );
703 150 : }
704 150 : }
705 :
706 : int
707 0 : fd_crds_has_staked_node( fd_crds_t const * crds ) {
708 0 : return crds->has_staked_node;
709 0 : }
710 :
711 : static inline void
712 : generate_key( fd_gossip_view_crds_value_t const * view,
713 : uchar const * payload,
714 150 : fd_crds_key_t * out_key ) {
715 150 : out_key->tag = view->tag;
716 150 : fd_memcpy( out_key->pubkey, payload+view->pubkey_off, 32UL );
717 :
718 150 : switch( out_key->tag ) {
719 0 : case FD_GOSSIP_VALUE_VOTE:
720 0 : out_key->vote_index = view->vote->index;
721 0 : break;
722 0 : case FD_GOSSIP_VALUE_EPOCH_SLOTS:
723 0 : out_key->epoch_slots_index = view->epoch_slots->index;
724 0 : break;
725 0 : case FD_GOSSIP_VALUE_DUPLICATE_SHRED:
726 0 : out_key->duplicate_shred_index = view->duplicate_shred->index;
727 0 : break;
728 150 : default:
729 150 : break;
730 150 : }
731 150 : }
732 :
733 : void
734 : fd_crds_generate_hash( fd_sha256_t * sha,
735 : uchar const * crds_value,
736 : ulong crds_value_sz,
737 150 : uchar out_hash[ static 32UL ] ){
738 150 : fd_sha256_init( sha );
739 150 : fd_sha256_append( sha, crds_value, crds_value_sz );
740 150 : fd_sha256_fini( sha, out_hash );
741 150 : }
742 :
743 : static inline void
744 : crds_entry_init( fd_gossip_view_crds_value_t const * view,
745 : fd_sha256_t * sha,
746 : uchar const * payload,
747 : ulong stake,
748 150 : fd_crds_entry_t * out_value ) {
749 : /* Construct key */
750 150 : fd_crds_key_t * key = &out_value->key;
751 150 : generate_key( view, payload, key );
752 :
753 150 : out_value->wallclock_nanos = view->wallclock_nanos;
754 150 : out_value->stake = stake;
755 :
756 150 : fd_crds_generate_hash( sha, payload+view->value_off, view->length, out_value->value_hash );
757 150 : out_value->hash.hash = fd_ulong_load_8( out_value->value_hash );
758 :
759 150 : if( FD_UNLIKELY( view->tag==FD_GOSSIP_VALUE_NODE_INSTANCE ) ) {
760 0 : out_value->node_instance.token = view->node_instance->token;
761 150 : } else if( FD_UNLIKELY( key->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
762 150 : out_value->contact_info.instance_creation_wallclock_nanos = view->ci_view->contact_info->instance_creation_wallclock_nanos;
763 : /* Contact Info entry will be added to sampler upon successful insertion */
764 150 : out_value->contact_info.sampler_idx = SAMPLE_IDX_SENTINEL;
765 150 : }
766 150 : }
767 :
768 : static inline void
769 : purged_init( fd_crds_purged_t * purged,
770 : uchar const * hash,
771 0 : long now ) {
772 0 : fd_memcpy( purged->hash, hash, 32UL );
773 0 : purged->expire.wallclock_nanos = now;
774 0 : }
775 :
776 : void
777 : insert_purged( fd_crds_t * crds,
778 : uchar const * hash,
779 0 : long now ) {
780 0 : if( purged_treap_ele_query( crds->purged.treap, *(ulong *)hash, crds->purged.pool ) ) {
781 0 : return;
782 0 : }
783 0 : fd_crds_purged_t * purged;
784 0 : if( FD_UNLIKELY( !purged_pool_free( crds->purged.pool ) ) ) {
785 0 : purged = purged_dlist_ele_pop_head( crds->purged.purged_dlist, crds->purged.pool );
786 0 : purged_treap_ele_remove( crds->purged.treap, purged, crds->purged.pool );
787 0 : if( FD_LIKELY( crds->metrics ) ) {
788 0 : crds->metrics->purged_evicted_cnt++;
789 0 : }
790 0 : } else {
791 0 : purged = purged_pool_ele_acquire( crds->purged.pool );
792 0 : if( FD_LIKELY( crds->metrics ) ) {
793 0 : crds->metrics->purged_cnt++;
794 0 : }
795 0 : }
796 0 : purged_init( purged, hash, now );
797 0 : purged_treap_ele_insert( crds->purged.treap, purged, crds->purged.pool );
798 0 : purged_dlist_ele_push_tail( crds->purged.purged_dlist, purged, crds->purged.pool );
799 0 : }
800 :
801 : /* overrides_fast
802 : - returns 1 if candidate overrides existing (incumbent) CRDS value
803 : - returns 0 if candidate does not override existing CRDS value
804 : - return -1 if further checks are needed (e.g. hash comparison) */
805 : int
806 : overrides_fast( fd_crds_entry_t const * incumbent,
807 : fd_gossip_view_crds_value_t const * candidate,
808 0 : uchar const * payload ){
809 0 : long existing_wc = incumbent->wallclock_nanos;
810 0 : long candidate_wc = candidate->wallclock_nanos;
811 0 : long existing_ci_onset = incumbent->contact_info.instance_creation_wallclock_nanos;
812 0 : long candidate_ci_onset = candidate->ci_view->contact_info->instance_creation_wallclock_nanos;
813 :
814 0 : switch( candidate->tag ) {
815 0 : case FD_GOSSIP_VALUE_CONTACT_INFO:
816 0 : if( FD_UNLIKELY( candidate_ci_onset>existing_ci_onset ) ) return 1;
817 0 : else if( FD_UNLIKELY( candidate_ci_onset<existing_ci_onset ) ) return 0;
818 0 : else if( FD_UNLIKELY( candidate_wc>existing_wc ) ) return 1;
819 0 : else if( FD_UNLIKELY( candidate_wc<existing_wc ) ) return 0;
820 0 : break;
821 0 : case FD_GOSSIP_VALUE_NODE_INSTANCE:
822 0 : if( FD_LIKELY( candidate->node_instance->token==incumbent->node_instance.token ) ) break;
823 0 : else if( FD_LIKELY( memcmp( payload+candidate->pubkey_off, incumbent->key.pubkey, 32UL ) ) ) break;
824 0 : else if( FD_UNLIKELY( candidate_wc>existing_wc ) ) return 1;
825 0 : else if( FD_UNLIKELY( candidate_wc<existing_wc ) ) return 0;
826 0 : else if( candidate->node_instance->token<incumbent->node_instance.token ) return 0;;
827 0 : break;
828 0 : default:
829 0 : break;
830 0 : }
831 :
832 0 : if( FD_UNLIKELY( candidate_wc>existing_wc ) ) return 1;
833 0 : else if( FD_UNLIKELY( candidate_wc<existing_wc ) ) return 0;
834 0 : return -1;
835 0 : }
836 :
837 :
838 : void
839 : fd_crds_insert_failed_insert( fd_crds_t * crds,
840 : uchar const * hash,
841 0 : long now ) {
842 0 : if( purged_treap_ele_query( crds->purged.treap, *(ulong *)hash, crds->purged.pool ) ) {
843 0 : return;
844 0 : }
845 0 : fd_crds_purged_t * failed;
846 0 : if( FD_UNLIKELY( !purged_pool_free( crds->purged.pool ) ) ) {
847 0 : failed = failed_inserts_dlist_ele_pop_head( crds->purged.failed_inserts_dlist, crds->purged.pool );
848 0 : purged_treap_ele_remove( crds->purged.treap, failed, crds->purged.pool );
849 0 : if( FD_LIKELY( crds->metrics ) ) {
850 0 : crds->metrics->purged_evicted_cnt++;
851 0 : }
852 0 : } else {
853 0 : failed = purged_pool_ele_acquire( crds->purged.pool );
854 0 : if( FD_LIKELY( crds->metrics ) ) {
855 0 : crds->metrics->purged_cnt++;
856 0 : }
857 0 : }
858 0 : purged_init( failed, hash, now );
859 0 : purged_treap_ele_insert( crds->purged.treap, failed, crds->purged.pool );
860 0 : failed_inserts_dlist_ele_push_tail( crds->purged.failed_inserts_dlist, failed, crds->purged.pool );
861 0 : }
862 :
863 : int
864 : fd_crds_checks_fast( fd_crds_t * crds,
865 : fd_gossip_view_crds_value_t const * candidate,
866 : uchar const * payload,
867 0 : uchar from_push_msg ) {
868 0 : fd_crds_key_t candidate_key;
869 0 : generate_key( candidate, payload, &candidate_key );
870 0 : fd_crds_entry_t * incumbent = lookup_map_ele_query( crds->lookup_map, &candidate_key, NULL, crds->pool );
871 :
872 0 : if( FD_UNLIKELY( !incumbent ) ) return FD_CRDS_UPSERT_CHECK_UPSERTS;
873 :
874 0 : if( FD_UNLIKELY( *(ulong *)incumbent->value_bytes==(*(ulong *)(payload+candidate->value_off)) ) ) {
875 : /* We have a duplicate, so we return the number of duplicates */
876 0 : return (int)(++incumbent->num_duplicates);
877 0 : }
878 0 : int overrides = overrides_fast( incumbent, candidate, payload );
879 0 : if( FD_LIKELY( overrides==1 ) ) return FD_CRDS_UPSERT_CHECK_UPSERTS;
880 :
881 0 : uchar cand_hash[ 32UL ];
882 0 : fd_crds_generate_hash( crds->sha256, payload+candidate->value_off, candidate->length, cand_hash );
883 :
884 0 : if( FD_UNLIKELY( overrides==-1 ) ) {
885 : /* Tiebreaker case, we compare hash values */
886 0 : int res = memcmp( cand_hash, incumbent->value_hash, 32UL );
887 0 : if( FD_UNLIKELY( !res ) ) {
888 : /* Hashes match, so we treat this as a duplicate */
889 0 : return (int)(++incumbent->num_duplicates);
890 0 : } else if( res>0 ) {
891 : /* Candidate hash is greater than incumbent hash, so we treat
892 : this as an upsert */
893 0 : return FD_CRDS_UPSERT_CHECK_UPSERTS;
894 0 : }
895 0 : }
896 :
897 0 : from_push_msg ? insert_purged( crds, cand_hash, candidate->wallclock_nanos ) :
898 0 : fd_crds_insert_failed_insert( crds, cand_hash, candidate->wallclock_nanos );
899 :
900 0 : return FD_CRDS_UPSERT_CHECK_FAILS;
901 0 : }
902 :
903 : static inline void
904 : publish_update_msg( fd_crds_t * crds,
905 : fd_crds_entry_t * entry,
906 : fd_gossip_view_crds_value_t const * entry_view,
907 : uchar const * payload,
908 : long now,
909 150 : fd_stem_context_t * stem ) {
910 150 : if( FD_UNLIKELY( !stem ) ) return;
911 0 : if( FD_LIKELY( entry->key.tag!=FD_GOSSIP_VALUE_CONTACT_INFO &&
912 0 : entry->key.tag!=FD_GOSSIP_VALUE_LOWEST_SLOT &&
913 0 : entry->key.tag!=FD_GOSSIP_VALUE_VOTE &&
914 0 : entry->key.tag!=FD_GOSSIP_VALUE_DUPLICATE_SHRED &&
915 0 : entry->key.tag!=FD_GOSSIP_VALUE_INC_SNAPSHOT_HASHES ) ) {
916 0 : return;
917 0 : }
918 :
919 0 : fd_gossip_update_message_t * msg = fd_gossip_out_get_chunk( crds->gossip_update );
920 0 : msg->wallclock_nanos = now;
921 0 : fd_memcpy( msg->origin_pubkey, entry->key.pubkey, 32UL );
922 0 : ulong sz;
923 0 : switch( entry->key.tag ) {
924 0 : case FD_GOSSIP_VALUE_CONTACT_INFO:
925 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_CONTACT_INFO;
926 0 : *msg->contact_info.contact_info = *entry->contact_info.ci->contact_info;
927 0 : msg->contact_info.idx = crds_contact_info_pool_idx( crds->contact_info.pool, entry->contact_info.ci );
928 0 : sz = FD_GOSSIP_UPDATE_SZ_CONTACT_INFO;
929 0 : break;
930 0 : case FD_GOSSIP_VALUE_LOWEST_SLOT:
931 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_LOWEST_SLOT;
932 0 : sz = FD_GOSSIP_UPDATE_SZ_LOWEST_SLOT;
933 0 : msg->lowest_slot = entry_view->lowest_slot;
934 0 : break;
935 0 : case FD_GOSSIP_VALUE_VOTE:
936 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_VOTE;
937 : /* TODO: dynamic sizing */
938 0 : sz = FD_GOSSIP_UPDATE_SZ_VOTE;
939 0 : msg->vote.vote_tower_index = entry->key.vote_index;
940 0 : msg->vote.txn_sz = entry_view->vote->txn_sz;
941 0 : fd_memcpy( msg->vote.txn, payload+entry_view->vote->txn_off, entry_view->vote->txn_sz );
942 0 : break;
943 0 : case FD_GOSSIP_VALUE_DUPLICATE_SHRED:
944 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_DUPLICATE_SHRED;
945 : /* TODO: dynamic sizing */
946 0 : sz = FD_GOSSIP_UPDATE_SZ_DUPLICATE_SHRED;
947 0 : {
948 0 : fd_gossip_view_duplicate_shred_t const * ds = entry_view->duplicate_shred;
949 0 : fd_gossip_duplicate_shred_t * ds_msg = &msg->duplicate_shred;
950 :
951 0 : ds_msg->index = ds->index;
952 0 : ds_msg->slot = ds->slot;
953 0 : ds_msg->num_chunks = ds->num_chunks;
954 0 : ds_msg->chunk_index = ds->chunk_index;
955 0 : ds_msg->wallclock = entry->wallclock_nanos;
956 0 : ds_msg->chunk_len = ds->chunk_len;
957 0 : fd_memcpy( ds_msg->chunk, payload+ds->chunk_off, ds->chunk_len );
958 0 : }
959 0 : break;
960 0 : case FD_GOSSIP_VALUE_INC_SNAPSHOT_HASHES:
961 0 : msg->tag = FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES;
962 : /* TODO: dynamic sizing */
963 0 : sz = FD_GOSSIP_UPDATE_SZ_SNAPSHOT_HASHES;
964 0 : {
965 0 : fd_gossip_view_snapshot_hashes_t const * sh = entry_view->snapshot_hashes;
966 0 : fd_gossip_snapshot_hashes_t * sh_msg = &msg->snapshot_hashes;
967 :
968 0 : sh_msg->incremental_len = sh->inc_len;
969 0 : fd_memcpy( sh_msg->full, payload+sh->full_off, sizeof(fd_gossip_snapshot_hash_pair_t) );
970 0 : fd_memcpy( sh_msg->incremental, payload+sh->inc_off, sh->inc_len*sizeof(fd_gossip_snapshot_hash_pair_t) );
971 0 : }
972 0 : break;
973 0 : default:
974 0 : FD_LOG_ERR(( "impossible" ));
975 0 : }
976 0 : fd_gossip_tx_publish_chunk( crds->gossip_update,
977 0 : stem,
978 0 : (ulong)msg->tag,
979 0 : sz,
980 0 : now );
981 0 : }
982 :
983 : fd_crds_entry_t const *
984 : fd_crds_insert( fd_crds_t * crds,
985 : fd_gossip_view_crds_value_t const * candidate_view,
986 : uchar const * payload,
987 : ulong origin_stake,
988 : uchar is_from_me,
989 : long now ,
990 150 : fd_stem_context_t * stem ) {
991 : /* Update table count metrics at the end to avoid early return
992 : handling */
993 150 : fd_crds_entry_t * candidate = fd_crds_acquire( crds, now, stem );
994 150 : crds_entry_init( candidate_view, crds->sha256, payload, origin_stake, candidate );
995 :
996 150 : crds->metrics->count[ candidate->key.tag ]++;
997 :
998 150 : fd_crds_entry_t * incumbent = lookup_map_ele_query( crds->lookup_map, &candidate->key, NULL, crds->pool );
999 150 : uchar is_replacing = incumbent!=NULL;
1000 150 : if( FD_LIKELY( is_replacing ) ) {
1001 0 : insert_purged( crds, incumbent->value_hash, now );
1002 :
1003 0 : if( FD_UNLIKELY( incumbent->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
1004 0 : if( FD_LIKELY( !!incumbent->contact_info.fresh_dlist.in_list ) ) crds_contact_info_fresh_list_ele_remove( crds->contact_info.fresh_dlist, incumbent, crds->pool );
1005 0 : crds_contact_info_evict_dlist_ele_remove( crds->contact_info.evict_dlist, incumbent, crds->pool );
1006 0 : candidate->contact_info.ci = incumbent->contact_info.ci;
1007 :
1008 : /* is_active is user controlled (specifically by ping_tracker),
1009 : and is used in sampler score calculations. So we inherit the
1010 : incumbent's setting. */
1011 0 : candidate->contact_info.is_active = incumbent->contact_info.is_active;
1012 0 : if( FD_LIKELY( !is_from_me ) ) {
1013 0 : if( FD_UNLIKELY( candidate->stake!=incumbent->stake ) ) {
1014 : /* Perform a rescore here (expensive) */
1015 0 : crds_samplers_upd_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx, now );
1016 0 : } else {
1017 0 : crds_samplers_swap_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx );
1018 0 : }
1019 0 : }
1020 :
1021 0 : if( FD_LIKELY( incumbent->stake ) ) crds->metrics->peer_staked_cnt--;
1022 0 : else crds->metrics->peer_unstaked_cnt--;
1023 0 : crds->metrics->peer_visible_stake -= incumbent->stake;
1024 0 : }
1025 :
1026 0 : if( FD_LIKELY( incumbent->stake ) ) {
1027 0 : staked_expire_dlist_ele_remove( crds->staked_expire_dlist, incumbent, crds->pool );
1028 0 : } else {
1029 0 : unstaked_expire_dlist_ele_remove( crds->unstaked_expire_dlist, incumbent, crds->pool );
1030 0 : }
1031 0 : evict_treap_ele_remove( crds->evict_treap, incumbent, crds->pool );
1032 0 : hash_treap_ele_remove( crds->hash_treap, incumbent, crds->pool );
1033 0 : lookup_map_ele_remove( crds->lookup_map, &incumbent->key, NULL, crds->pool );
1034 0 : fd_crds_release( crds, incumbent );
1035 150 : } else if( candidate->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) {
1036 150 : if( FD_UNLIKELY( !crds_contact_info_pool_free( crds->contact_info.pool ) ) ) {
1037 0 : fd_crds_entry_t * evict = crds_contact_info_evict_dlist_ele_peek_head( crds->contact_info.evict_dlist, crds->pool );
1038 0 : remove_contact_info( crds, evict, now, stem );
1039 0 : if( FD_LIKELY( evict->stake ) ) {
1040 0 : staked_expire_dlist_ele_remove( crds->staked_expire_dlist, evict, crds->pool );
1041 0 : } else {
1042 0 : unstaked_expire_dlist_ele_remove( crds->unstaked_expire_dlist, evict, crds->pool );
1043 0 : }
1044 0 : evict_treap_ele_remove( crds->evict_treap, evict, crds->pool );
1045 0 : hash_treap_ele_remove( crds->hash_treap, evict, crds->pool );
1046 0 : lookup_map_ele_remove( crds->lookup_map, &evict->key, NULL, crds->pool );
1047 0 : fd_crds_release( crds, evict );
1048 0 : crds->metrics->peer_evicted_cnt++;
1049 0 : crds->metrics->evicted_cnt++;
1050 0 : }
1051 :
1052 150 : candidate->contact_info.ci = crds_contact_info_pool_ele_acquire( crds->contact_info.pool );
1053 150 : }
1054 :
1055 150 : candidate->num_duplicates = 0UL;
1056 150 : candidate->expire.wallclock_nanos = now;
1057 150 : candidate->value_sz = candidate_view->length;
1058 150 : fd_memcpy( candidate->value_bytes, payload+candidate_view->value_off, candidate_view->length );
1059 :
1060 150 : crds->has_staked_node |= candidate->stake ? 1 : 0;
1061 :
1062 150 : evict_treap_ele_insert( crds->evict_treap, candidate, crds->pool );
1063 150 : if( FD_LIKELY( candidate->stake ) ) {
1064 150 : staked_expire_dlist_ele_push_tail( crds->staked_expire_dlist, candidate, crds->pool );
1065 150 : } else {
1066 0 : unstaked_expire_dlist_ele_push_tail( crds->unstaked_expire_dlist, candidate, crds->pool );
1067 0 : }
1068 150 : hash_treap_ele_insert( crds->hash_treap, candidate, crds->pool );
1069 150 : lookup_map_ele_insert( crds->lookup_map, candidate, crds->pool );
1070 :
1071 150 : if( FD_UNLIKELY( candidate->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
1072 150 : fd_memcpy( candidate->contact_info.ci->contact_info, candidate_view->ci_view->contact_info, sizeof(fd_contact_info_t) );
1073 : /* Default to active, since we filter inactive entries prior to insertion */
1074 150 : candidate->contact_info.is_active = 1;
1075 :
1076 150 : crds_contact_info_evict_dlist_ele_push_tail( crds->contact_info.evict_dlist, candidate, crds->pool );
1077 :
1078 150 : if( FD_LIKELY( !is_from_me ) ){
1079 150 : crds_contact_info_fresh_list_ele_push_tail( crds->contact_info.fresh_dlist, candidate, crds->pool );
1080 150 : candidate->contact_info.fresh_dlist.in_list = 1;
1081 150 : } else {
1082 0 : candidate->contact_info.fresh_dlist.in_list = 0;
1083 0 : }
1084 :
1085 150 : if( FD_UNLIKELY( !is_replacing && !is_from_me ) ) {
1086 150 : crds_samplers_add_peer( crds->samplers, candidate, now);
1087 150 : }
1088 :
1089 150 : if( FD_LIKELY( candidate->stake ) ) crds->metrics->peer_staked_cnt++;
1090 0 : else crds->metrics->peer_unstaked_cnt++;
1091 150 : crds->metrics->peer_visible_stake += candidate->stake;
1092 150 : }
1093 :
1094 150 : publish_update_msg( crds, candidate, candidate_view, payload, now, stem );
1095 150 : return candidate;
1096 150 : }
1097 :
1098 : void
1099 : fd_crds_entry_value( fd_crds_entry_t const * entry,
1100 : uchar const ** value_bytes,
1101 0 : ulong * value_sz ) {
1102 0 : *value_bytes = entry->value_bytes;
1103 0 : *value_sz = entry->value_sz;
1104 0 : }
1105 :
1106 : uchar const *
1107 0 : fd_crds_entry_hash( fd_crds_entry_t const * entry ) {
1108 0 : return entry->value_hash;
1109 0 : }
1110 :
1111 : inline static void
1112 : make_contact_info_key( uchar const * pubkey,
1113 150 : fd_crds_key_t * key_out ) {
1114 150 : key_out->tag = FD_GOSSIP_VALUE_CONTACT_INFO;
1115 150 : fd_memcpy( key_out->pubkey, pubkey, 32UL );
1116 150 : }
1117 :
1118 : int
1119 0 : fd_crds_entry_is_contact_info( fd_crds_entry_t const * entry ) {
1120 0 : return entry->key.tag==FD_GOSSIP_VALUE_CONTACT_INFO;
1121 0 : }
1122 :
1123 : fd_contact_info_t *
1124 3 : fd_crds_entry_contact_info( fd_crds_entry_t const * entry ) {
1125 3 : return entry->contact_info.ci->contact_info;
1126 3 : }
1127 :
1128 :
1129 : fd_contact_info_t const *
1130 : fd_crds_contact_info_lookup( fd_crds_t const * crds,
1131 0 : uchar const * pubkey ) {
1132 :
1133 0 : fd_crds_key_t key[1];
1134 0 : make_contact_info_key( pubkey, key );
1135 0 : fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
1136 0 : if( FD_UNLIKELY( !peer_ci ) ) {
1137 0 : return NULL;
1138 0 : }
1139 :
1140 0 : return peer_ci->contact_info.ci->contact_info;
1141 0 : }
1142 :
1143 : ulong
1144 3 : fd_crds_peer_count( fd_crds_t const * crds ){
1145 3 : return crds_contact_info_pool_used( crds->contact_info.pool );
1146 3 : }
1147 :
1148 : static inline void
1149 : set_peer_active_status( fd_crds_t * crds,
1150 : uchar const * peer_pubkey,
1151 : uchar status,
1152 150 : long now ) {
1153 :
1154 150 : fd_crds_key_t key[1];
1155 150 : make_contact_info_key( peer_pubkey, key );
1156 :
1157 150 : fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
1158 : /* TODO: error handling? This technically should never hit */
1159 150 : if( FD_UNLIKELY( !peer_ci ) ) return;
1160 150 : uchar old_status = peer_ci->contact_info.is_active;
1161 150 : peer_ci->contact_info.is_active = status;
1162 :
1163 150 : if( FD_UNLIKELY( old_status!=status ) ) {
1164 : /* Trigger sampler update */
1165 0 : crds_samplers_upd_peer_at_idx( crds->samplers,
1166 0 : peer_ci,
1167 0 : peer_ci->contact_info.sampler_idx,
1168 0 : now );
1169 0 : }
1170 150 : }
1171 : void
1172 : fd_crds_peer_active( fd_crds_t * crds,
1173 : uchar const * peer_pubkey,
1174 150 : long now ) {
1175 150 : set_peer_active_status( crds, peer_pubkey, 1 /* active */, now );
1176 150 : }
1177 :
1178 : void
1179 : fd_crds_peer_inactive( fd_crds_t * crds,
1180 : uchar const * peer_pubkey,
1181 0 : long now ) {
1182 0 : set_peer_active_status( crds, peer_pubkey, 0 /* inactive */, now );
1183 0 : }
1184 :
1185 : fd_contact_info_t const *
1186 : fd_crds_peer_sample( fd_crds_t const * crds,
1187 0 : fd_rng_t * rng ) {
1188 0 : ulong idx = wpeer_sampler_sample( crds->samplers->pr_sampler,
1189 0 : rng,
1190 0 : crds->samplers->ele_cnt );
1191 0 : if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL;
1192 0 : return fd_crds_entry_contact_info( crds->samplers->ele[idx] );
1193 0 : }
1194 :
1195 : fd_contact_info_t const *
1196 : fd_crds_bucket_sample_and_remove( fd_crds_t * crds,
1197 : fd_rng_t * rng,
1198 3 : ulong bucket ) {
1199 3 : ulong idx = wpeer_sampler_sample( &crds->samplers->bucket_samplers[bucket],
1200 3 : rng,
1201 3 : crds->samplers->ele_cnt );
1202 3 : if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL;
1203 : /* Disable peer to prevent future sampling until added back with
1204 : fd_crds_bucket_add */
1205 3 : wpeer_sampler_disable( &crds->samplers->bucket_samplers[bucket],
1206 3 : idx,
1207 3 : crds->samplers->ele_cnt );
1208 :
1209 3 : return fd_crds_entry_contact_info( crds->samplers->ele[idx] );
1210 3 : }
1211 :
1212 : void
1213 : fd_crds_bucket_add( fd_crds_t * crds,
1214 : ulong bucket,
1215 0 : uchar const * pubkey ) {
1216 0 : fd_crds_key_t key[1];
1217 0 : make_contact_info_key( pubkey, key );
1218 0 : fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
1219 0 : if( FD_UNLIKELY( !peer_ci ) ) {
1220 0 : FD_LOG_NOTICE(( "Sample peer not found in CRDS. Likely dropped." ));
1221 0 : return;
1222 0 : }
1223 0 : wpeer_sampler_t * bucket_sampler = &crds->samplers->bucket_samplers[bucket];
1224 0 : wpeer_sampler_enable( bucket_sampler,
1225 0 : peer_ci->contact_info.sampler_idx,
1226 0 : crds->samplers->ele_cnt );
1227 :
1228 0 : ulong score = wpeer_sampler_bucket_score( peer_ci, bucket );
1229 0 : wpeer_sampler_upd( bucket_sampler,
1230 0 : score,
1231 0 : peer_ci->contact_info.sampler_idx,
1232 0 : crds->samplers->ele_cnt );
1233 0 : }
1234 :
/* Iterator state shared by the hash-treap mask iterators
   (fd_crds_mask_iter_*) and the purged-treap mask iterators
   (fd_crds_purged_mask_iter_*).  The init functions construct it in
   place inside a caller-supplied 16-byte iter_mem buffer. */
struct fd_crds_mask_iter_private {
  ulong idx;      /* treap index of the current element; a null treap index once exhausted */
  ulong end_hash; /* inclusive upper bound on the 8-byte hash key; iteration is done once the current key exceeds it */
};
1239 :
1240 : fd_crds_mask_iter_t *
1241 : fd_crds_mask_iter_init( fd_crds_t const * crds,
1242 : ulong mask,
1243 : uint mask_bits,
1244 0 : uchar iter_mem[ static 16UL ] ) {
1245 0 : fd_crds_mask_iter_t * it = (fd_crds_mask_iter_t *)iter_mem;
1246 0 : ulong start_hash = 0 ;
1247 0 : if( FD_LIKELY( mask_bits > 0) ) start_hash = (mask&(~0UL<<(64UL-mask_bits)));
1248 :
1249 0 : it->end_hash = mask;
1250 :
1251 0 : it->idx = hash_treap_idx_ge( crds->hash_treap, start_hash, crds->pool );
1252 0 : return it;
1253 0 : }
1254 :
1255 : fd_crds_mask_iter_t *
1256 0 : fd_crds_mask_iter_next( fd_crds_mask_iter_t * it, fd_crds_t const * crds ) {
1257 0 : fd_crds_entry_t const * val = hash_treap_ele_fast_const( it->idx, crds->pool );
1258 0 : it->idx = val->hash.next;
1259 0 : return it;
1260 0 : }
1261 :
1262 : int
1263 0 : fd_crds_mask_iter_done( fd_crds_mask_iter_t * it, fd_crds_t const * crds ) {
1264 0 : fd_crds_entry_t const * val = hash_treap_ele_fast_const( it->idx, crds->pool );
1265 0 : return hash_treap_idx_is_null( it->idx ) ||
1266 0 : (it->end_hash < val->hash.hash);
1267 0 : }
1268 :
1269 : fd_crds_entry_t const *
1270 0 : fd_crds_mask_iter_entry( fd_crds_mask_iter_t * it, fd_crds_t const * crds ){
1271 0 : return hash_treap_ele_fast_const( it->idx, crds->pool );
1272 0 : }
1273 :
1274 : fd_crds_mask_iter_t *
1275 : fd_crds_purged_mask_iter_init( fd_crds_t const * crds,
1276 : ulong mask,
1277 : uint mask_bits,
1278 0 : uchar iter_mem[ static 16UL ] ){
1279 0 : fd_crds_mask_iter_t * it = (fd_crds_mask_iter_t *)iter_mem;
1280 0 : ulong start_hash = 0;
1281 0 : if( FD_LIKELY( mask_bits > 0 ) ) start_hash = (mask&(~0UL<<(64UL-mask_bits)));
1282 0 : it->end_hash = mask;
1283 :
1284 0 : it->idx = purged_treap_idx_ge( crds->purged.treap, start_hash, crds->purged.pool );
1285 0 : return it;
1286 0 : }
1287 :
1288 : fd_crds_mask_iter_t *
1289 : fd_crds_purged_mask_iter_next( fd_crds_mask_iter_t * it,
1290 0 : fd_crds_t const * crds ){
1291 0 : fd_crds_purged_t const * val = purged_treap_ele_fast_const( it->idx, crds->purged.pool );
1292 0 : it->idx = val->treap.next;
1293 0 : return it;
1294 0 : }
1295 :
1296 : int
1297 : fd_crds_purged_mask_iter_done( fd_crds_mask_iter_t * it,
1298 0 : fd_crds_t const * crds ){
1299 0 : fd_crds_purged_t const * val = purged_treap_ele_fast_const( it->idx, crds->purged.pool );
1300 0 : return purged_treap_idx_is_null( it->idx ) ||
1301 0 : (it->end_hash < fd_ulong_load_8( val->hash ));
1302 0 : }
1303 :
1304 : /* fd_crds_purged_mask_iter_hash returns the hash of the current
1305 : entry in the purged mask iterator. */
1306 : uchar const *
1307 : fd_crds_purged_mask_iter_hash( fd_crds_mask_iter_t * it,
1308 0 : fd_crds_t const * crds ){
1309 0 : fd_crds_purged_t const * val = purged_treap_ele_fast_const( it->idx, crds->purged.pool );
1310 0 : return val->hash;
1311 0 : }
|