Line data Source code
1 : #include "fd_txncache.h"
2 : #include "fd_txncache_private.h"
3 : #include "../../util/log/fd_log.h"
4 :
/* blockcache is the local (per-join) view of one fork / blockhash
   entry: the shared-memory record plus the locally derived pointers
   into the shared data region, computed once at join time. */
struct blockcache {
  fd_txncache_blockcache_shmem_t * shmem; /* Shared-memory portion of this entry (linkage, blockhash, frozen state, ...). */

  uint * heads;  /* The hash table for the blockhash.  Each entry is a pointer to the head of a linked list of
                    transactions that reference this blockhash.  As we add transactions to the bucket, the head
                    pointer is updated to the new item, and the new item is pointed to the previous head. */
  ushort * pages; /* A list of the txnpages containing the transactions for this blockcache. */

  descends_set_t * descends; /* Each fork can descend from other forks in the txncache, and this bit vector contains one
                                value for each fork in the txncache.  If this fork descends from some other fork F, then
                                the bit at index F in descends[] is set. */
};

typedef struct blockcache blockcache_t;
19 :
/* fd_txncache_private is the local join handle.  It caches pointers
   into the shared region (owned by shmem) and holds per-join scratch
   space used when compacting blockcaches. */
struct fd_txncache_private {
  fd_txncache_shmem_t * shmem; /* The shared-memory header (lock, counters, root list, sizing limits). */

  fd_txncache_blockcache_shmem_t * blockcache_shmem_pool; /* Shared pool of blockcache records, indexed by fork id. */
  blockcache_t *                   blockcache_pool;       /* Local per-fork views, parallel to blockcache_shmem_pool. */
  blockhash_map_t *                blockhash_map;         /* Maps a 32-byte blockhash to its blockcache(s). */

  ushort * txnpages_free; /* The index in the txnpages array that is free, for each of the free pages. */

  fd_txncache_txnpage_t * txnpages; /* The actual storage for the transactions.  The blockcache points to these
                                       pages when storing transactions.  Transaction are grouped into pages of
                                       size 16384 to make certain allocation and deallocation operations faster
                                       (just the pages are acquired/released, rather than each txn). */

  /* Scratch buffers used by purge_stale_on_blockcache when compacting a
     blockcache in place; sized for one blockcache's worth of data. */
  ushort *                scratch_pages;
  uint *                  scratch_heads;
  fd_txncache_txnpage_t * scratch_txnpage;
};
38 :
/* fd_txncache_align returns the required byte alignment of the memory
   region backing a local txncache join (ljoin). */
FD_FN_CONST ulong
fd_txncache_align( void ) {
  return FD_TXNCACHE_ALIGN;
}
43 :
44 : FD_FN_CONST ulong
45 0 : fd_txncache_footprint( ulong max_live_slots ) {
46 0 : ulong max_active_slots = FD_TXNCACHE_MAX_BLOCKHASH_DISTANCE+max_live_slots;
47 :
48 0 : ulong l;
49 0 : l = FD_LAYOUT_INIT;
50 0 : l = FD_LAYOUT_APPEND( l, FD_TXNCACHE_SHMEM_ALIGN, sizeof(fd_txncache_t) );
51 0 : l = FD_LAYOUT_APPEND( l, alignof(blockcache_t), max_active_slots*sizeof(blockcache_t) );
52 0 : return FD_LAYOUT_FINI( l, FD_TXNCACHE_ALIGN );
53 0 : }
54 :
/* fd_txncache_new formats ljoin as a local join to the shared txncache
   backing store at shmem.  It recomputes the shared region's internal
   layout (which must agree byte-for-byte with the layout used when
   shmem was created), derives pointers to each sub-region, and caches
   them in the local fd_txncache_t.  Returns ljoin on success, or NULL
   (with a warning logged) if ljoin is NULL or misaligned. */
