Line data Source code
1 : /* fd_snapin_tile_funk.c contains APIs to load accounts into funk. */
2 :
3 : #include "fd_snapin_tile_private.h"
4 : #include "../../flamenco/accdb/fd_accdb_sync.h"
5 : #include "../../funk/fd_funk.h"
6 :
7 : int
8 : fd_snapin_process_account_header_funk( fd_snapin_tile_t * ctx,
9 0 : fd_ssparse_advance_result_t * result ) {
10 0 : fd_funk_t * funk = ctx->funk;
11 :
12 0 : fd_funk_rec_key_t id = FD_LOAD( fd_funk_rec_key_t, result->account_header.pubkey );
13 0 : fd_funk_rec_query_t query[1];
14 0 : fd_funk_rec_t * rec = fd_funk_rec_query_try( funk, ctx->xid, &id, query );
15 0 : fd_funk_rec_t const * existing_rec = rec;
16 :
17 0 : ctx->metrics.accounts_loaded++;
18 :
19 0 : int early_exit = 0;
20 0 : if( !ctx->full && !existing_rec ) {
21 0 : existing_rec = fd_funk_rec_query_try( funk, fd_funk_root( funk ), &id, query );
22 0 : }
23 0 : if( FD_UNLIKELY( existing_rec ) ) {
24 0 : fd_account_meta_t * meta = fd_funk_val( existing_rec, funk->wksp );
25 0 : if( FD_UNLIKELY( meta ) ) {
26 0 : if( FD_LIKELY( meta->slot>result->account_header.slot ) ) {
27 0 : ctx->acc_data = NULL;
28 0 : ctx->metrics.accounts_ignored++;
29 0 : fd_snapin_send_duplicate_account( ctx, result->account_header.lamports, NULL, result->account_header.data_len, (uchar)result->account_header.executable, result->account_header.owner, result->account_header.pubkey, 0, &early_exit );
30 0 : return early_exit;
31 0 : }
32 0 : ctx->metrics.accounts_replaced++;
33 0 : fd_snapin_send_duplicate_account( ctx, meta->lamports, (uchar const *)meta + sizeof(fd_account_meta_t), meta->dlen, meta->executable, meta->owner, result->account_header.pubkey, 1, &early_exit);
34 0 : }
35 0 : }
36 :
37 0 : int should_publish = 0;
38 0 : fd_funk_rec_prepare_t prepare[1];
39 0 : if( FD_LIKELY( !rec ) ) {
40 0 : should_publish = 1;
41 0 : rec = fd_funk_rec_prepare( funk, ctx->xid, &id, prepare, NULL );
42 0 : FD_TEST( rec );
43 0 : }
44 :
45 0 : fd_account_meta_t * meta = fd_funk_val( rec, funk->wksp );
46 : /* Allocate data space from heap, free old value (if any) */
47 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
48 0 : ulong const alloc_sz = sizeof(fd_account_meta_t)+result->account_header.data_len;
49 0 : ulong alloc_max;
50 0 : meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
51 0 : if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
52 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
53 0 : rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
54 0 : rec->val_max = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
55 0 : rec->val_sz = (uint)( alloc_sz & FD_FUNK_REC_VAL_MAX );
56 :
57 0 : meta->dlen = (uint)result->account_header.data_len;
58 0 : meta->slot = result->account_header.slot;
59 0 : memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
60 0 : meta->lamports = result->account_header.lamports;
61 0 : meta->executable = (uchar)result->account_header.executable;
62 :
63 0 : ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
64 :
65 0 : if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( funk, prepare );
66 0 : return early_exit;
67 0 : }
68 :
69 : int
70 : fd_snapin_process_account_data_funk( fd_snapin_tile_t * ctx,
71 0 : fd_ssparse_advance_result_t * result ) {
72 0 : int early_exit = 0;
73 0 : if( FD_UNLIKELY( !ctx->acc_data ) ) {
74 0 : fd_snapin_send_duplicate_account_data( ctx, result->account_data.data, result->account_data.data_sz, &early_exit );
75 0 : return early_exit;
76 0 : }
77 :
78 0 : fd_memcpy( ctx->acc_data, result->account_data.data, result->account_data.data_sz );
79 0 : ctx->acc_data += result->account_data.data_sz;
80 0 : return 0;
81 0 : }
82 :
83 : /* streamlined_insert inserts an unfragmented account.
84 : Only used while loading a full snapshot, not an incremental. */
85 :
86 : static void
87 : streamlined_insert( fd_snapin_tile_t * ctx,
88 : fd_funk_rec_t * rec,
89 : uchar const * frame,
90 0 : ulong slot ) {
91 0 : ulong data_len = fd_ulong_load_8_fast( frame+0x08UL );
92 0 : ulong lamports = fd_ulong_load_8_fast( frame+0x30UL );
93 0 : ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
94 0 : uchar owner[32]; memcpy( owner, frame+0x40UL, 32UL );
95 0 : _Bool executable = !!frame[ 0x60UL ];
96 :
97 0 : fd_funk_t * funk = ctx->funk;
98 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
99 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
100 0 : ulong const alloc_sz = sizeof(fd_account_meta_t)+data_len;
101 0 : ulong alloc_max;
102 0 : fd_account_meta_t * meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
103 0 : if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
104 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
105 0 : rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
106 0 : rec->val_max = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
107 0 : rec->val_sz = (uint)( alloc_sz & FD_FUNK_REC_VAL_MAX );
108 :
109 : /* Write metadata */
110 0 : meta->dlen = (uint)data_len;
111 0 : meta->slot = slot;
112 0 : memcpy( meta->owner, owner, sizeof(fd_pubkey_t) );
113 0 : meta->lamports = lamports;
114 0 : meta->executable = (uchar)executable;
115 :
116 : /* Write data */
117 0 : uchar * acc_data = (uchar *)( meta+1 );
118 0 : fd_memcpy( acc_data, frame+0x88UL, data_len );
119 0 : }
120 :
/* process_account_batch is a happy path performance optimization
   handling insertion of lots of small accounts.

   The main optimization implemented for funk is doing hash map memory
   accesses in parallel to amortize DRAM latency.

   fd_snapin_process_account_batch_funk inserts a batch of
   FD_SSPARSE_ACC_BATCH_MAX unfragmented accounts, which all share one
   slot, into funk.  If result is non-NULL, the batch is read from
   result->account_batch starting at index 0.  Otherwise it is read from
   buffered_batch starting at buffered_batch->remaining_idx, resuming a
   batch interrupted by an earlier early exit.

   Phases:
   1. hash each pubkey to its record map chain,
   2. load every chain's head index and length,
   3. walk all chains in lockstep looking for an existing record with
      the same key (only the key is compared, not the xid),
   4. create map entries for new keys, and resolve keys that already
      exist by slot, reporting the losing copy via
      fd_snapin_send_duplicate_account,
   5. write metadata and data via streamlined_insert.

   If fd_snapin_send_duplicate_account sets early_exit, phase 4 stops
   after that account.  Only accounts up to and including it are
   written in phase 5.  When reading from result, the unprocessed rest
   is copied into ctx->buffered_batch so a later call can resume.  If
   buffered_batch is non-NULL, its remaining_idx (or batch_cnt, once the
   batch is finished) is updated at the end.

   New records are tagged with ctx->xid.  Map chains are modified
   directly without taking chain locks.  NOTE(review): presumably the
   snapin tile is the only map user while loading; confirm.

   Returns early_exit. */

