Line data Source code
1 : /* Owned by CRDS table, tightly coupled with contact info side table.
2 :
3 : The gossip table (currently) needs 25 (active set rotation)
4 : + 1 (tx pull req) stake-weighted peer samplers.
5 :
6 : Each sampler needs a different weight scoring implementation. This
7 : compile unit:
8 : - defines weight scoring implementations for each sampler.
9 : - defines functions to insert, remove, and update all samplers in
10 : one go
11 :
12 : The sample sets are designed to closely track modifications to the
13 : CRDS contact info table. */
14 :
15 : #include "fd_crds.h"
16 : #include "../fd_active_set_private.h"
17 :
/* Index value that never names a valid sampler slot.  It is returned
   by wpeer_sampler_sample when there is nothing to sample and is
   stored in a peer's sampler_idx when that peer is swapped out of the
   samplers. */
#define SAMPLE_IDX_SENTINEL ULONG_MAX

/* Instantiate the fd_set template under the name peer_enabled, sized
   for CRDS_MAX_CONTACT_INFO members.  This provides peer_enabled_t,
   peer_enabled_word_cnt and the peer_enabled_{full,test,insert,remove,
   insert_if,remove_if} helpers used by wpeer_sampler below. */
#define SET_NAME peer_enabled
#define SET_MAX CRDS_MAX_CONTACT_INFO
#include "../../../util/tmpl/fd_set.c"
23 :
24 : /* This is a very rudimentary implementation of a weighted sampler. We will
25 : eventually want to use a modified fd_wsample. Using this until then. */
26 : struct wpeer_sampler {
27 : /* Cumulative weight for each peer.
28 : Individual peer weight can be derived with cumul_weight[i]-cumul_weight[i-1] */
29 : ulong cumul_weight[ CRDS_MAX_CONTACT_INFO ];
30 :
31 : /* peer_enabled_test( peer_enabled, i ) determines if peer i is enabled. A
32 : disabled peer should not be scored (e.g., during an update) and should
33 : have a peer weight of 0 (i.e,. cumul_weight[i] - cumul_weight[i-1]==0).
34 :
35 : Why not just remove the peer?
36 : Adding/removing should strictly track the CRDS Contact Info
37 : table. That is to say a peer must have an entry in the sampler
38 : if it has an entry in the table, and vice versa. This simplifies
39 : the crds_samplers management logic which ties the peer to the
40 : same index across all samplers. A single sampler can "disable" a
41 : peer without affecting the other samplers.
42 :
43 : Why not just set the peer's weight to 0?
44 : 0 is a legitimate score for a peer (wpeer_sampler_peer_score).
45 : A futre upsert into the CRDS table might trigger a re-score, which
46 : might inadvertently re-enable the peer in the sampler. We want to
47 : distinguish peers that are "removed" from the sampler from peers
48 : that have a score of 0 based on the parameters provided. */
49 : peer_enabled_t peer_enabled[ peer_enabled_word_cnt ];
50 : };
51 :
52 : typedef struct wpeer_sampler wpeer_sampler_t;
53 :
54 :
55 : struct crds_samplers {
56 : wpeer_sampler_t pr_sampler[1];
57 : wpeer_sampler_t bucket_samplers[25];
58 : fd_crds_entry_t * ele[ CRDS_MAX_CONTACT_INFO ];
59 : ulong ele_cnt;
60 : };
61 :
62 : typedef struct crds_samplers crds_samplers_t;
63 :
/* PREV_PEER_WEIGHT evaluates to the cumulative weight of all peers
   strictly before idx in sampler ps: cumul_weight[idx-1], or 0UL when
   idx is 0 and there is no preceding peer.  idx may be evaluated more
   than once. */
#define PREV_PEER_WEIGHT( ps, idx ) \
  ( !(idx) ? 0UL : (ps)->cumul_weight[(idx)-1] )