void *
fd_txncache_new( void * ljoin,
                 fd_txncache_shmem_t * shmem ) {
  if( FD_UNLIKELY( !ljoin ) ) {
    FD_LOG_WARNING(( "NULL ljoin" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ljoin, fd_txncache_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned ljoin" ));
    return NULL;
  }

  ulong max_active_slots = shmem->active_slots_max;
  /* Map chain count is 2x the entry count rounded up to a power of two
     to keep hash chains short. */
  ulong blockhash_map_chains = fd_ulong_pow2_up( 2UL*shmem->active_slots_max );

  ushort _max_txnpages = fd_txncache_max_txnpages( max_active_slots, shmem->txn_per_slot_max );
  ushort _max_txnpages_per_blockhash = fd_txncache_max_txnpages_per_blockhash( max_active_slots, shmem->txn_per_slot_max );

  ulong _descends_footprint = descends_set_footprint( max_active_slots );
  if( FD_UNLIKELY( !_descends_footprint ) ) {
    FD_LOG_WARNING(( "invalid max_active_slots" ));
    return NULL;
  }

  /* Walk the shared region layout to recover pointers to each
     sub-region.  The APPEND order here must mirror the order used when
     the shared region was created. */
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_txncache_shmem_t * tc = FD_SCRATCH_ALLOC_APPEND( l, FD_TXNCACHE_SHMEM_ALIGN, sizeof(fd_txncache_shmem_t) );
  void * _blockhash_map = FD_SCRATCH_ALLOC_APPEND( l, blockhash_map_align(), blockhash_map_footprint( blockhash_map_chains ) );
  void * _blockcache_pool = FD_SCRATCH_ALLOC_APPEND( l, blockcache_pool_align(), blockcache_pool_footprint( max_active_slots ) );
  void * _blockcache_pages = FD_SCRATCH_ALLOC_APPEND( l, alignof(ushort), max_active_slots*_max_txnpages_per_blockhash*sizeof(ushort) );
  void * _blockcache_heads = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), max_active_slots*shmem->txn_per_slot_max*sizeof(uint) );
  void * _blockcache_descends = FD_SCRATCH_ALLOC_APPEND( l, descends_set_align(), max_active_slots*_descends_footprint );
  void * _txnpages_free = FD_SCRATCH_ALLOC_APPEND( l, alignof(ushort), _max_txnpages*sizeof(ushort) );
  void * _txnpages = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txncache_txnpage_t), _max_txnpages*sizeof(fd_txncache_txnpage_t) );
  void * _scratch_pages = FD_SCRATCH_ALLOC_APPEND( l, alignof(ushort), _max_txnpages_per_blockhash*sizeof(ushort) );
  void * _scratch_heads = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), shmem->txn_per_slot_max*sizeof(uint) );
  void * _scratch_txnpage = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txncache_txnpage_t), sizeof(fd_txncache_txnpage_t) );

  /* Lay out the local join region: the handle itself followed by the
     local blockcache view array. */
  FD_SCRATCH_ALLOC_INIT( l2, ljoin );
  fd_txncache_t * ltc = FD_SCRATCH_ALLOC_APPEND( l2, FD_TXNCACHE_ALIGN, sizeof(fd_txncache_t) );
  void * _local_blockcache_pool = FD_SCRATCH_ALLOC_APPEND( l2, alignof(blockcache_t), max_active_slots*sizeof(blockcache_t) );

  ltc->shmem = tc;

  ltc->blockcache_pool = (blockcache_t*)_local_blockcache_pool;
  ltc->blockcache_shmem_pool = blockcache_pool_join( _blockcache_pool );

  /* Stitch each local blockcache view to its slice of the shared pages,
     heads, and descends arrays, and to its shared record. */
  for( ulong i=0UL; i<shmem->active_slots_max; i++ ) {
    ltc->blockcache_pool[ i ].pages = (ushort *)_blockcache_pages + i*_max_txnpages_per_blockhash;
    ltc->blockcache_pool[ i ].heads = (uint *)_blockcache_heads + i*shmem->txn_per_slot_max;
    ltc->blockcache_pool[ i ].descends = descends_set_join( (uchar *)_blockcache_descends + i*_descends_footprint );
    ltc->blockcache_pool[ i ].shmem = ltc->blockcache_shmem_pool + i;
    FD_TEST( ltc->blockcache_pool[ i ].shmem );
  }

  FD_TEST( ltc->blockcache_shmem_pool );

  ltc->blockhash_map = blockhash_map_join( _blockhash_map );
  FD_TEST( ltc->blockhash_map );

  ltc->txnpages_free = (ushort *)_txnpages_free;
  ltc->txnpages = (fd_txncache_txnpage_t *)_txnpages;

  ltc->scratch_pages = _scratch_pages;
  ltc->scratch_heads = _scratch_heads;
  ltc->scratch_txnpage = _scratch_txnpage;

  return (void *)ltc;
}
124 :
125 : fd_txncache_t *
126 0 : fd_txncache_join( void * ljoin ) {
127 0 : if( FD_UNLIKELY( !ljoin ) ) {
128 0 : FD_LOG_WARNING(( "NULL ljoin" ));
129 0 : return NULL;
130 0 : }
131 :
132 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ljoin, fd_txncache_align() ) ) ) {
133 0 : FD_LOG_WARNING(( "misaligned ljoin" ));
134 0 : return NULL;
135 0 : }
136 :
137 0 : fd_txncache_t * tc = (fd_txncache_t *)ljoin;
138 :
139 0 : return tc;
140 0 : }
141 :
142 : void
143 0 : fd_txncache_reset( fd_txncache_t * tc ) {
144 0 : fd_rwlock_write( tc->shmem->lock );
145 :
146 0 : tc->shmem->root_cnt = 0UL;
147 0 : root_slist_remove_all( tc->shmem->root_ll, tc->blockcache_shmem_pool );
148 :
149 0 : tc->shmem->txnpages_free_cnt = tc->shmem->max_txnpages;
150 0 : for( ushort i=0; i<tc->shmem->max_txnpages; i++ ) tc->txnpages_free[ i ] = i;
151 :
152 0 : blockcache_pool_reset( tc->blockcache_shmem_pool );
153 0 : blockhash_map_reset( tc->blockhash_map );
154 :
155 0 : fd_rwlock_unwrite( tc->shmem->lock );
156 0 : }
157 :
/* fd_txncache_ensure_txnpage returns a txnpage with at least one free
   transaction slot for the given blockcache, acquiring a new page from
   the global free list if necessary.  Returns NULL if the blockcache
   has already hit its page limit or no pages remain globally.  Safe to
   call concurrently under the read lock: the pages[] slot is claimed
   with a CAS (USHORT_MAX -> USHORT_MAX-1 as an "in progress" sentinel)
   so only one caller allocates while others spin until the real page
   index is published. */