int
fd_snapin_process_account_batch_funk( fd_snapin_tile_t *            ctx,
                                      fd_ssparse_advance_result_t * result,
                                      buffered_account_batch_t *    buffered_batch ) {
  int early_exit = 0;
  /* Fresh batch starts at 0; a buffered batch resumes where it stopped */
  ulong start_idx = result ? 0 : buffered_batch->remaining_idx;
  fd_funk_t * funk = ctx->funk;
  fd_funk_rec_map_t * rec_map = funk->rec_map;
  fd_funk_rec_t * rec_tbl = funk->rec_pool->ele;
  fd_funk_rec_map_shmem_private_chain_t * chain_tbl = fd_funk_rec_map_shmem_private_chain( rec_map->map, 0UL );

  /* Derive map chains.  The mask assumes chain_cnt is a power of two. */
  uint chain_idx[ FD_SSPARSE_ACC_BATCH_MAX ];
  ulong chain_mask = rec_map->map->chain_cnt-1UL;
  for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
    uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
    uchar const * pubkey = frame+0x10UL;
    ulong memo = fd_funk_rec_key_hash1( pubkey, rec_map->map->seed );
    chain_idx[ i ] = (uint)( memo&chain_mask );
  }

  /* Parallel load hash chain heads (first node index and chain length) */
  uint map_node [ FD_SSPARSE_ACC_BATCH_MAX ];
  uint chain_cnt[ FD_SSPARSE_ACC_BATCH_MAX ];
  for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
    map_node [ i ] = chain_tbl[ chain_idx[ i ] ].head_cidx;
    chain_cnt[ i ] = (uint)chain_tbl[ chain_idx[ i ] ].ver_cnt;
  }
  /* The longest chain bounds the number of lockstep walk steps */
  uint chain_max = 0U;
  for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
    chain_max = fd_uint_max( chain_max, chain_cnt[ i ] );
  }

  /* Parallel walk hash chains.  Each step j advances every chain by one
     node.  An exhausted chain walks the dummy_rec sentinel instead, so
     the loop body is the same for every lane.  has_node gates any match
     against the dummy.  rec[i] ends up pointing at the matching record,
     or stays NULL if the key is not in the map. */
  static fd_funk_rec_t dummy_rec = { .map_next = UINT_MAX };
  fd_funk_rec_t * rec[ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
  for( ulong j=0UL; j<chain_max; j++ ) {
    for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
      uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
      uchar const * pubkey = frame+0x10UL;
      int const has_node = j<chain_cnt[ i ];
      fd_funk_rec_t * node = has_node ? rec_tbl+map_node[ i ] : &dummy_rec;
      int const key_match = 0==memcmp( node->pair.key, pubkey, sizeof(fd_funk_rec_key_t) );
      if( has_node && key_match ) rec[ i ] = node;
      map_node[ i ] = node->map_next;
    }
  }

  /* Create map entries.  Afterwards rec[i] is the record to write for
     account i, or NULL if the account is skipped.  insert_limit is
     lowered on early exit so that phase 5 only writes the accounts
     handled here. */
  ulong insert_limit = FD_SSPARSE_ACC_BATCH_MAX;
  for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
    ulong slot = result ? result->account_batch.slot : buffered_batch->slot;
    uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
    uchar const * pubkey = frame+0x10UL;
    ulong data_len = fd_ulong_load_8_fast( frame+0x08UL );
    ulong lamports = fd_ulong_load_8_fast( frame+0x30UL );
    ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
    _Bool executable = !!frame[ 0x60UL ];
    uchar const * data = frame+0x88UL;
    uchar owner[32]; memcpy( owner, frame+0x40UL, 32UL );
    fd_funk_rec_key_t key = FD_LOAD( fd_funk_rec_key_t, pubkey );

    ctx->metrics.accounts_loaded++;
    fd_funk_rec_t * r = rec[ i ];
    if( FD_LIKELY( !r ) ) { /* optimize for new account */
      r = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 0, NULL );
      FD_TEST( r );

      /* Initialize the record with an empty value; streamlined_insert
         fills in the value later. */
      fd_funk_txn_xid_copy( r->pair.xid, ctx->xid );
      fd_funk_rec_key_copy( r->pair.key, &key );
      r->map_next = 0U;
      r->ver_lock = fd_funk_rec_ver_lock( 1UL, 0UL );
      r->next_idx = UINT_MAX;
      r->prev_idx = UINT_MAX;
      r->val_sz = 0;
      r->val_max = 0;
      r->tag = 0;
      r->val_gaddr = 0UL;

      /* Insert to hash map. In theory, a key could appear twice in the
         same batch. All accounts in a batch are guaranteed to be from
         the same slot though, so this is fine, assuming that accdb code
         gracefully handles duplicate hash map entries.
         The record is pushed at the chain head and the chain count is
         bumped, keeping the version field unchanged. */
      fd_funk_rec_map_shmem_private_chain_t * chain = &chain_tbl[ chain_idx[ i ] ];
      ulong ver_cnt = chain->ver_cnt;
      uint head_cidx = chain->head_cidx;
      chain->ver_cnt = fd_funk_rec_map_private_vcnt( fd_funk_rec_map_private_vcnt_ver( ver_cnt ), fd_funk_rec_map_private_vcnt_cnt( ver_cnt )+1UL );
      chain->head_cidx = (uint)( r-rec_tbl );
      r->map_next = head_cidx;
      rec[ i ] = r;
    } else { /* existing record for key found */
      fd_account_meta_t const * existing = fd_funk_val( r, funk->wksp );
      if( FD_UNLIKELY( !existing ) ) FD_LOG_HEXDUMP_NOTICE(( "r", r, sizeof(fd_funk_rec_t) ));
      FD_TEST( existing );
      if( existing->slot > slot ) {
        rec[ i ] = NULL; /* skip record if existing value is newer */
        /* send the skipped account to the subtracting hash tile */
        ctx->metrics.accounts_ignored++;
        fd_snapin_send_duplicate_account( ctx, lamports, data, data_len, executable, owner, pubkey, 1, &early_exit );
      } else if( slot > existing->slot) {
        /* send the to-be-replaced account to the subtracting hash tile */
        ctx->metrics.accounts_replaced++;
        fd_snapin_send_duplicate_account( ctx, existing->lamports, (uchar const *)existing + sizeof(fd_account_meta_t), existing->dlen, existing->executable, existing->owner, pubkey, 1, &early_exit );
      } else { /* slot==existing->slot */
        /* Same key at the same slot found in the map is treated as fatal */
        FD_TEST( 0 );
      }

      /* early_exit can only be set by the duplicate sends above */
      if( FD_LIKELY( early_exit ) ) {
        /* buffer account batch if not already buffered (nothing to
           buffer if this was the last account of the batch) */
        if( FD_LIKELY( result && i<FD_SSPARSE_ACC_BATCH_MAX-1UL ) ) {
          FD_TEST( ctx->buffered_batch.batch_cnt==0UL );
          fd_memcpy( ctx->buffered_batch.batch, result->account_batch.batch, sizeof(uchar const*)*FD_SSPARSE_ACC_BATCH_MAX );
          ctx->buffered_batch.slot = result->account_batch.slot;
          ctx->buffered_batch.batch_cnt = result->account_batch.batch_cnt;
          ctx->buffered_batch.remaining_idx = i + 1UL;
        }

        insert_limit = i+1UL;
        break;
      }
    }
  }

  /* Actually insert accounts (write value buffers for non-skipped ones) */
  for( ulong i=start_idx; i<insert_limit; i++ ) {
    uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
    ulong slot = result ? result->account_batch.slot : buffered_batch->slot;
    if( rec[ i ] ) {
      streamlined_insert( ctx, rec[ i ], frame, slot );
    }
  }

  /* Update resume state: a fully processed batch is marked empty,
     otherwise remember where to continue. */
  if( FD_LIKELY( buffered_batch ) ) {
    if( FD_LIKELY( insert_limit==FD_SSPARSE_ACC_BATCH_MAX ) ) {
      buffered_batch->batch_cnt = 0UL;
      buffered_batch->remaining_idx = 0UL;
    } else {
      buffered_batch->remaining_idx = insert_limit;
    }
  }

  return early_exit;
}
270 :
271 : void
272 : fd_snapin_read_account_funk( fd_snapin_tile_t * ctx,
273 : void const * acct_addr,
274 : fd_account_meta_t * meta,
275 : uchar * data,
276 0 : ulong data_max ) {
277 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
278 :
279 : /* Start a speculative database query.
280 : It is assumed that no conflicting database accesses take place
281 : while the account is being read from funk. */
282 :
283 0 : fd_accdb_ro_t ro[1];
284 0 : if( FD_UNLIKELY( !fd_accdb_open_ro( ctx->accdb, ro, ctx->xid, acct_addr ) ) ) {
285 0 : return;
286 0 : }
287 :
288 0 : ulong data_sz = fd_accdb_ref_data_sz( ro );
289 0 : if( FD_UNLIKELY( data_sz>data_max ) ) {
290 0 : FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
291 0 : FD_LOG_CRIT(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
292 0 : acct_addr_b58, data_sz, data_max ));
293 0 : }
294 :
295 0 : memcpy( meta->owner, fd_accdb_ref_owner( ro ), sizeof(fd_pubkey_t) );
296 0 : meta->lamports = fd_accdb_ref_lamports( ro );
297 0 : meta->slot = fd_accdb_ref_slot( ro );
298 0 : meta->dlen = (uint)data_sz;
299 0 : meta->executable = !!fd_accdb_ref_exec_bit( ro );
300 0 : fd_memcpy( data, fd_accdb_ref_data_const( ro ), data_sz );
301 :
302 0 : fd_accdb_close_ro( ctx->accdb, ro );
303 0 : }
|