Line data Source code
1 : #include "fd_txncache.h"
2 : #include "../fd_rwlock.h"
3 :
4 : #define SORT_NAME sort_slot_ascend
5 : #define SORT_KEY_T ulong
6 : #define SORT_BEFORE(a,b) (a)<(b)
7 : #include "../../util/tmpl/fd_sort.c"
8 :
9 : /* TODO: This data structure needs a careful audit and testing. It may need
10 : to be reworked to support better fork-aware behavior. */
11 :
12 : /* The number of transactions in each page. This needs to be high
13 : enough to amoritze the cost of caller code reserving pages from,
14 : and returning pages to the pool, but not so high that the memory
15 : wasted from blockhashes with only one transaction is significant. */
16 :
17 0 : #define FD_TXNCACHE_TXNS_PER_PAGE (16384UL)
18 :
19 : /* The number of unique entries in the hash lookup table for each
20 : blockhash. A higher value here uses more memory but enables faster
21 : lookups. */
22 :
23 0 : #define FD_TXNCACHE_BLOCKCACHE_MAP_CNT (524288UL)
24 :
25 : /* The number of unique entries in the hash lookup table for each
26 : (slot, blockhash). This prevents all the entries needing to be in
27 : one slotcache list, so insertions can happen concurrently. */
28 :
29 0 : #define FD_TXNCACHE_SLOTCACHE_MAP_CNT (1024UL)
30 :
31 : /* Value for an empty blockcache `max_slot` or empty slotcache
32 : `slot` entry. When the entries are set to this value, we can insert
33 : to the entry, but stop iterating while running queries. */
34 :
35 0 : #define FD_TXNCACHE_EMPTY_ENTRY (ULONG_MAX)
36 :
37 : /* Value for a deleted cache entry. We can insert to such entries,
38 : and keep iterating while running queries. */
39 :
40 0 : #define FD_TXNCACHE_TOMBSTONE_ENTRY (ULONG_MAX-1UL)
41 :
42 : /* Placeholder value used for critical sections. */
43 :
44 : #define FD_TXNCACHE_TEMP_ENTRY (ULONG_MAX-2UL)
45 :
46 : struct fd_txncache_private_txn {
47 : uint blockcache_next; /* Pointer to the next element in the blockcache hash chain containing this entry from the pool. */
48 : uint slotblockcache_next; /* Pointer to the next element in the slotcache hash chain containing this entry from the pool. */
49 :
50 : ulong slot; /* Slot that the transaction was executed. A transaction might be in the cache
51 : multiple times if it was executed in a different slot on different forks. The
52 : same slot will not appear multiple times however. */
53 : uchar txnhash[ 20 ]; /* The transaction hash, truncated to 20 bytes. The hash is not always the first 20
54 : bytes, but is 20 bytes starting at some arbitrary offset given by the key_offset value
55 : of the containing by_blockhash entry. */
56 : uchar result; /* The result of executing the transaction. This is the discriminant of the transaction
57 : result type. 0 means success. */
58 : };
59 :
60 : typedef struct fd_txncache_private_txn fd_txncache_private_txn_t;
61 :
62 : struct fd_txncache_private_txnpage {
63 : ushort free; /* The number of free txn entries in this page. */
64 : fd_txncache_private_txn_t txns[ FD_TXNCACHE_TXNS_PER_PAGE][ 1 ]; /* The transactions in the page. */
65 : };
66 :
67 : typedef struct fd_txncache_private_txnpage fd_txncache_private_txnpage_t;
68 :
69 : struct fd_txncache_private_blockcache {
70 : uchar blockhash[ 32 ]; /* The actual blockhash of these transactions. */
71 : ulong max_slot; /* The max slot we have seen that contains a transaction referencing this blockhash.
72 : The blockhash entry will not be purged until the lowest rooted slot is greater than this. */
73 : ulong txnhash_offset; /* To save memory, the Agave validator decided to truncate the hash of transactions stored in
74 : this memory to 20 bytes rather than 32 bytes. The bytes used are not the first 20 as you
75 : might expect, but instead the first 20 starting at some random offset into the transaction
76 : hash (starting between 0 and len(hash)-20, a/k/a 44 for signatures, and 12 for hashes).
77 :
78 : In an unfortunate turn, the offset is also propogated to peers via. snapshot responses,
79 : which only communicate the offset and the respective 20 bytes. To make sure we are
80 : deduplicating incoming transactions correctly, we must replicate this system even though
81 : it would be easier to just always take the first 20 bytes. For transactions that we
82 : insert into the cache ourselves, we do just always use a key_offset of zero, so this is
83 : only nonzero when constructed form a peer snapshot. */
84 :
85 : uint heads[ FD_TXNCACHE_BLOCKCACHE_MAP_CNT ]; /* The hash table for the blockhash. Each entry is a pointer to the head of a
86 : linked list of transactions that reference this blockhash. As we add
87 : transactions to the bucket, the head pointer is updated to the new item, and
88 : the new item is pointed to the previous head. */
89 :
90 : ushort pages_cnt; /* The number of txnpages currently in use to store the transactions in this blockcache. */
91 : uint * pages; /* A list of the txnpages containing the transactions for this blockcache. */
92 : };
93 :
94 : typedef struct fd_txncache_private_blockcache fd_txncache_private_blockcache_t;
95 :
96 : struct fd_txncache_private_slotblockcache {
97 : uchar blockhash[ 32 ]; /* The actual blockhash of these transactions. */
98 : ulong txnhash_offset; /* As described above. */
99 : uint heads[ FD_TXNCACHE_SLOTCACHE_MAP_CNT ]; /* A map of the head of a linked list of tansactions in this slot and blockhash */
100 : };
101 :
102 : typedef struct fd_txncache_private_slotblockcache fd_txncache_private_slotblockcache_t;
103 :
104 : struct fd_txncache_private_slotcache {
105 : ulong slot; /* The slot that this slotcache is for. */
106 : fd_txncache_private_slotblockcache_t blockcache[ 300UL ];
107 : };
108 :
109 : typedef struct fd_txncache_private_slotcache fd_txncache_private_slotcache_t;
110 :
111 : struct __attribute__((aligned(FD_TXNCACHE_ALIGN))) fd_txncache_private {
112 : fd_rwlock_t lock[ 1 ]; /* The txncache is a concurrent structure and will be accessed by multiple threads
113 : concurrently. Insertion and querying only take a read lock as they can be done
114 : lockless but all other operations will take a write lock internally. */
115 :
116 : ulong root_slots_max;
117 : ulong live_slots_max;
118 : ushort txnpages_per_blockhash_max;
119 : uint txnpages_max;
120 :
121 : ulong root_slots_cnt; /* The number of root slots being tracked in the below array. */
122 : ulong root_slots_off; /* The highest N slots that have been rooted. These slots are
123 : used to determine which transactions should be kept around to
124 : be queried and served to snapshot requests. The actual
125 : footprint for this data (and other data below) are declared
126 : immediately following the struct. I.e. these pointers point to
127 : memory not far after the struct. */
128 :
129 : ulong blockcache_off; /* The actual cache of transactions. This is a linear probed hash
130 : table that maps blockhashes to the transactions that reference them.
131 : The depth of the hash table is live_slots_max, since this is the
132 : maximum number of blockhashes that can be alive. The loading factor
133 : if they were all alive would be 1.0, but this is rare because we
134 : will almost never fork repeatedly to hit this limit. These
135 : blockcaches are just pointers to pages from the txnpages below, so
136 : they don't take up much memory. */
137 :
138 : ulong slotcache_off; /* The cache of transactions by slot instead of by blockhash, so we
139 : can quickly serialize the slot deltas for the root slots which are
140 : served to peers in snapshots. Similar to the above, it uses the
141 : same underlying transaction storage, but different lookup tables. */
142 :
143 : uint txnpages_free_cnt; /* The number of pages in the txnpages that are not currently in use. */
144 : ulong txnpages_free_off; /* The index in the txnpages array that is free, for each of the free pages. */
145 :
146 : ulong txnpages_off; /* The actual storage for the transactions. The blockcache points to these
147 : pages when storing transactions. Transaction are grouped into pages of
148 : size 16384 to make certain allocation and deallocation operations faster
149 : (just the pages are acquired/released, rather than each txn). */
150 :
151 : ulong blockcache_pages_off; /* The pages for the blockcache entries. */
152 :
153 : ulong probed_entries_off; /* The map of index to number of entries which oveflowed over this index.
154 : Overflow for index i is defined as every entry j > i where j should have
155 : been inserted at k < i. */
156 :
157 : ulong magic; /* ==FD_TXNCACHE_MAGIC */
158 : };
159 :
160 : FD_FN_PURE static ulong *
161 0 : fd_txncache_get_root_slots( fd_txncache_t * tc ) {
162 0 : return (ulong *)( (uchar *)tc + tc->root_slots_off );
163 0 : }
164 :
165 : FD_FN_PURE static fd_txncache_private_blockcache_t *
166 0 : fd_txncache_get_blockcache( fd_txncache_t * tc ) {
167 0 : return (fd_txncache_private_blockcache_t *)( (uchar *)tc + tc->blockcache_off );
168 0 : }
169 :
170 : FD_FN_PURE static fd_txncache_private_blockcache_t *
171 0 : fd_txncache_get_blockcache_const( fd_txncache_t const * tc ) {
172 0 : return (fd_txncache_private_blockcache_t *)( (uchar const *)tc + tc->blockcache_off );
173 0 : }
174 :
175 : FD_FN_PURE static fd_txncache_private_slotcache_t *
176 0 : fd_txncache_get_slotcache( fd_txncache_t * tc ) {
177 0 : return (fd_txncache_private_slotcache_t *)( (uchar *)tc + tc->slotcache_off );
178 0 : }
179 :
180 : FD_FN_PURE static fd_txncache_private_slotcache_t *
181 0 : fd_txncache_get_slotcache_const( fd_txncache_t const * tc ) {
182 0 : return (fd_txncache_private_slotcache_t *)( (uchar const *)tc + tc->slotcache_off );
183 0 : }
184 :
185 : FD_FN_PURE static uint *
186 0 : fd_txncache_get_txnpages_free( fd_txncache_t * tc ) {
187 0 : return (uint *)( (uchar *)tc + tc->txnpages_free_off );
188 0 : }
189 :
190 : FD_FN_PURE static fd_txncache_private_txnpage_t *
191 0 : fd_txncache_get_txnpages( fd_txncache_t * tc ) {
192 0 : return (fd_txncache_private_txnpage_t *)( (uchar *)tc + tc->txnpages_off );
193 0 : }
194 :
195 : FD_FN_PURE static ulong *
196 0 : fd_txncache_get_probed_entries( fd_txncache_t * tc ) {
197 0 : return (ulong *)( (uchar *)tc + tc->probed_entries_off );
198 0 : }
199 :
200 : FD_FN_PURE static ulong *
201 0 : fd_txncache_get_probed_entries_const( fd_txncache_t const * tc ) {
202 0 : return (ulong *)( (uchar const *)tc + tc->probed_entries_off );
203 0 : }
204 :
205 : FD_FN_CONST static ushort
206 0 : fd_txncache_max_txnpages_per_blockhash( ulong max_txn_per_slot ) {
207 : /* The maximum number of transaction pages we might need to store all
208 : the transactions that could be seen in a blockhash.
209 :
210 : In the worst case, every transaction in every slot refers to
211 : the same blockhash for as long as it is possible (150 slots
212 : following the slot where the blockhash is produced). So there
213 : could be up to
214 :
215 : 524,288 * 150 = 78,643,200
216 :
217 : Note that the blockcaches store txns for forks, and the same txn
218 : might appear multiple times in one block, but if there's a fork,
219 : the fork has to have skipped slots (had 0 txns in them), so it
220 : cannot cause this limit to go higher.
221 :
222 : Transactions referenced by a particular blockhash.
223 : Transactions are stored in pages of 16,384, so we might need up
224 : to 4,800 of these pages to store all the transactions in a
225 : slot. */
226 :
227 0 : ulong result = 1UL+(max_txn_per_slot*150UL-1UL)/FD_TXNCACHE_TXNS_PER_PAGE;
228 0 : if( FD_UNLIKELY( result>USHORT_MAX ) ) return 0;
229 0 : return (ushort)result;
230 0 : }
231 :
232 : FD_FN_CONST static uint
233 : fd_txncache_max_txnpages( ulong max_live_slots,
234 0 : ulong max_txn_per_slot ) {
235 : /* We need to be able to store potentially every slot that is live
236 : being completely full of transactions. This would be
237 :
238 : max_live_slots*max_txn_per_slot
239 :
240 : transactions, except that we are counting pages here, not
241 : transactions. It's not enough to divide by the page size, because
242 : pages might be wasted. The maximum page wastage occurs when all
243 : the blockhashes except one have one transaction in them, and the
244 : remaining blockhash has all other transactions. In that case, the
245 : full blockhash needs
246 :
247 : (max_live_slots*max_txn_per_slot)/FD_TXNCACHE_TXNS_PER_PAGE
248 :
249 : pages, and the other blockhashes need 1 page each. */
250 :
251 0 : ulong result = max_live_slots-1UL+max_live_slots*(1UL+(max_txn_per_slot-1UL)/FD_TXNCACHE_TXNS_PER_PAGE);
252 0 : if( FD_UNLIKELY( result>UINT_MAX ) ) return 0;
253 0 : return (uint)result;
254 0 : }
255 :
256 : FD_FN_CONST ulong
257 0 : fd_txncache_align( void ) {
258 0 : return FD_TXNCACHE_ALIGN;
259 0 : }
260 :
261 : FD_FN_CONST ulong
262 : fd_txncache_footprint( ulong max_rooted_slots,
263 : ulong max_live_slots,
264 0 : ulong max_txn_per_slot ) {
265 0 : if( FD_UNLIKELY( max_rooted_slots<1UL || max_live_slots<1UL ) ) return 0UL;
266 0 : if( FD_UNLIKELY( max_live_slots<max_rooted_slots ) ) return 0UL;
267 0 : if( FD_UNLIKELY( max_txn_per_slot<1UL ) ) return 0UL;
268 0 : if( FD_UNLIKELY( !fd_ulong_is_pow2( max_live_slots ) || !fd_ulong_is_pow2( max_txn_per_slot ) ) ) return 0UL;
269 :
270 : /* To save memory, txnpages are referenced as uint which is enough
271 : to support mainnet parameters without overflow. */
272 0 : uint max_txnpages = fd_txncache_max_txnpages( max_live_slots, max_txn_per_slot );
273 0 : if( FD_UNLIKELY( !max_txnpages ) ) return 0UL;
274 :
275 0 : ulong max_txnpages_per_blockhash = fd_txncache_max_txnpages_per_blockhash( max_txn_per_slot );
276 0 : if( FD_UNLIKELY( !max_txnpages_per_blockhash ) ) return 0UL;
277 :
278 0 : ulong l;
279 0 : l = FD_LAYOUT_INIT;
280 0 : l = FD_LAYOUT_APPEND( l, FD_TXNCACHE_ALIGN, sizeof(fd_txncache_t) );
281 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), max_rooted_slots*sizeof(ulong) ); /* root_slots */
282 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_txncache_private_blockcache_t), max_live_slots*sizeof(fd_txncache_private_blockcache_t) ); /* blockcache */
283 0 : l = FD_LAYOUT_APPEND( l, alignof(uint), max_live_slots*max_txnpages_per_blockhash*sizeof(uint) ); /* blockcache->pages */
284 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_txncache_private_slotcache_t), max_live_slots*sizeof(fd_txncache_private_slotcache_t ) ); /* slotcache */
285 0 : l = FD_LAYOUT_APPEND( l, alignof(uint), max_txnpages*sizeof(uint) ); /* txnpages_free */
286 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_txncache_private_txnpage_t), max_txnpages*sizeof(fd_txncache_private_txnpage_t) ); /* txnpages */
287 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), max_live_slots*sizeof(ulong) ); /* probed entries */
288 0 : return FD_LAYOUT_FINI( l, FD_TXNCACHE_ALIGN );
289 0 : }
290 :
291 : void *
292 : fd_txncache_new( void * shmem,
293 : ulong max_rooted_slots,
294 : ulong max_live_slots,
295 0 : ulong max_txn_per_slot ) {
296 0 : fd_txncache_t * tc = (fd_txncache_t *)shmem;
297 :
298 0 : if( FD_UNLIKELY( !shmem ) ) {
299 0 : FD_LOG_WARNING(( "NULL shmem" ));
300 0 : return NULL;
301 0 : }
302 :
303 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_txncache_align() ) ) ) {
304 0 : FD_LOG_WARNING(( "misaligned shmem" ));
305 0 : return NULL;
306 0 : }
307 :
308 0 : if( FD_UNLIKELY( !max_rooted_slots ) ) return NULL;
309 0 : if( FD_UNLIKELY( !max_live_slots ) ) return NULL;
310 0 : if( FD_UNLIKELY( max_live_slots<max_rooted_slots ) ) return NULL;
311 0 : if( FD_UNLIKELY( !max_txn_per_slot ) ) return NULL;
312 0 : if( FD_UNLIKELY( !fd_ulong_is_pow2( max_live_slots ) || !fd_ulong_is_pow2( max_txn_per_slot ) ) ) return NULL;
313 :
314 0 : uint max_txnpages = fd_txncache_max_txnpages( max_live_slots, max_txn_per_slot );
315 0 : ushort max_txnpages_per_blockhash = fd_txncache_max_txnpages_per_blockhash( max_txn_per_slot );
316 :
317 0 : if( FD_UNLIKELY( !max_txnpages ) ) return NULL;
318 0 : if( FD_UNLIKELY( !max_txnpages_per_blockhash ) ) return NULL;
319 :
320 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
321 0 : fd_txncache_t * txncache = FD_SCRATCH_ALLOC_APPEND( l, FD_TXNCACHE_ALIGN, sizeof(fd_txncache_t) );
322 0 : void * _root_slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), max_rooted_slots*sizeof(ulong) );
323 0 : void * _blockcache = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txncache_private_blockcache_t), max_live_slots*sizeof(fd_txncache_private_blockcache_t) );
324 0 : void * _blockcache_pages = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), max_live_slots*max_txnpages_per_blockhash*sizeof(uint) );
325 0 : void * _slotcache = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txncache_private_slotcache_t), max_live_slots*sizeof(fd_txncache_private_slotcache_t ) );
326 0 : void * _txnpages_free = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), max_txnpages*sizeof(uint) );
327 0 : void * _txnpages = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_txncache_private_txnpage_t), max_txnpages*sizeof(fd_txncache_private_txnpage_t) );
328 0 : void * _probed_entries = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), max_live_slots*sizeof(ulong) );
329 :
330 : /* We calculate and store the offsets for these allocations. */
331 0 : txncache->root_slots_off = (ulong)_root_slots - (ulong)txncache;
332 0 : txncache->blockcache_off = (ulong)_blockcache - (ulong)txncache;
333 0 : txncache->slotcache_off = (ulong)_slotcache - (ulong)txncache;
334 0 : txncache->txnpages_free_off = (ulong)_txnpages_free - (ulong)txncache;
335 0 : txncache->txnpages_off = (ulong)_txnpages - (ulong)txncache;
336 0 : txncache->blockcache_pages_off = (ulong)_blockcache_pages - (ulong)txncache;
337 0 : txncache->probed_entries_off = (ulong)_probed_entries - (ulong)txncache;
338 :
339 0 : tc->lock->value = 0;
340 0 : tc->root_slots_cnt = 0UL;
341 :
342 0 : tc->root_slots_max = max_rooted_slots;
343 0 : tc->live_slots_max = max_live_slots;
344 0 : tc->txnpages_per_blockhash_max = max_txnpages_per_blockhash;
345 0 : tc->txnpages_max = max_txnpages;
346 :
347 0 : ulong * root_slots = (ulong *)_root_slots;
348 0 : memset( root_slots, 0xFF, max_rooted_slots*sizeof(ulong) );
349 :
350 0 : fd_txncache_private_blockcache_t * blockcache = (fd_txncache_private_blockcache_t *)_blockcache;
351 0 : fd_txncache_private_slotcache_t * slotcache = (fd_txncache_private_slotcache_t *)_slotcache;
352 0 : ulong * probed_entries = (ulong *)_probed_entries;
353 0 : for( ulong i=0UL; i<max_live_slots; i++ ) {
354 0 : blockcache[ i ].max_slot = FD_TXNCACHE_EMPTY_ENTRY;
355 0 : slotcache[ i ].slot = FD_TXNCACHE_EMPTY_ENTRY;
356 0 : probed_entries[ i ] = 0UL;
357 0 : }
358 :
359 0 : tc->txnpages_free_cnt = max_txnpages;
360 0 : uint * txnpages_free = _txnpages_free;
361 0 : for( uint i=0; i<max_txnpages; i++ ) txnpages_free[ i ] = i;
362 :
363 0 : FD_COMPILER_MFENCE();
364 0 : FD_VOLATILE( tc->magic ) = FD_TXNCACHE_MAGIC;
365 0 : FD_COMPILER_MFENCE();
366 :
367 0 : return (void *)tc;
368 0 : }
369 :
370 : fd_txncache_t *
371 0 : fd_txncache_join( void * shtc ) {
372 0 : if( FD_UNLIKELY( !shtc ) ) {
373 0 : FD_LOG_WARNING(( "NULL shtc" ));
374 0 : return NULL;
375 0 : }
376 :
377 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shtc, fd_txncache_align() ) ) ) {
378 0 : FD_LOG_WARNING(( "misaligned shtc" ));
379 0 : return NULL;
380 0 : }
381 :
382 0 : fd_txncache_t * tc = (fd_txncache_t *)shtc;
383 :
384 0 : if( FD_UNLIKELY( tc->magic!=FD_TXNCACHE_MAGIC ) ) {
385 0 : FD_LOG_WARNING(( "bad magic" ));
386 0 : return NULL;
387 0 : }
388 :
389 0 : uchar * base = (uchar *)tc;
390 0 : fd_txncache_private_blockcache_t * blockcache = (fd_txncache_private_blockcache_t *)( base + tc->blockcache_off );
391 :
392 0 : void * _blockcache_pages = base + tc->blockcache_pages_off;
393 0 : for( ulong i=0UL; i<tc->live_slots_max; i++ ) {
394 0 : blockcache[ i ].pages = (uint *)_blockcache_pages + i*tc->txnpages_per_blockhash_max;
395 0 : }
396 0 : return tc;
397 0 : }
398 :
399 : void *
400 0 : fd_txncache_leave( fd_txncache_t * tc ) {
401 0 : if( FD_UNLIKELY( !tc ) ) {
402 0 : FD_LOG_WARNING(( "NULL tc" ));
403 0 : return NULL; }
404 :
405 0 : return (void *)tc;
406 0 : }
407 :
408 : void *
409 0 : fd_txncache_delete( void * shtc ) {
410 0 : if( FD_UNLIKELY( !shtc ) ) {
411 0 : FD_LOG_WARNING(( "NULL shtc" ));
412 0 : return NULL;
413 0 : }
414 :
415 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shtc, fd_txncache_align() ) ) ) {
416 0 : FD_LOG_WARNING(( "misaligned shtc" ));
417 0 : return NULL;
418 0 : }
419 :
420 0 : fd_txncache_t * tc = (fd_txncache_t *)shtc;
421 :
422 0 : if( FD_UNLIKELY( tc->magic!=FD_TXNCACHE_MAGIC ) ) {
423 0 : FD_LOG_WARNING(( "bad magic" ));
424 0 : return NULL;
425 0 : }
426 :
427 0 : FD_COMPILER_MFENCE();
428 0 : FD_VOLATILE( tc->magic ) = 0UL;
429 0 : FD_COMPILER_MFENCE();
430 :
431 0 : return (void *)tc;
432 0 : }
433 :
434 : static void
435 : fd_txncache_remove_blockcache_idx( fd_txncache_t * tc,
436 0 : ulong idx ) {
437 0 : fd_txncache_private_blockcache_t * blockcache = fd_txncache_get_blockcache( tc );
438 0 : ulong * probed_entries = fd_txncache_get_probed_entries( tc );
439 0 : uint * txnpages_free = fd_txncache_get_txnpages_free( tc );
440 :
441 : /* Check if removing this element caused there to be no overflow for a hash index. */
442 0 : ulong hash_idx = FD_LOAD( ulong, blockcache[ idx ].blockhash )%tc->live_slots_max;
443 :
444 0 : ulong j = hash_idx;
445 0 : while( j != idx ) {
446 0 : probed_entries[ j ]--;
447 : /* If there is no overflow and the slot is a tombstone, mark it as free. */
448 0 : if( probed_entries[ j ] == 0 && blockcache[ j ].max_slot == FD_TXNCACHE_TOMBSTONE_ENTRY ) {
449 0 : blockcache[ j ].max_slot = FD_TXNCACHE_EMPTY_ENTRY;
450 0 : }
451 0 : j = (j+1)%tc->live_slots_max;
452 0 : }
453 :
454 : /* Remove from block cache. */
455 0 : blockcache[ idx ].max_slot = (probed_entries[ idx ] == 0 ? FD_TXNCACHE_EMPTY_ENTRY : FD_TXNCACHE_TOMBSTONE_ENTRY);
456 :
457 : /* Free pages. */
458 0 : memcpy( txnpages_free+tc->txnpages_free_cnt, blockcache[ idx ].pages, blockcache[ idx ].pages_cnt*sizeof(uint) );
459 0 : tc->txnpages_free_cnt += blockcache[ idx ].pages_cnt;
460 0 : }
461 :
462 : static void
463 : fd_txncache_remove_slotcache_idx( fd_txncache_t * tc,
464 0 : ulong idx ) {
465 0 : fd_txncache_private_slotcache_t * slotcache = fd_txncache_get_slotcache( tc );
466 0 : slotcache[ idx ].slot = FD_TXNCACHE_TOMBSTONE_ENTRY;
467 0 : }
468 :
469 : static void
470 : fd_txncache_purge_slot( fd_txncache_t * tc,
471 0 : ulong slot ) {
472 0 : ulong not_purged_cnt = 0;
473 0 : ulong purged_cnt = 0;
474 0 : ulong max_distance = 0;
475 0 : ulong sum_distance = 0;
476 0 : ulong empty_entry_cnt = 0;
477 0 : ulong tombstone_entry_cnt = 0;
478 0 : fd_txncache_private_blockcache_t * blockcache = fd_txncache_get_blockcache( tc );
479 0 : for( ulong i=0UL; i<tc->live_slots_max; i++ ) {
480 0 : if( FD_LIKELY( blockcache[ i ].max_slot==FD_TXNCACHE_EMPTY_ENTRY || blockcache[ i ].max_slot==FD_TXNCACHE_TOMBSTONE_ENTRY || (blockcache[ i ].max_slot)>slot ) ) {
481 0 : if( blockcache[ i ].max_slot==FD_TXNCACHE_EMPTY_ENTRY ) {
482 0 : empty_entry_cnt++;
483 0 : } else if ( blockcache[ i ].max_slot==FD_TXNCACHE_TOMBSTONE_ENTRY ) {
484 0 : tombstone_entry_cnt++;
485 0 : } else {
486 0 : not_purged_cnt++;
487 0 : ulong dist = blockcache[ i ].max_slot-slot;
488 0 : max_distance = fd_ulong_max( max_distance, dist );
489 0 : sum_distance += blockcache[ i ].max_slot-slot;
490 0 : }
491 0 : continue;
492 0 : }
493 0 : fd_txncache_remove_blockcache_idx( tc, i );
494 0 : purged_cnt++;
495 0 : }
496 0 : ulong avg_distance = (not_purged_cnt==0) ? ULONG_MAX : (sum_distance/not_purged_cnt);
497 0 : FD_LOG_INFO(( "not purging cnt - purge_slot: %lu, purged_cnt: %lu, not_purged_cnt: %lu, empty_entry_cnt: %lu, tombstone_entry_cnt: %lu, max_distance: %lu, avg_distance: %lu",
498 0 : slot, purged_cnt, not_purged_cnt, empty_entry_cnt, tombstone_entry_cnt, max_distance, avg_distance ));
499 :
500 : /* TODO: figure out how to make slotcache purging work with the max_slot purge for blockcache.
501 : The blockcache and slotcache share txnpages and the aggressive purging from the blockcache
502 : can lead to corruption in the slotcache. Does it make sense to generate the delta map (as
503 : generated by Agave) from the blockcache when producing snapshots? TBD. */
504 0 : fd_txncache_private_slotcache_t * slotcache = fd_txncache_get_slotcache( tc );
505 0 : for( ulong i=0UL; i<tc->live_slots_max; i++ ) {
506 0 : if( FD_LIKELY( slotcache[ i ].slot==FD_TXNCACHE_EMPTY_ENTRY || slotcache[ i ].slot==FD_TXNCACHE_TOMBSTONE_ENTRY || slotcache[ i ].slot>slot ) ) continue;
507 0 : fd_txncache_remove_slotcache_idx( tc, i );
508 0 : }
509 0 : }
510 :
511 : /* fd_txncache_register_root_slot_private is a helper function that
512 : actually registers the root. This function assumes that the
513 : caller has already obtained a lock to the status cache. */
514 :
515 : static void
516 : fd_txncache_register_root_slot_private( fd_txncache_t * tc,
517 0 : ulong slot ) {
518 :
519 0 : ulong * root_slots = fd_txncache_get_root_slots( tc );
520 0 : ulong idx;
521 0 : for( idx=0UL; idx<tc->root_slots_cnt; idx++ ) {
522 0 : if( FD_UNLIKELY( root_slots[ idx ]==slot ) ) return;
523 0 : if( FD_UNLIKELY( root_slots[ idx ]>slot ) ) break;
524 0 : }
525 :
526 0 : if( FD_UNLIKELY( tc->root_slots_cnt>=tc->root_slots_max ) ) {
527 0 : if( FD_LIKELY( idx ) ) {
528 0 : fd_txncache_purge_slot( tc, root_slots[ 0 ] );
529 0 : memmove( root_slots, root_slots+1UL, (idx-1UL)*sizeof(ulong) );
530 0 : root_slots[ (idx-1UL) ] = slot;
531 0 : } else {
532 0 : fd_txncache_purge_slot( tc, slot );
533 0 : }
534 0 : } else {
535 0 : if( FD_UNLIKELY( idx<tc->root_slots_cnt ) ) {
536 0 : memmove( root_slots+idx+1UL, root_slots+idx, (tc->root_slots_cnt-idx)*sizeof(ulong) );
537 0 : }
538 0 : root_slots[ idx ] = slot;
539 0 : tc->root_slots_cnt++;
540 0 : }
541 0 : }
542 :
543 : void
544 : fd_txncache_register_root_slot( fd_txncache_t * tc,
545 0 : ulong slot ) {
546 :
547 0 : fd_rwlock_write( tc->lock );
548 :
549 0 : fd_txncache_register_root_slot_private( tc, slot );
550 :
551 0 : fd_rwlock_unwrite( tc->lock );
552 0 : }
553 :
554 : void
555 : fd_txncache_root_slots( fd_txncache_t * tc,
556 0 : ulong * out_slots ) {
557 0 : fd_rwlock_write( tc->lock );
558 0 : ulong * root_slots = fd_txncache_get_root_slots( tc );
559 0 : memcpy( out_slots, root_slots, tc->root_slots_max*sizeof(ulong) );
560 0 : fd_rwlock_unwrite( tc->lock );
561 0 : }
562 :
563 0 : #define FD_TXNCACHE_FIND_FOUND (0)
564 0 : #define FD_TXNCACHE_FIND_FOUNDEMPTY (1)
565 0 : #define FD_TXNCACHE_FIND_FULL (2)
566 :
567 : static int
568 : fd_txncache_find_blockhash( fd_txncache_t const * tc,
569 : uchar const blockhash[ static 32 ],
570 : uint is_insert,
571 0 : fd_txncache_private_blockcache_t ** out_blockcache ) {
572 0 : ulong hash = FD_LOAD( ulong, blockhash );
573 0 : fd_txncache_private_blockcache_t * tc_blockcache = fd_txncache_get_blockcache_const( tc );
574 0 : ulong * probed_entries = fd_txncache_get_probed_entries_const( tc );
575 0 : ulong first_tombstone = ULONG_MAX;
576 :
577 0 : for( ulong i=0UL; i<tc->live_slots_max; i++ ) {
578 0 : ulong blockcache_idx = (hash+i)%tc->live_slots_max;
579 0 : fd_txncache_private_blockcache_t * blockcache = &tc_blockcache[ blockcache_idx ];
580 :
581 0 : if( FD_UNLIKELY( blockcache->max_slot==FD_TXNCACHE_EMPTY_ENTRY ) ) {
582 0 : if( first_tombstone != ULONG_MAX ) {
583 0 : *out_blockcache = &tc_blockcache[ first_tombstone ];
584 0 : return FD_TXNCACHE_FIND_FOUNDEMPTY;
585 0 : }
586 0 : *out_blockcache = blockcache;
587 0 : return FD_TXNCACHE_FIND_FOUNDEMPTY;
588 0 : } else if ( blockcache->max_slot==FD_TXNCACHE_TOMBSTONE_ENTRY ) {
589 0 : if( is_insert && first_tombstone == ULONG_MAX ) {
590 0 : first_tombstone = blockcache_idx;
591 0 : }
592 0 : continue;
593 0 : }
594 :
595 0 : while( FD_UNLIKELY( blockcache->max_slot==FD_TXNCACHE_TEMP_ENTRY ) ) {
596 0 : FD_SPIN_PAUSE();
597 0 : }
598 0 : FD_COMPILER_MFENCE(); /* Prevent reordering of the blockhash read to before the atomic lock
599 : (highest_slot) has been fully released by the writer. */
600 0 : if( FD_LIKELY( !memcmp( blockcache->blockhash, blockhash, 32UL ) ) ) {
601 0 : *out_blockcache = blockcache;
602 0 : if( is_insert ) {
603 : /* Undo the probed entry changes since we found the blockhash. */
604 0 : for( ulong j=hash%tc->live_slots_max; j!=fd_ulong_min(first_tombstone, blockcache_idx); ) {
605 0 : probed_entries[ j ]--;
606 0 : j = (j+1)%tc->live_slots_max;
607 0 : }
608 0 : }
609 0 : return FD_TXNCACHE_FIND_FOUND;
610 0 : }
611 : /* If the entry we are passing is full and we haven't seen tombstones,
612 : there is an overflow. */
613 0 : if( is_insert && first_tombstone == ULONG_MAX ) {
614 0 : probed_entries[ blockcache_idx ]++;
615 0 : }
616 0 : }
617 :
618 0 : if( first_tombstone != ULONG_MAX ) {
619 0 : *out_blockcache = &tc_blockcache[ first_tombstone ];
620 0 : return FD_TXNCACHE_FIND_FOUNDEMPTY;
621 0 : }
622 0 : return FD_TXNCACHE_FIND_FULL;
623 0 : }
624 :
625 : static int
626 : fd_txncache_find_slot( fd_txncache_t const * tc,
627 : ulong slot,
628 : uint is_insert,
629 0 : fd_txncache_private_slotcache_t ** out_slotcache ) {
630 0 : fd_txncache_private_slotcache_t * tc_slotcache = fd_txncache_get_slotcache_const( tc );
631 0 : for( ulong i=0UL; i<tc->live_slots_max; i++ ) {
632 0 : ulong slotcache_idx = (slot+i)%tc->live_slots_max;
633 0 : fd_txncache_private_slotcache_t * slotcache = &tc_slotcache[ slotcache_idx ];
634 0 : if( FD_UNLIKELY( slotcache->slot==FD_TXNCACHE_EMPTY_ENTRY ) ) {
635 0 : *out_slotcache = slotcache;
636 0 : return FD_TXNCACHE_FIND_FOUNDEMPTY;
637 0 : } else if( FD_UNLIKELY( slotcache->slot==FD_TXNCACHE_TOMBSTONE_ENTRY ) ) {
638 0 : if( is_insert ) {
639 0 : *out_slotcache = slotcache;
640 0 : return FD_TXNCACHE_FIND_FOUNDEMPTY;
641 0 : }
642 0 : continue;
643 0 : }
644 0 : while( FD_UNLIKELY( slotcache->slot==FD_TXNCACHE_TEMP_ENTRY ) ) {
645 0 : FD_SPIN_PAUSE();
646 0 : }
647 0 : FD_COMPILER_MFENCE(); /* Prevent reordering of the slot read to before the atomic lock
648 : (slot) has been fully released by the writer. */
649 0 : if( FD_LIKELY( slotcache->slot==slot ) ) {
650 0 : *out_slotcache = slotcache;
651 0 : return FD_TXNCACHE_FIND_FOUND;
652 0 : }
653 0 : }
654 0 : return FD_TXNCACHE_FIND_FULL;
655 0 : }
656 :
657 : static int
658 : fd_txncache_find_slot_blockhash( fd_txncache_private_slotcache_t * slotcache,
659 : uchar const blockhash[ static 32 ],
660 0 : fd_txncache_private_slotblockcache_t ** out_slotblockcache ) {
661 0 : ulong hash = FD_LOAD( ulong, blockhash );
662 0 : for( ulong i=0UL; i<300UL; i++ ) {
663 0 : ulong slotblockcache_idx = (hash+i)%300UL;
664 0 : fd_txncache_private_slotblockcache_t * slotblockcache = &slotcache->blockcache[ slotblockcache_idx ];
665 0 : if( FD_UNLIKELY( slotblockcache->txnhash_offset==ULONG_MAX ) ) {
666 0 : *out_slotblockcache = slotblockcache;
667 0 : return FD_TXNCACHE_FIND_FOUNDEMPTY;
668 0 : }
669 0 : while( FD_UNLIKELY( slotblockcache->txnhash_offset==ULONG_MAX-1UL ) ) {
670 0 : FD_SPIN_PAUSE();
671 0 : }
672 0 : FD_COMPILER_MFENCE(); /* Prevent reordering of the blockhash read to before the atomic lock
673 : (txnhash_offset) has been fully released by the writer. */
674 0 : if( FD_LIKELY( !memcmp( slotblockcache->blockhash, blockhash, 32UL ) ) ) {
675 0 : *out_slotblockcache = slotblockcache;
676 0 : return FD_TXNCACHE_FIND_FOUND;
677 0 : }
678 0 : }
679 0 : return FD_TXNCACHE_FIND_FULL;
680 0 : }
681 :
682 : static int
683 : fd_txncache_ensure_blockcache( fd_txncache_t * tc,
684 : uchar const blockhash[ static 32 ],
685 0 : fd_txncache_private_blockcache_t ** out_blockcache ) {
686 0 : for(;;) {
687 0 : int blockcache_find = fd_txncache_find_blockhash( tc, blockhash, 1, out_blockcache );
688 0 : if( FD_LIKELY( blockcache_find==FD_TXNCACHE_FIND_FOUND ) ) return 1;
689 0 : else if( FD_UNLIKELY( blockcache_find==FD_TXNCACHE_FIND_FULL ) ) return 0;
690 :
691 0 : if( FD_LIKELY( FD_ATOMIC_CAS( &(*out_blockcache)->max_slot, FD_TXNCACHE_EMPTY_ENTRY, FD_TXNCACHE_TEMP_ENTRY ) ||
692 0 : FD_ATOMIC_CAS( &(*out_blockcache)->max_slot, FD_TXNCACHE_TOMBSTONE_ENTRY, FD_TXNCACHE_TEMP_ENTRY ) ) ) {
693 0 : memcpy( (*out_blockcache)->blockhash, blockhash, 32UL );
694 0 : memset( (*out_blockcache)->heads, 0xFF, FD_TXNCACHE_BLOCKCACHE_MAP_CNT*sizeof(uint) );
695 0 : (*out_blockcache)->pages_cnt = 0;
696 0 : (*out_blockcache)->txnhash_offset = 0UL;
697 0 : memset( (*out_blockcache)->pages, 0xFF, tc->txnpages_per_blockhash_max*sizeof(uint) );
698 0 : FD_COMPILER_MFENCE();
699 : /* Set it to max unreserved value possible */
700 0 : (*out_blockcache)->max_slot = ULONG_MAX-3UL;
701 0 : return 1;
702 0 : }
703 0 : FD_SPIN_PAUSE();
704 0 : }
705 0 : }
706 :
707 : static int
708 : fd_txncache_ensure_slotcache( fd_txncache_t * tc,
709 : ulong slot,
710 0 : fd_txncache_private_slotcache_t ** out_slotcache ) {
711 0 : for(;;) {
712 0 : int slotcache_find = fd_txncache_find_slot( tc, slot, 1, out_slotcache );
713 0 : if( FD_LIKELY( slotcache_find==FD_TXNCACHE_FIND_FOUND ) ) return 1;
714 0 : else if( FD_UNLIKELY( slotcache_find==FD_TXNCACHE_FIND_FULL ) ) return 0;
715 :
716 0 : if( FD_LIKELY( FD_ATOMIC_CAS( &(*out_slotcache)->slot, FD_TXNCACHE_EMPTY_ENTRY, FD_TXNCACHE_TEMP_ENTRY ) ||
717 0 : FD_ATOMIC_CAS( &(*out_slotcache)->slot, FD_TXNCACHE_TOMBSTONE_ENTRY, FD_TXNCACHE_TEMP_ENTRY ) ) ) {
718 0 : for( ulong i=0UL; i<300UL; i++ ) {
719 0 : (*out_slotcache)->blockcache[ i ].txnhash_offset = ULONG_MAX;
720 0 : }
721 0 : FD_COMPILER_MFENCE();
722 0 : (*out_slotcache)->slot = slot;
723 0 : return 1;
724 0 : }
725 0 : FD_SPIN_PAUSE();
726 0 : }
727 0 : }
728 :
729 : static int
730 : fd_txncache_ensure_slotblockcache( fd_txncache_private_slotcache_t * slotcache,
731 : uchar const blockhash[ static 32 ],
732 0 : fd_txncache_private_slotblockcache_t ** out_slotblockcache ) {
733 0 : for(;;) {
734 0 : int slotblockcache_find = fd_txncache_find_slot_blockhash( slotcache, blockhash, out_slotblockcache );
735 0 : if( FD_LIKELY( slotblockcache_find==FD_TXNCACHE_FIND_FOUND ) ) return 1;
736 0 : else if( FD_UNLIKELY( slotblockcache_find==FD_TXNCACHE_FIND_FULL ) ) return 0;
737 :
738 0 : if( FD_LIKELY( FD_ATOMIC_CAS( &(*out_slotblockcache)->txnhash_offset, ULONG_MAX, ULONG_MAX-1UL ) ) ) {
739 0 : memcpy( (*out_slotblockcache)->blockhash, blockhash, 32UL );
740 0 : memset( (*out_slotblockcache)->heads, 0xFF, FD_TXNCACHE_SLOTCACHE_MAP_CNT*sizeof(uint) );
741 0 : FD_COMPILER_MFENCE();
742 0 : (*out_slotblockcache)->txnhash_offset = 0UL;
743 0 : return 1;
744 0 : }
745 0 : FD_SPIN_PAUSE();
746 0 : }
747 0 : }
748 :
749 : static fd_txncache_private_txnpage_t *
750 : fd_txncache_ensure_txnpage( fd_txncache_t * tc,
751 0 : fd_txncache_private_blockcache_t * blockcache ) {
752 0 : ushort page_cnt = blockcache->pages_cnt;
753 0 : if( FD_UNLIKELY( page_cnt>tc->txnpages_per_blockhash_max ) ) return NULL;
754 0 : fd_txncache_private_txnpage_t * txnpages = fd_txncache_get_txnpages( tc );
755 :
756 0 : if( FD_LIKELY( page_cnt ) ) {
757 0 : uint txnpage_idx = blockcache->pages[ page_cnt-1 ];
758 0 : ushort txnpage_free = txnpages[ txnpage_idx ].free;
759 0 : if( FD_LIKELY( txnpage_free ) ) {
760 0 : return &txnpages[ txnpage_idx ];
761 0 : }
762 0 : }
763 :
764 0 : if( FD_UNLIKELY( page_cnt==tc->txnpages_per_blockhash_max ) ) return NULL;
765 0 : if( FD_LIKELY( FD_ATOMIC_CAS( &blockcache->pages[ page_cnt ], UINT_MAX, UINT_MAX-1UL )==UINT_MAX ) ) {
766 0 : ulong txnpages_free_cnt = tc->txnpages_free_cnt;
767 0 : for(;;) {
768 0 : if( FD_UNLIKELY( !txnpages_free_cnt ) ) return NULL;
769 0 : ulong old_txnpages_free_cnt = FD_ATOMIC_CAS( &tc->txnpages_free_cnt, (uint)txnpages_free_cnt, (uint)(txnpages_free_cnt-1UL) );
770 0 : if( FD_LIKELY( old_txnpages_free_cnt==txnpages_free_cnt ) ) break;
771 0 : txnpages_free_cnt = old_txnpages_free_cnt;
772 0 : FD_SPIN_PAUSE();
773 0 : }
774 0 : uint * txnpages_free = fd_txncache_get_txnpages_free( tc );
775 :
776 0 : uint txnpage_idx = txnpages_free[ txnpages_free_cnt-1UL ];
777 0 : fd_txncache_private_txnpage_t * txnpage = &txnpages[ txnpage_idx ];
778 0 : txnpage->free = FD_TXNCACHE_TXNS_PER_PAGE;
779 0 : FD_COMPILER_MFENCE();
780 0 : blockcache->pages[ page_cnt ] = txnpage_idx;
781 0 : FD_COMPILER_MFENCE();
782 0 : blockcache->pages_cnt = (ushort)(page_cnt+1);
783 0 : return txnpage;
784 0 : } else {
785 0 : uint txnpage_idx = blockcache->pages[ page_cnt ];
786 0 : while( FD_UNLIKELY( txnpage_idx>=UINT_MAX-1UL ) ) {
787 0 : txnpage_idx = blockcache->pages[ page_cnt ];
788 0 : FD_SPIN_PAUSE();
789 0 : }
790 0 : return &txnpages[ txnpage_idx ];
791 0 : }
792 0 : }
793 :
794 : static int
795 : fd_txncache_insert_txn( fd_txncache_t * tc,
796 : fd_txncache_private_blockcache_t * blockcache,
797 : fd_txncache_private_slotblockcache_t * slotblockcache,
798 : fd_txncache_private_txnpage_t * txnpage,
799 0 : fd_txncache_insert_t const * txn ) {
800 0 : fd_txncache_private_txnpage_t * txnpages = fd_txncache_get_txnpages( tc );
801 0 : ulong txnpage_idx = (ulong)(txnpage - txnpages);
802 :
803 0 : for(;;) {
804 0 : ushort txnpage_free = txnpage->free;
805 0 : if( FD_UNLIKELY( !txnpage_free ) ) return 0;
806 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( &txnpage->free, txnpage_free, txnpage_free-1UL )!=txnpage_free ) ) continue;
807 :
808 0 : ulong txn_idx = FD_TXNCACHE_TXNS_PER_PAGE-txnpage_free;
809 0 : ulong txnhash_offset = blockcache->txnhash_offset;
810 0 : ulong txnhash = FD_LOAD( ulong, txn->txnhash+txnhash_offset );
811 0 : memcpy( txnpage->txns[ txn_idx ]->txnhash, txn->txnhash+txnhash_offset, 20UL );
812 0 : txnpage->txns[ txn_idx ]->result = *txn->result;
813 0 : txnpage->txns[ txn_idx ]->slot = txn->slot;
814 0 : FD_COMPILER_MFENCE();
815 :
816 0 : for(;;) {
817 0 : ulong txn_bucket = txnhash%FD_TXNCACHE_BLOCKCACHE_MAP_CNT;
818 0 : uint head = blockcache->heads[ txn_bucket ];
819 0 : txnpage->txns[ txn_idx ]->blockcache_next = head;
820 0 : FD_COMPILER_MFENCE();
821 0 : if( FD_LIKELY( FD_ATOMIC_CAS( &blockcache->heads[ txn_bucket ], head, (uint)(FD_TXNCACHE_TXNS_PER_PAGE*txnpage_idx+txn_idx) )==head ) ) break;
822 0 : FD_SPIN_PAUSE();
823 0 : }
824 :
825 0 : for(;;) {
826 0 : ulong txn_bucket = txnhash%FD_TXNCACHE_SLOTCACHE_MAP_CNT;
827 0 : uint head = slotblockcache->heads[ txn_bucket ];
828 0 : txnpage->txns[ txn_idx ]->slotblockcache_next = head;
829 0 : FD_COMPILER_MFENCE();
830 0 : if( FD_LIKELY( FD_ATOMIC_CAS( &slotblockcache->heads[ txn_bucket ], head, (uint)(FD_TXNCACHE_TXNS_PER_PAGE*txnpage_idx+txn_idx) )==head ) ) break;
831 0 : FD_SPIN_PAUSE();
832 0 : }
833 :
834 0 : for(;;) {
835 0 : ulong max_slot = blockcache->max_slot;
836 :
837 0 : if( FD_UNLIKELY( txn->slot<=max_slot && max_slot != ULONG_MAX-3UL) ) break;
838 0 : if( FD_LIKELY( FD_ATOMIC_CAS( &blockcache->max_slot, max_slot, txn->slot )==max_slot ) ) break;
839 0 : FD_SPIN_PAUSE();
840 0 : }
841 0 : return 1;
842 0 : }
843 0 : }
844 :
845 : int
846 : fd_txncache_insert_batch( fd_txncache_t * tc,
847 : fd_txncache_insert_t const * txns,
848 0 : ulong txns_cnt ) {
849 0 : fd_rwlock_read( tc->lock );
850 :
851 0 : for( ulong i=0UL; i<txns_cnt; i++ ) {
852 0 : fd_txncache_private_blockcache_t * blockcache;
853 0 : if( FD_UNLIKELY( !fd_txncache_ensure_blockcache( tc, txns[ i ].blockhash, &blockcache ) ) ) {
854 0 : FD_LOG_WARNING(( "no blockcache found" ));
855 0 : goto unlock_fail;
856 0 : }
857 :
858 0 : fd_txncache_private_slotcache_t * slotcache;
859 0 : if( FD_UNLIKELY( !fd_txncache_ensure_slotcache( tc, txns[ i ].slot, &slotcache ) ) ) {
860 0 : FD_LOG_WARNING(( "no slotcache found" ));
861 0 : goto unlock_fail;
862 0 : }
863 :
864 0 : fd_txncache_private_slotblockcache_t * slotblockcache;
865 0 : if( FD_UNLIKELY( !fd_txncache_ensure_slotblockcache( slotcache, txns[ i ].blockhash, &slotblockcache ) ) ) {
866 0 : FD_LOG_WARNING(( "no slotblockcache found" ));
867 0 : goto unlock_fail;
868 0 : }
869 :
870 0 : for(;;) {
871 0 : fd_txncache_private_txnpage_t * txnpage = fd_txncache_ensure_txnpage( tc, blockcache );
872 0 : if( FD_UNLIKELY( !txnpage ) ) {
873 0 : goto unlock_fail;
874 0 : FD_LOG_WARNING(( "no txnpage found" ));
875 0 : }
876 :
877 0 : int success = fd_txncache_insert_txn( tc, blockcache, slotblockcache, txnpage, &txns[ i ] );
878 0 : if( FD_LIKELY( success ) ) break;
879 0 : FD_SPIN_PAUSE();
880 0 : }
881 0 : }
882 :
883 0 : fd_rwlock_unread( tc->lock );
884 0 : return 1;
885 :
886 0 : unlock_fail:
887 0 : fd_rwlock_unread( tc->lock );
888 0 : return 0;
889 0 : }
890 :
891 : void
892 : fd_txncache_query_batch( fd_txncache_t * tc,
893 : fd_txncache_query_t const * queries,
894 : ulong queries_cnt,
895 : void * query_func_ctx,
896 : int ( * query_func )( ulong slot, void * ctx ),
897 0 : int * out_results ) {
898 0 : fd_rwlock_read( tc->lock );
899 0 : fd_txncache_private_txnpage_t * txnpages = fd_txncache_get_txnpages( tc );
900 0 : for( ulong i=0UL; i<queries_cnt; i++ ) {
901 0 : out_results[ i ] = 0;
902 :
903 0 : fd_txncache_query_t const * query = &queries[ i ];
904 0 : fd_txncache_private_blockcache_t * blockcache;
905 0 : int result = fd_txncache_find_blockhash( tc, query->blockhash, 0, &blockcache );
906 :
907 0 : if( FD_UNLIKELY( result!=FD_TXNCACHE_FIND_FOUND ) ) {
908 0 : continue;
909 0 : }
910 :
911 0 : ulong txnhash_offset = blockcache->txnhash_offset;
912 0 : ulong head_hash = FD_LOAD( ulong, query->txnhash+txnhash_offset ) % FD_TXNCACHE_BLOCKCACHE_MAP_CNT;
913 0 : for( uint head=blockcache->heads[ head_hash ]; head!=UINT_MAX; head=txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ]->blockcache_next ) {
914 0 : fd_txncache_private_txn_t * txn = txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ];
915 0 : if( FD_LIKELY( !memcmp( query->txnhash+txnhash_offset, txn->txnhash, 20UL ) ) ) {
916 0 : if( FD_LIKELY( !query_func || query_func( txn->slot, query_func_ctx ) ) ) {
917 0 : out_results[ i ] = 1;
918 0 : break;
919 0 : }
920 0 : }
921 0 : }
922 0 : }
923 :
924 0 : fd_rwlock_unread( tc->lock );
925 0 : }
926 :
927 : int
928 : fd_txncache_snapshot( fd_txncache_t * tc,
929 : void * ctx,
930 0 : int ( * write )( uchar const * data, ulong data_sz, void * ctx ) ) {
931 0 : if( !write ) {
932 0 : FD_LOG_WARNING(("No write method provided to snapshotter"));
933 0 : return 1;
934 0 : }
935 0 : fd_rwlock_read( tc->lock );
936 :
937 0 : fd_txncache_private_txnpage_t * txnpages = fd_txncache_get_txnpages( tc );
938 0 : ulong * root_slots = fd_txncache_get_root_slots( tc );
939 0 : for( ulong i=0UL; i<tc->root_slots_cnt; i++ ) {
940 0 : ulong slot = root_slots[ i ];
941 :
942 0 : fd_txncache_private_slotcache_t * slotcache;
943 0 : if( FD_UNLIKELY( FD_TXNCACHE_FIND_FOUND!=fd_txncache_find_slot( tc, slot, 0, &slotcache ) ) ) continue;
944 :
945 0 : for( ulong j=0UL; j<300UL; j++ ) {
946 0 : fd_txncache_private_slotblockcache_t * slotblockcache = &slotcache->blockcache[ j ];
947 0 : if( FD_UNLIKELY( slotblockcache->txnhash_offset>=ULONG_MAX-1UL ) ) continue;
948 :
949 0 : for( ulong k=0UL; k<FD_TXNCACHE_SLOTCACHE_MAP_CNT; k++ ) {
950 0 : uint head = slotblockcache->heads[ k ];
951 0 : for( ; head!=UINT_MAX; head=txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ]->slotblockcache_next ) {
952 0 : fd_txncache_private_txn_t * txn = txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ];
953 :
954 0 : fd_txncache_snapshot_entry_t entry = {
955 0 : .slot = slot,
956 0 : .txn_idx = slotblockcache->txnhash_offset,
957 0 : .result = txn->result
958 0 : };
959 0 : fd_memcpy( entry.blockhash, slotblockcache->blockhash, 32 );
960 0 : fd_memcpy( entry.txnhash, txn->txnhash, 20 );
961 0 : int err = write( (uchar*)&entry, sizeof(fd_txncache_snapshot_entry_t), ctx );
962 0 : if( err ) {
963 0 : fd_rwlock_unread( tc->lock );
964 0 : return err;
965 0 : }
966 0 : }
967 0 : }
968 0 : }
969 0 : }
970 :
971 0 : fd_rwlock_unread( tc->lock );
972 0 : return 0;
973 0 : }
974 :
975 : int
976 : fd_txncache_set_txnhash_offset( fd_txncache_t * tc,
977 : ulong slot,
978 : uchar blockhash[ 32 ],
979 0 : ulong txnhash_offset ) {
980 0 : fd_rwlock_read( tc->lock );
981 0 : fd_txncache_private_blockcache_t * blockcache;
982 0 : if( FD_UNLIKELY( !fd_txncache_ensure_blockcache( tc, blockhash, &blockcache ) ) ) goto unlock_fail;
983 :
984 0 : blockcache->txnhash_offset = txnhash_offset;
985 0 : fd_txncache_private_slotcache_t * slotcache;
986 0 : if( FD_UNLIKELY( !fd_txncache_ensure_slotcache( tc, slot, &slotcache ) ) ) goto unlock_fail;
987 :
988 0 : fd_txncache_private_slotblockcache_t * slotblockcache;
989 0 : if( FD_UNLIKELY( !fd_txncache_ensure_slotblockcache( slotcache, blockhash, &slotblockcache ) ) ) goto unlock_fail;
990 0 : slotblockcache->txnhash_offset = txnhash_offset;
991 :
992 0 : fd_rwlock_unread( tc->lock );
993 0 : return 0;
994 :
995 0 : unlock_fail:
996 0 : fd_rwlock_unread( tc->lock );
997 0 : return 1;
998 0 : }
999 :
1000 : int
1001 : fd_txncache_is_rooted_slot( fd_txncache_t * tc,
1002 0 : ulong slot ) {
1003 0 : fd_rwlock_read( tc->lock );
1004 :
1005 0 : ulong * root_slots = fd_txncache_get_root_slots( tc );
1006 0 : for( ulong idx=0UL; idx<tc->root_slots_cnt; idx++ ) {
1007 0 : if( FD_UNLIKELY( root_slots[ idx ]==slot ) ) {
1008 0 : fd_rwlock_unread( tc->lock );
1009 0 : return 1;
1010 0 : }
1011 0 : if( FD_UNLIKELY( root_slots[ idx ]>slot ) ) break;
1012 0 : }
1013 :
1014 0 : fd_rwlock_unread( tc->lock );
1015 0 : return 0;
1016 0 : }
1017 :
1018 : int
1019 : fd_txncache_get_entries( fd_txncache_t * tc,
1020 : fd_bank_slot_deltas_t * slot_deltas,
1021 0 : fd_spad_t * spad ) {
1022 :
1023 0 : fd_rwlock_read( tc->lock );
1024 :
1025 0 : slot_deltas->slot_deltas_len = tc->root_slots_cnt;
1026 0 : slot_deltas->slot_deltas = fd_spad_alloc( spad, FD_SLOT_DELTA_ALIGN, tc->root_slots_cnt * sizeof(fd_slot_delta_t) );
1027 :
1028 0 : fd_txncache_private_txnpage_t * txnpages = fd_txncache_get_txnpages( tc );
1029 0 : ulong * root_slots = fd_txncache_get_root_slots( tc );
1030 :
1031 0 : for( ulong i=0UL; i<tc->root_slots_cnt; i++ ) {
1032 0 : ulong slot = root_slots[ i ];
1033 :
1034 0 : slot_deltas->slot_deltas[ i ].slot = slot;
1035 0 : slot_deltas->slot_deltas[ i ].is_root = 1;
1036 0 : slot_deltas->slot_deltas[ i ].slot_delta_vec = fd_spad_alloc( spad, FD_STATUS_PAIR_ALIGN, FD_TXNCACHE_DEFAULT_MAX_ROOTED_SLOTS * sizeof(fd_status_pair_t) );
1037 0 : slot_deltas->slot_deltas[ i ].slot_delta_vec_len = 0UL;
1038 0 : ulong slot_delta_vec_len = 0UL;
1039 :
1040 0 : fd_txncache_private_slotcache_t * slotcache;
1041 0 : if( FD_UNLIKELY( FD_TXNCACHE_FIND_FOUND!=fd_txncache_find_slot( tc, slot, 0, &slotcache ) ) ) {
1042 0 : continue;
1043 0 : }
1044 :
1045 0 : for( ulong j=0UL; j<FD_TXNCACHE_DEFAULT_MAX_ROOTED_SLOTS; j++ ) {
1046 0 : fd_txncache_private_slotblockcache_t * slotblockcache = &slotcache->blockcache[ j ];
1047 0 : if( FD_UNLIKELY( slotblockcache->txnhash_offset>=ULONG_MAX-1UL ) ) {
1048 0 : continue;
1049 0 : }
1050 0 : fd_status_pair_t * status_pair = &slot_deltas->slot_deltas[ i ].slot_delta_vec[ slot_delta_vec_len++ ];
1051 0 : fd_memcpy( &status_pair->hash, slotblockcache->blockhash, sizeof(fd_hash_t) );
1052 0 : status_pair->value.txn_idx = slotblockcache->txnhash_offset;
1053 :
1054 : /* First count through the number of etnries you expect to encounter
1055 : and size out the data structure to store them. */
1056 :
1057 0 : ulong num_statuses = 0UL;
1058 0 : for( ulong k=0UL; k<FD_TXNCACHE_SLOTCACHE_MAP_CNT; k++ ) {
1059 0 : uint head = slotblockcache->heads[ k ];
1060 0 : for( ; head!=UINT_MAX; head=txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ]->slotblockcache_next ) {
1061 0 : num_statuses++;
1062 0 : }
1063 0 : }
1064 :
1065 0 : status_pair->value.statuses_len = num_statuses;
1066 0 : status_pair->value.statuses = fd_spad_alloc( spad, FD_CACHE_STATUS_ALIGN, num_statuses * sizeof(fd_cache_status_t) );
1067 0 : fd_memset( status_pair->value.statuses, 0, num_statuses * sizeof(fd_cache_status_t) );
1068 :
1069 : /* Copy over every entry for the given slot into the slot deltas. */
1070 :
1071 0 : num_statuses = 0UL;
1072 0 : for( ulong k=0UL; k<FD_TXNCACHE_SLOTCACHE_MAP_CNT; k++ ) {
1073 0 : uint head = slotblockcache->heads[ k ];
1074 0 : for( ; head!=UINT_MAX; head=txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ]->slotblockcache_next ) {
1075 0 : fd_txncache_private_txn_t * txn = txnpages[ head/FD_TXNCACHE_TXNS_PER_PAGE ].txns[ head%FD_TXNCACHE_TXNS_PER_PAGE ];
1076 0 : fd_memcpy( status_pair->value.statuses[ num_statuses ].key_slice, txn->txnhash, 20 );
1077 0 : status_pair->value.statuses[ num_statuses++ ].result.discriminant = txn->result;
1078 0 : }
1079 0 : }
1080 0 : }
1081 0 : slot_deltas->slot_deltas[ i ].slot_delta_vec_len = slot_delta_vec_len;
1082 0 : }
1083 :
1084 0 : fd_rwlock_unread( tc->lock );
1085 :
1086 0 : return 0;
1087 :
1088 0 : }
|