static fd_txncache_txnpage_t *
fd_txncache_ensure_txnpage( fd_txncache_t * tc,
                            blockcache_t * blockcache ) {
  ushort page_cnt = blockcache->shmem->pages_cnt;
  if( FD_UNLIKELY( page_cnt>tc->shmem->txnpages_per_blockhash_max ) ) return NULL;

  /* Fast path: the most recently added page still has free slots. */
  if( FD_LIKELY( page_cnt ) ) {
    ushort txnpage_idx = blockcache->pages[ page_cnt-1 ];
    ushort txnpage_free = tc->txnpages[ txnpage_idx ].free;
    if( FD_LIKELY( txnpage_free ) ) return &tc->txnpages[ txnpage_idx ];
  }

  if( FD_UNLIKELY( page_cnt==tc->shmem->txnpages_per_blockhash_max ) ) return NULL;
  if( FD_LIKELY( FD_ATOMIC_CAS( &blockcache->pages[ page_cnt ], (ushort)USHORT_MAX, (ushort)(USHORT_MAX-1UL) )==(ushort)USHORT_MAX ) ) {
    /* We won the slot; pop a page off the global free list with a CAS
       loop on the free count. */
    ulong txnpages_free_cnt = tc->shmem->txnpages_free_cnt;
    for(;;) {
      if( FD_UNLIKELY( !txnpages_free_cnt ) ) return NULL;
      ulong old_txnpages_free_cnt = FD_ATOMIC_CAS( &tc->shmem->txnpages_free_cnt, (ushort)txnpages_free_cnt, (ushort)(txnpages_free_cnt-1UL) );
      if( FD_LIKELY( old_txnpages_free_cnt==txnpages_free_cnt ) ) break;
      txnpages_free_cnt = old_txnpages_free_cnt;
      FD_SPIN_PAUSE();
    }

    ushort txnpage_idx = tc->txnpages_free[ txnpages_free_cnt-1UL ];
    fd_txncache_txnpage_t * txnpage = &tc->txnpages[ txnpage_idx ];
    txnpage->free = FD_TXNCACHE_TXNS_PER_PAGE;
    /* Publish: page contents must be initialized before the index is
       visible, and the index before pages_cnt is bumped. */
    FD_COMPILER_MFENCE();
    blockcache->pages[ page_cnt ] = txnpage_idx;
    FD_COMPILER_MFENCE();
    blockcache->shmem->pages_cnt = (ushort)(page_cnt+1);
    return txnpage;
  } else {
    /* Lost the race; spin until the winner publishes the real page
       index (anything below USHORT_MAX-1). */
    ushort txnpage_idx = blockcache->pages[ page_cnt ];
    while( FD_UNLIKELY( txnpage_idx>=USHORT_MAX-1UL ) ) {
      txnpage_idx = blockcache->pages[ page_cnt ];
      FD_SPIN_PAUSE();
    }
    return &tc->txnpages[ txnpage_idx ];
  }
}
198 :
/* fd_txncache_insert_txn writes one transaction into txnpage and links
   it into the blockcache's hash table.  Returns 1 on success, 0 if the
   page ran out of slots (caller should acquire another page and retry).
   Safe under the read lock: a slot is claimed by CAS-decrementing
   txnpage->free, and the hash bucket head is updated with a CAS push. */