66 : int
67 78 : wpeer_sampler_init( wpeer_sampler_t * ps ) {
68 78 : if( FD_UNLIKELY( !ps ) ) return -1;
69 2555982 : for( ulong i = 0UL; i < CRDS_MAX_CONTACT_INFO; i++ ) {
70 2555904 : ps->cumul_weight[i] = 0UL;
71 2555904 : }
72 : /* All peers are "enabled" by default as they get added. */
73 78 : peer_enabled_full( ps->peer_enabled );
74 78 : return 0;
75 78 : }
76 :
77 : ulong
78 : wpeer_sampler_sample( wpeer_sampler_t const * ps,
79 : fd_rng_t * rng,
80 3 : ulong ele_cnt ) {
81 3 : if( FD_UNLIKELY( !ele_cnt || !ps->cumul_weight[ele_cnt-1] ) ) {
82 0 : return SAMPLE_IDX_SENTINEL;
83 0 : }
84 :
85 3 : ulong sample = fd_rng_ulong_roll( rng, ps->cumul_weight[ele_cnt-1] );
86 : /* avoid sampling 0 */
87 3 : sample = fd_ulong_min( sample+1UL, ps->cumul_weight[ele_cnt-1] );
88 :
89 : /* Binary search for the smallest cumulative weight >= sample */
90 3 : ulong left = 0UL;
91 3 : ulong right = ele_cnt;
92 18 : while( left < right ) {
93 15 : ulong mid = left + (right - left) / 2UL;
94 15 : if( ps->cumul_weight[mid]<sample ) {
95 12 : left = mid + 1UL;
96 12 : } else {
97 3 : right = mid;
98 3 : }
99 15 : }
100 3 : return left;
101 3 : }
102 :
103 : int
104 : wpeer_sampler_upd( wpeer_sampler_t * ps,
105 : ulong weight,
106 : ulong idx,
107 3903 : ulong ele_cnt ) {
108 3903 : if( FD_UNLIKELY( !ps ) ) return -1;
109 :
110 : /* Disabled peers should not be scored */
111 3903 : weight *= (ulong)peer_enabled_test( ps->peer_enabled, idx );
112 :
113 3903 : ulong old_weight = ps->cumul_weight[idx] - PREV_PEER_WEIGHT( ps, idx );
114 3903 : if( FD_UNLIKELY( old_weight==weight ) ) return 0;
115 :
116 3903 : if( weight>old_weight ) {
117 156 : for( ulong i=idx; i<ele_cnt; i++ ) {
118 78 : ps->cumul_weight[i] += (weight - old_weight);
119 78 : }
120 3825 : } else {
121 7722 : for( ulong i=idx; i<ele_cnt; i++ ) {
122 3897 : ps->cumul_weight[i] -= (old_weight - weight);
123 3897 : }
124 3825 : }
125 3903 : return 0;
126 3903 : }
127 :
128 : int
129 : wpeer_sampler_disable( wpeer_sampler_t * ps,
130 : ulong idx,
131 3 : ulong ele_cnt ) {
132 3 : if( FD_UNLIKELY( !ps || idx>=ele_cnt ) ) return -1;
133 :
134 : /* Set the peer weight to zero */
135 3 : if( FD_UNLIKELY( wpeer_sampler_upd( ps, 0UL, idx, ele_cnt )<0 ) ) return -1;
136 :
137 : /* Disable the peer in the enabled set */
138 3 : peer_enabled_remove( ps->peer_enabled, idx );
139 3 : return 0;
140 3 : }
141 :
142 : int
143 : wpeer_sampler_enable( wpeer_sampler_t * ps,
144 : ulong idx,
145 0 : ulong ele_cnt ) {
146 0 : if( FD_UNLIKELY( !ps || idx>=ele_cnt ) ) return -1;
147 0 : peer_enabled_insert( ps->peer_enabled, idx );
148 0 : return 0;
149 0 : }
150 :
151 : /* NOTE: this should only be called if the peer is dropped from the
152 : Contact Info table. Use wpeer_sampler_disable otherwise */
153 : int
154 : wpeer_sampler_rem( wpeer_sampler_t * ps,
155 : ulong idx,
156 0 : ulong ele_cnt ) {
157 0 : ulong score = ps->cumul_weight[idx] - PREV_PEER_WEIGHT( ps, idx );
158 :
159 0 : for( ulong i = idx+1; i < ele_cnt; i++ ) {
160 0 : ps->cumul_weight[i] -= score;
161 0 : ps->cumul_weight[i-1] = ps->cumul_weight[i];
162 :
163 0 : peer_enabled_insert_if( ps->peer_enabled, peer_enabled_test( ps->peer_enabled, i ), i-1UL );
164 0 : peer_enabled_remove_if( ps->peer_enabled, !peer_enabled_test( ps->peer_enabled, i ), i-1UL );
165 0 : }
166 :
167 0 : peer_enabled_insert( ps->peer_enabled, ele_cnt-1UL );
168 0 : return 0;
169 0 : }
170 :
171 150 : #define BASE_WEIGHT 100UL /* TODO: figure this out!! */
172 : ulong
173 : wpeer_sampler_peer_score( fd_crds_entry_t * peer,
174 150 : long now ) {
175 150 : if( FD_UNLIKELY( !peer->contact_info.is_active ) ) return 0;
176 150 : ulong score = BASE_WEIGHT;
177 150 : score += peer->stake;
178 150 : if( FD_UNLIKELY( peer->wallclock_nanos<now-60*1000L*1000L*1000L ) ) score/=100;
179 :
180 150 : return score;
181 150 : }
182 :
183 : ulong
184 : wpeer_sampler_bucket_score( fd_crds_entry_t * peer,
185 3750 : ulong bucket ) {
186 3750 : ulong peer_bucket = fd_active_set_stake_bucket( peer->stake );
187 3750 : ulong score = fd_ulong_sat_add( fd_ulong_min( bucket, peer_bucket ), 1UL );
188 :
189 3750 : return score*score;
190 3750 : }
191 :
192 :
193 :
194 : void
195 3 : crds_samplers_new( crds_samplers_t * ps ) {
196 3 : if( FD_UNLIKELY( !ps ) ) return;
197 :
198 3 : wpeer_sampler_init( ps->pr_sampler );
199 78 : for( ulong i=0UL; i<25UL; i++ ) {
200 75 : wpeer_sampler_init( &ps->bucket_samplers[i] );
201 75 : }
202 3 : ps->ele_cnt = 0UL;
203 98307 : for( ulong i=0UL; i<CRDS_MAX_CONTACT_INFO; i++ ) {
204 98304 : ps->ele[i] = NULL;
205 98304 : }
206 3 : }
207 :
208 : int
209 : crds_samplers_upd_peer_at_idx( crds_samplers_t * ps,
210 : fd_crds_entry_t * peer,
211 : ulong idx,
212 150 : long now ) {
213 150 : if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) {
214 0 : FD_LOG_WARNING(( "Bad peer idx supplied in sample update" ));
215 0 : return -1;
216 0 : }
217 150 : ps->ele[idx] = peer;
218 150 : peer->contact_info.sampler_idx = idx;
219 150 : ulong peer_score = wpeer_sampler_peer_score( peer, now );
220 150 : if( FD_UNLIKELY( wpeer_sampler_upd( ps->pr_sampler, peer_score, idx, ps->ele_cnt )<0 ) ) return -1;
221 :
222 3900 : for( ulong i=0UL; i<25UL; i++ ) {
223 3750 : ulong bucket_score = wpeer_sampler_bucket_score( peer, i );
224 3750 : if( FD_UNLIKELY( !bucket_score ) ) FD_LOG_ERR(( "0-weighted peer in bucket, should not be possible" ));
225 3750 : if( FD_UNLIKELY( wpeer_sampler_upd( &ps->bucket_samplers[i], bucket_score, idx, ps->ele_cnt )<0 ) ) return -1;
226 3750 : }
227 :
228 150 : return 0;
229 150 : }
230 :
231 : int
232 : crds_samplers_swap_peer_at_idx( crds_samplers_t * ps,
233 : fd_crds_entry_t * new_peer,
234 0 : ulong idx ) {
235 0 : if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) {
236 0 : FD_LOG_WARNING(( "Bad peer idx supplied in sample update" ));
237 0 : return -1;
238 0 : }
239 0 : fd_crds_entry_t * old_peer = ps->ele[idx];
240 0 : if( FD_UNLIKELY( !old_peer ) ) {
241 0 : FD_LOG_ERR(( "No peer at index %lu in samplers" , idx ));
242 0 : }
243 :
244 0 : ps->ele[idx] = new_peer;
245 0 : new_peer->contact_info.sampler_idx = idx;
246 0 : old_peer->contact_info.sampler_idx = SAMPLE_IDX_SENTINEL;
247 0 : return 0;
248 0 : }
249 :
250 : int
251 : crds_samplers_add_peer( crds_samplers_t * ps,
252 : fd_crds_entry_t * peer,
253 150 : long now ) {
254 150 : ulong idx = fd_ulong_min( ps->ele_cnt, (CRDS_MAX_CONTACT_INFO)-1UL );
255 150 : ps->ele_cnt++;
256 150 : if( FD_UNLIKELY( !!crds_samplers_upd_peer_at_idx( ps, peer, idx, now ) ) ){
257 0 : FD_LOG_WARNING(( "Failed to update peer in samplers" ));
258 0 : ps->ele_cnt--;
259 0 : ps->ele[idx] = NULL;
260 0 : return -1;
261 0 : }
262 150 : return 0;
263 150 : }
264 :
265 : int
266 : crds_samplers_rem_peer( crds_samplers_t * ps,
267 0 : fd_crds_entry_t * peer ) {
268 0 : ulong idx = peer->contact_info.sampler_idx;
269 0 : if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) return -1;
270 0 : if( FD_UNLIKELY( wpeer_sampler_rem( ps->pr_sampler, idx, ps->ele_cnt )<0 ) ) return -1;
271 0 : for( ulong i=0UL; i<25UL; i++ ) {
272 0 : if( FD_UNLIKELY( wpeer_sampler_rem( &ps->bucket_samplers[i], idx, ps->ele_cnt )<0 ) ) return -1;
273 0 : }
274 :
275 : // Shift the elements down in elems array
276 0 : for( ulong i = idx+1; i < ps->ele_cnt; i++ ) {
277 0 : ps->ele[i-1] = ps->ele[i];
278 0 : ps->ele[i-1]->contact_info.sampler_idx = i-1;
279 0 : }
280 0 : ps->ele_cnt--;
281 0 : return 0;
282 0 : }
|