static int
fd_txncache_insert_txn( fd_txncache_t * tc,
                        blockcache_t * blockcache,
                        fd_txncache_txnpage_t * txnpage,
                        fd_txncache_fork_id_t fork_id,
                        uchar const * txnhash ) {
  ulong txnpage_idx = (ulong)(txnpage - tc->txnpages);

  for(;;) {
    ushort txnpage_free = txnpage->free;
    if( FD_UNLIKELY( !txnpage_free ) ) return 0;
    /* Claim one slot by decrementing the page's free count. */
    if( FD_UNLIKELY( FD_ATOMIC_CAS( &txnpage->free, txnpage_free, txnpage_free-1UL )!=txnpage_free ) ) {
      FD_SPIN_PAUSE();
      continue;
    }

    /* Slots are handed out in order, so the claimed index is derived
       from the free count we observed. */
    ulong txn_idx = FD_TXNCACHE_TXNS_PER_PAGE-txnpage_free;
    ulong txnhash_offset = blockcache->shmem->txnhash_offset;
    /* Only 20 bytes of the hash (at the fork's offset) are stored. */
    memcpy( txnpage->txns[ txn_idx ]->txnhash, txnhash+txnhash_offset, 20UL );
    txnpage->txns[ txn_idx ]->fork_id = fork_id;
    txnpage->txns[ txn_idx ]->generation = tc->blockcache_pool[ fork_id.val ].shmem->generation;
    /* Contents must be fully written before the entry becomes reachable
       from the hash chain below. */
    FD_COMPILER_MFENCE();

    ulong txn_bucket = FD_LOAD( ulong, txnhash+txnhash_offset )%tc->shmem->txn_per_slot_max;
    /* Lock-free push onto the bucket's singly linked list. */
    for(;;) {
      uint head = blockcache->heads[ txn_bucket ];
      txnpage->txns[ txn_idx ]->blockcache_next = head;
      FD_COMPILER_MFENCE();
      if( FD_LIKELY( FD_ATOMIC_CAS( &blockcache->heads[ txn_bucket ], head, (uint)(FD_TXNCACHE_TXNS_PER_PAGE*txnpage_idx+txn_idx) )==head ) ) break;
      FD_SPIN_PAUSE();
    }

    return 1;
  }
}
234 :
/* fd_txncache_attach_child acquires a new fork (blockcache) as a child
   of parent_fork_id and returns its id.  Pass parent_fork_id with
   val==USHORT_MAX to create the initial root fork (the pool must be
   otherwise empty in that case).  Takes the write lock. */
fd_txncache_fork_id_t
fd_txncache_attach_child( fd_txncache_t * tc,
                          fd_txncache_fork_id_t parent_fork_id ) {
  fd_rwlock_write( tc->shmem->lock );

  FD_TEST( blockcache_pool_free( tc->blockcache_shmem_pool ) );
  ulong idx = blockcache_pool_idx_acquire( tc->blockcache_shmem_pool );

  blockcache_t * fork = &tc->blockcache_pool[ idx ];
  fd_txncache_fork_id_t fork_id = { .val = (ushort)idx };

  /* A fresh generation invalidates any stale transactions that still
     reference this pool index from a previous occupant. */
  fork->shmem->generation = tc->shmem->blockcache_generation++;
  fork->shmem->child_id = (fd_txncache_fork_id_t){ .val = USHORT_MAX };

  if( FD_LIKELY( parent_fork_id.val==USHORT_MAX ) ) {
    /* Creating the initial root: it must be the only element in use. */
    FD_TEST( blockcache_pool_free( tc->blockcache_shmem_pool )==blockcache_pool_max( tc->blockcache_shmem_pool )-1UL );
    fork->shmem->parent_id = (fd_txncache_fork_id_t){ .val = USHORT_MAX };
    fork->shmem->sibling_id = (fd_txncache_fork_id_t){ .val = USHORT_MAX };

    descends_set_null( fork->descends );
    root_slist_ele_push_tail( tc->shmem->root_ll, fork->shmem, tc->blockcache_shmem_pool );
  } else {
    blockcache_t * parent = &tc->blockcache_pool[ parent_fork_id.val ];
    /* We might be tempted to freeze the parent here, and it's valid to
       do this ordinarily, but not when loading from a snapshot, when
       we need to load many transactions into a root parent chain at
       once. */
    /* Push onto the front of the parent's child (sibling) list. */
    fork->shmem->sibling_id = parent->shmem->child_id;
    fork->shmem->parent_id = parent_fork_id;
    parent->shmem->child_id = fork_id;

    /* This fork descends from the parent and everything it descends
       from. */
    descends_set_copy( fork->descends, parent->descends );
    descends_set_insert( fork->descends, parent_fork_id.val );
  }

  /* Reset per-fork transaction storage to empty (0xFF == empty/unused
     sentinel for heads and pages). */
  fork->shmem->txnhash_offset = 0UL;
  fork->shmem->frozen = 0;
  memset( fork->heads, 0xFF, tc->shmem->txn_per_slot_max*sizeof(uint) );
  fork->shmem->pages_cnt = 0;
  memset( fork->pages, 0xFF, tc->shmem->txnpages_per_blockhash_max*sizeof(fork->pages[ 0 ]) );

  fd_rwlock_unwrite( tc->shmem->lock );
  return fork_id;
}
279 :
280 : void
281 : fd_txncache_attach_blockhash( fd_txncache_t * tc,
282 : fd_txncache_fork_id_t fork_id,
283 0 : uchar const * blockhash ) {
284 0 : fd_rwlock_write( tc->shmem->lock );
285 :
286 0 : blockcache_t * fork = &tc->blockcache_pool[ fork_id.val ];
287 0 : FD_TEST( !fork->shmem->frozen );
288 0 : fork->shmem->frozen = 1;
289 :
290 0 : memcpy( fork->shmem->blockhash.uc, blockhash, 32UL );
291 :
292 0 : blockhash_map_ele_insert( tc->blockhash_map, fork->shmem, tc->blockcache_shmem_pool );
293 :
294 0 : fd_rwlock_unwrite( tc->shmem->lock );
295 0 : }
296 :
297 : void
298 : fd_txncache_finalize_fork( fd_txncache_t * tc,
299 : fd_txncache_fork_id_t fork_id,
300 : ulong txnhash_offset,
301 0 : uchar const * blockhash ) {
302 0 : fd_rwlock_write( tc->shmem->lock );
303 :
304 0 : blockcache_t * fork = &tc->blockcache_pool[ fork_id.val ];
305 0 : FD_TEST( fork->shmem->frozen<=1 );
306 0 : FD_TEST( fork->shmem->frozen>=0 );
307 0 : fork->shmem->txnhash_offset = txnhash_offset;
308 :
309 0 : memcpy( fork->shmem->blockhash.uc, blockhash, 32UL );
310 :
311 0 : if( FD_LIKELY( !fork->shmem->frozen ) ) blockhash_map_ele_insert( tc->blockhash_map, fork->shmem, tc->blockcache_shmem_pool );
312 0 : fork->shmem->frozen = 2;
313 :
314 0 : fd_rwlock_unwrite( tc->shmem->lock );
315 0 : }
316 :
/* remove_blockcache releases a single blockcache: returns its txnpages
   to the global free list, clears its bit from every fork's descends
   set, removes it from the blockhash map (if it was ever inserted,
   i.e. frozen>0), marks it released (frozen=-1), and returns it to the
   pool.  Caller must hold the write lock.  Note: the map removal must
   happen before frozen is set to -1, and the pages copy before the
   pool release. */
static inline void
remove_blockcache( fd_txncache_t * tc,
                   blockcache_t * blockcache ) {
  FD_TEST( blockcache->shmem->frozen>=0 );
  memcpy( tc->txnpages_free+tc->shmem->txnpages_free_cnt, blockcache->pages, blockcache->shmem->pages_cnt*sizeof(tc->txnpages_free[ 0 ]) );
  tc->shmem->txnpages_free_cnt = (ushort)(tc->shmem->txnpages_free_cnt+blockcache->shmem->pages_cnt);

  /* No fork descends from a removed blockcache anymore. */
  ulong idx = blockcache_pool_idx( tc->blockcache_shmem_pool, blockcache->shmem );
  for( ulong i=0UL; i<tc->shmem->active_slots_max; i++ ) descends_set_remove( tc->blockcache_pool[ i ].descends, idx );

  if( FD_LIKELY( blockcache->shmem->frozen ) ) blockhash_map_ele_remove_fast( tc->blockhash_map, blockcache->shmem, tc->blockcache_shmem_pool );
  blockcache->shmem->frozen = -1;
  blockcache_pool_ele_release( tc->blockcache_shmem_pool, blockcache->shmem );
}
331 :
/* remove_children recursively removes every descendant of fork except
   the subtree rooted at `except` (which may be NULL to remove all).
   Caller must hold the write lock.  The sibling id is read before the
   sibling is removed, since removal releases the record back to the
   pool. */
static inline void
remove_children( fd_txncache_t * tc,
                 blockcache_t const * fork,
                 blockcache_t const * except ) {
  fd_txncache_fork_id_t sibling_idx = fork->shmem->child_id;
  while( sibling_idx.val!=USHORT_MAX ) {
    blockcache_t * sibling = &tc->blockcache_pool[ sibling_idx.val ];

    sibling_idx = sibling->shmem->sibling_id;
    if( FD_UNLIKELY( sibling==except ) ) continue;

    remove_children( tc, sibling, except );
    remove_blockcache( tc, sibling );
  }
}
347 :
348 : void
349 : fd_txncache_cancel_fork( fd_txncache_t * tc,
350 0 : fd_txncache_fork_id_t fork_id ) {
351 0 : fd_rwlock_write( tc->shmem->lock );
352 0 : blockcache_t * fork = &tc->blockcache_pool[ fork_id.val ];
353 0 : FD_TEST( fork->shmem->child_id.val==USHORT_MAX );
354 0 : FD_TEST( fork->shmem->parent_id.val!=USHORT_MAX );
355 0 : remove_blockcache( tc, fork );
356 0 : ushort * fork_id_p = &(tc->blockcache_pool[ fork->shmem->parent_id.val ].shmem->child_id.val);
357 0 : while( *fork_id_p!=fork_id.val ) {
358 0 : fork_id_p = &(tc->blockcache_pool[ *fork_id_p ].shmem->sibling_id.val);
359 0 : }
360 0 : *fork_id_p = fork->shmem->sibling_id.val;
361 0 : fd_rwlock_unwrite( tc->shmem->lock );
362 0 : }
363 :
/* fd_txncache_advance_root marks fork_id as the next root.  Its parent
   must be the most recently rooted fork (crash otherwise).  Competing
   sibling subtrees are pruned, and once more than
   FD_TXNCACHE_MAX_BLOCKHASH_DISTANCE roots are retained the oldest one
   is evicted.  Takes the write lock. */
void
fd_txncache_advance_root( fd_txncache_t * tc,
                          fd_txncache_fork_id_t fork_id ) {
  fd_rwlock_write( tc->shmem->lock );

  blockcache_t * fork = &tc->blockcache_pool[ fork_id.val ];

  /* Roots must advance one step at a time: the new root's parent must
     be the current tail of the root list. */
  blockcache_t * parent_fork = &tc->blockcache_pool[ fork->shmem->parent_id.val ];
  if( FD_UNLIKELY( root_slist_ele_peek_tail( tc->shmem->root_ll, tc->blockcache_shmem_pool )!=parent_fork->shmem ) ) {
    FD_BASE58_ENCODE_32_BYTES( parent_fork->shmem->blockhash.uc, parent_blockhash_b58 );
    FD_BASE58_ENCODE_32_BYTES( fork->shmem->blockhash.uc, fork_blockhash_b58 );
    FD_BASE58_ENCODE_32_BYTES( root_slist_ele_peek_tail( tc->shmem->root_ll, tc->blockcache_shmem_pool )->blockhash.uc, root_blockhash_b58 );
    FD_LOG_CRIT(( "advancing root from %s to %s but that is not valid, last root is %s",
                  parent_blockhash_b58,
                  fork_blockhash_b58,
                  root_blockhash_b58 ));
  }

  FD_BASE58_ENCODE_32_BYTES( parent_fork->shmem->blockhash.uc, parent_blockhash_b58 );
  FD_BASE58_ENCODE_32_BYTES( fork->shmem->blockhash.uc, fork_blockhash_b58 );
  FD_LOG_DEBUG(( "advancing root from %s to %s",
                 parent_blockhash_b58,
                 fork_blockhash_b58 ));

  /* When a fork is rooted, any competing forks can be immediately
     removed as they will not be needed again.  This includes child
     forks of the pruned siblings as well. */
  remove_children( tc, parent_fork, fork );

  /* Now, the earliest known rooted fork can likely be removed since its
     blockhashes cannot be referenced anymore (they are older than 151
     blockhashes away). */
  tc->shmem->root_cnt++;
  root_slist_ele_push_tail( tc->shmem->root_ll, fork->shmem, tc->blockcache_shmem_pool );
  if( FD_LIKELY( tc->shmem->root_cnt>FD_TXNCACHE_MAX_BLOCKHASH_DISTANCE ) ) {
    fd_txncache_blockcache_shmem_t * old_root_shmem = root_slist_ele_pop_head( tc->shmem->root_ll, tc->blockcache_shmem_pool );
    FD_TEST( old_root_shmem );
    blockcache_t * old_root = &tc->blockcache_pool[ blockcache_pool_idx( tc->blockcache_shmem_pool, old_root_shmem ) ];

    /* The new oldest root becomes parentless. */
    root_slist_ele_peek_head( tc->shmem->root_ll, tc->blockcache_shmem_pool )->parent_id.val = USHORT_MAX;

    remove_blockcache( tc, old_root );
    tc->shmem->root_cnt--;
  }

  fd_rwlock_unwrite( tc->shmem->lock );
}
411 :
412 : static inline blockcache_t *
413 : blockhash_on_fork( fd_txncache_t * tc,
414 : blockcache_t const * fork,
415 0 : uchar const * blockhash ) {
416 0 : fd_txncache_blockcache_shmem_t const * candidate = blockhash_map_ele_query_const( tc->blockhash_map, fd_type_pun_const( blockhash ), NULL, tc->blockcache_shmem_pool );
417 0 : if( FD_UNLIKELY( !candidate ) ) return NULL;
418 :
419 0 : while( candidate ) {
420 0 : ulong candidate_idx = blockcache_pool_idx( tc->blockcache_shmem_pool, candidate );
421 0 : if( FD_LIKELY( descends_set_test( fork->descends, candidate_idx ) ) ) return &tc->blockcache_pool[ candidate_idx ];
422 0 : candidate = blockhash_map_ele_next_const( candidate, NULL, tc->blockcache_shmem_pool );
423 0 : }
424 0 : return NULL;
425 0 : }
426 :
/* purge_stale_on_blockcache compacts one blockcache in place: it walks
   all of its transactions (newest page / newest slot first), drops any
   whose owning fork was removed or recycled (frozen<0 or generation
   mismatch), repacks survivors densely into the blockcache's own pages
   via the per-join scratch txnpage, rebuilds the hash-chain heads, and
   returns freed pages to the global free list.  Caller must hold the
   write lock (no concurrent readers or writers). */
static void
purge_stale_on_blockcache( fd_txncache_t * tc,
                           blockcache_t * blockcache ) {
  memset( tc->scratch_heads, 0xFF, tc->shmem->txn_per_slot_max*sizeof(tc->scratch_heads[ 0 ]) );
  memset( tc->scratch_pages, 0xFF, tc->shmem->txnpages_per_blockhash_max*sizeof(tc->scratch_pages[ 0 ]) );
  ushort scratch_pages_cnt = 0;
  ushort scratch_txnpage_idx = USHORT_MAX; /* which real page the scratch txnpage is standing in for */
  tc->scratch_txnpage->free = 0;
  for( ulong i=0UL; i<blockcache->shmem->pages_cnt; i++ ) {
    /* Iterate pages newest-first so compaction reuses pages we have
       already scanned. */
    ushort curr_txnpage_idx = blockcache->pages[ blockcache->shmem->pages_cnt-i-1UL ];
    ulong curr_txn_cnt = FD_TXNCACHE_TXNS_PER_PAGE-tc->txnpages[ curr_txnpage_idx ].free;
    for( ulong j=0UL; j<curr_txn_cnt; j++ ) {
      fd_txncache_single_txn_t * curr_txn = tc->txnpages[ curr_txnpage_idx ].txns[ curr_txn_cnt-j-1UL ];
      blockcache_t const * txn_fork = &tc->blockcache_pool[ curr_txn->fork_id.val ];
      if( FD_LIKELY( txn_fork->shmem->frozen>=0 && txn_fork->shmem->generation==curr_txn->generation ) ) {
        /* Valid transaction. Keep. */
        if( FD_UNLIKELY( !tc->scratch_txnpage->free ) ) {
          /* Scratch page is full: flush it into the real page it was
             standing in for and start filling the current page. */
          FD_TEST( scratch_txnpage_idx!=curr_txnpage_idx );
          if( FD_LIKELY( scratch_txnpage_idx!=USHORT_MAX ) ) {
            fd_txncache_txnpage_t * txnpage = &tc->txnpages[ scratch_txnpage_idx ];
            memcpy( txnpage, tc->scratch_txnpage, sizeof(*txnpage) );
          }
          scratch_txnpage_idx = curr_txnpage_idx;
          tc->scratch_txnpage->free = FD_TXNCACHE_TXNS_PER_PAGE;
          tc->scratch_pages[ scratch_pages_cnt ] = scratch_txnpage_idx;
          scratch_pages_cnt++;
        }
        /* Append the surviving txn to the scratch page and push it onto
           the rebuilt hash chain for its bucket. */
        ulong txn_idx = FD_TXNCACHE_TXNS_PER_PAGE-tc->scratch_txnpage->free;
        memcpy( tc->scratch_txnpage->txns[ txn_idx ], curr_txn, sizeof(*curr_txn) );
        ulong txn_bucket = FD_LOAD( ulong, curr_txn->txnhash )%tc->shmem->txn_per_slot_max;
        uint head = tc->scratch_heads[ txn_bucket ];
        tc->scratch_txnpage->txns[ txn_idx ]->blockcache_next = head;
        ulong txn_gidx = FD_TXNCACHE_TXNS_PER_PAGE*scratch_txnpage_idx+txn_idx;
        FD_TEST( txn_gidx<UINT_MAX );
        tc->scratch_heads[ txn_bucket ] = (uint)txn_gidx;
        tc->scratch_txnpage->free--;
      } else {
        /* Stale transaction. Drop. */
        continue;
      }
    }
    if( FD_UNLIKELY( curr_txnpage_idx!=scratch_txnpage_idx ) ) {
      /* The txnpage is not being used for compaction, free it up. */
      tc->txnpages_free[ tc->shmem->txnpages_free_cnt ] = curr_txnpage_idx;
      tc->shmem->txnpages_free_cnt++;
    }
  }
  /* Flush the final partially-filled scratch page, then install the
     rebuilt page list and hash heads. */
  if( FD_LIKELY( scratch_txnpage_idx!=USHORT_MAX ) ) {
    fd_txncache_txnpage_t * txnpage = &tc->txnpages[ scratch_txnpage_idx ];
    memcpy( txnpage, tc->scratch_txnpage, sizeof(*txnpage) );
  }
  blockcache->shmem->pages_cnt = scratch_pages_cnt;
  memcpy( blockcache->pages, tc->scratch_pages, tc->shmem->txnpages_per_blockhash_max*sizeof(blockcache->pages[0]) );
  memcpy( blockcache->heads, tc->scratch_heads, tc->shmem->txn_per_slot_max*sizeof(blockcache->heads[0]) );
}
482 :
483 : static void
484 : purge_stale_on_fork( fd_txncache_t * tc,
485 0 : blockcache_t * fork ) {
486 0 : purge_stale_on_blockcache( tc, fork );
487 :
488 0 : fd_txncache_fork_id_t sibling_idx = fork->shmem->child_id;
489 0 : while( sibling_idx.val!=USHORT_MAX ) {
490 0 : blockcache_t * sibling = &tc->blockcache_pool[ sibling_idx.val ];
491 0 : purge_stale_on_fork( tc, sibling );
492 0 : sibling_idx = sibling->shmem->sibling_id;
493 0 : }
494 0 : }
495 :
496 : static void
497 0 : purge_stale( fd_txncache_t * tc ) {
498 0 : fd_txncache_blockcache_shmem_t * root_shmem = root_slist_ele_peek_head( tc->shmem->root_ll, tc->blockcache_shmem_pool );
499 0 : FD_TEST( root_shmem );
500 0 : blockcache_t * root = &tc->blockcache_pool[ blockcache_pool_idx( tc->blockcache_shmem_pool, root_shmem ) ];
501 0 : ushort free_before = tc->shmem->txnpages_free_cnt;
502 : /* One might think that an optimization here is to stop the descent on
503 : the latest rooted blockcache. There could be no pruned minority
504 : forks from that point on. As a result, there could be no stale
505 : transactions in any blockcache descending from that.
506 : Unfortunately, frontier eviction means that any blockcache in the
507 : fork tree can have stale transactions. */
508 0 : purge_stale_on_fork( tc, root );
509 0 : FD_LOG_NOTICE(( "purge_stale: txnpages_free %hu -> %hu", free_before, tc->shmem->txnpages_free_cnt ));
510 0 : }
511 :
/* fd_txncache_insert records txnhash as executed against blockhash on
   the given (not yet finalized) fork.  The blockhash must resolve to a
   blockcache on the fork's ancestry.  Normally runs under the read
   lock (insertion itself is lock-free); if pages run out it upgrades
   to the write lock to purge stale transactions and retries. */
void
fd_txncache_insert( fd_txncache_t * tc,
                    fd_txncache_fork_id_t fork_id,
                    uchar const * blockhash,
                    uchar const * txnhash ) {
  fd_rwlock_read( tc->shmem->lock );

  blockcache_t const * fork = &tc->blockcache_pool[ fork_id.val ];
  FD_TEST( fork->shmem->frozen<=1 );
  FD_TEST( fork->shmem->frozen>=0 );
  blockcache_t * blockcache = blockhash_on_fork( tc, fork, blockhash );
  FD_TEST( blockcache );

  for(;;) {
    fd_txncache_txnpage_t * txnpage = fd_txncache_ensure_txnpage( tc, blockcache );
    if( FD_UNLIKELY( !txnpage ) ) {
      /* Because of sizing invariants when creating the structure, it is
         not typically possible to fill it, unless there are stale
         transactions from minority forks that were purged floating
         around, in which case we can purge them here and try again. */
      /* Upgrade read -> write lock.  The lock is dropped in between, so
         re-check under the write lock whether a page is still
         unavailable before paying for the purge. */
      fd_rwlock_unread( tc->shmem->lock );
      fd_rwlock_write( tc->shmem->lock );
      if( FD_LIKELY( !fd_txncache_ensure_txnpage( tc, blockcache ) ) ) purge_stale( tc );
      fd_rwlock_unwrite( tc->shmem->lock );
      fd_rwlock_read( tc->shmem->lock );
      continue;
    }

    /* Insert can fail if the page filled up concurrently; loop to get
       another page. */
    int success = fd_txncache_insert_txn( tc, blockcache, txnpage, fork_id, txnhash );
    if( FD_LIKELY( success ) ) break;

    FD_SPIN_PAUSE();
  }

  fd_rwlock_unread( tc->shmem->lock );
}
548 :
/* fd_txncache_query returns 1 if txnhash was recorded against blockhash
   on the ancestry of fork_id, 0 otherwise.  The blockhash must resolve
   to a fully finalized (frozen==2) blockcache on the fork.  Takes the
   read lock. */
int
fd_txncache_query( fd_txncache_t * tc,
                   fd_txncache_fork_id_t fork_id,
                   uchar const * blockhash,
                   uchar const * txnhash ) {
  fd_rwlock_read( tc->shmem->lock );

  blockcache_t const * fork = &tc->blockcache_pool[ fork_id.val ];
  FD_TEST( fork->shmem->frozen>=0 );
  blockcache_t const * blockcache = blockhash_on_fork( tc, fork, blockhash );
  FD_TEST( blockcache );
  FD_TEST( blockcache->shmem->frozen==2 );

  int found = 0;

  /* Walk the hash chain for this txnhash's bucket.  Heads and next
     pointers are global txn indices: page = idx/TXNS_PER_PAGE,
     slot = idx%TXNS_PER_PAGE; UINT_MAX terminates the chain. */
  ulong txnhash_offset = blockcache->shmem->txnhash_offset;
  ulong head_hash = FD_LOAD( ulong, txnhash+txnhash_offset ) % tc->shmem->txn_per_slot_max;
  for( uint head=blockcache->heads[ head_hash ]; head!=UINT_MAX; head=tc->txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ]->blockcache_next ) {
    fd_txncache_single_txn_t * txn = tc->txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ];

    /* A match only counts if the recording fork is on our ancestry (or
       is us), is still live, and has not been recycled (generation). */
    blockcache_t const * txn_fork = &tc->blockcache_pool[ txn->fork_id.val ];
    int descends = (txn->fork_id.val==fork_id.val || descends_set_test( fork->descends, txn->fork_id.val )) && txn_fork->shmem->frozen>=0 && txn_fork->shmem->generation==txn->generation;
    if( FD_LIKELY( descends && !memcmp( txnhash+txnhash_offset, txn->txnhash, 20UL ) ) ) {
      found = 1;
      break;
    }
  }

  fd_rwlock_unread( tc->shmem->lock );
  return found;
}
|