Line data Source code
1 : /* fd_snapin_tile_funk.c contains APIs to load accounts into funk. */
2 :
3 : #include "fd_snapin_tile_private.h"
4 : #include "../../flamenco/accdb/fd_accdb_sync.h"
5 :
6 : int
7 : fd_snapin_process_account_header_funk( fd_snapin_tile_t * ctx,
8 0 : fd_ssparse_advance_result_t * result ) {
9 0 : fd_funk_t * funk = ctx->accdb_admin->funk;
10 :
11 0 : fd_funk_rec_key_t id = FD_LOAD( fd_funk_rec_key_t, result->account_header.pubkey );
12 0 : fd_funk_rec_query_t query[1];
13 0 : fd_funk_rec_t * rec = fd_funk_rec_query_try( funk, ctx->xid, &id, query );
14 0 : fd_funk_rec_t const * existing_rec = rec;
15 :
16 0 : int early_exit = 0;
17 0 : if( !ctx->full && !existing_rec ) {
18 0 : fd_accdb_peek_t peek[1];
19 0 : if( fd_accdb_peek( ctx->accdb, peek, ctx->xid, result->account_header.pubkey ) ) {
20 0 : existing_rec = peek->acc->rec;
21 0 : }
22 0 : }
23 0 : if( FD_UNLIKELY( existing_rec ) ) {
24 0 : fd_account_meta_t * meta = fd_funk_val( existing_rec, funk->wksp );
25 0 : if( FD_UNLIKELY( meta ) ) {
26 0 : if( FD_LIKELY( meta->slot>result->account_header.slot ) ) {
27 0 : ctx->acc_data = NULL;
28 0 : fd_snapin_send_duplicate_account( ctx, result->account_header.lamports, NULL, result->account_header.data_len, (uchar)result->account_header.executable, result->account_header.owner, result->account_header.pubkey, 0, &early_exit );
29 0 : return early_exit;
30 0 : }
31 0 : fd_snapin_send_duplicate_account( ctx, meta->lamports, (uchar const *)meta + sizeof(fd_account_meta_t), meta->dlen, meta->executable, meta->owner, result->account_header.pubkey, 1, &early_exit);
32 0 : }
33 0 : }
34 :
35 0 : int should_publish = 0;
36 0 : fd_funk_rec_prepare_t prepare[1];
37 0 : if( FD_LIKELY( !rec ) ) {
38 0 : should_publish = 1;
39 0 : rec = fd_funk_rec_prepare( funk, ctx->xid, &id, prepare, NULL );
40 0 : FD_TEST( rec );
41 0 : }
42 :
43 0 : fd_account_meta_t * meta = fd_funk_val( rec, funk->wksp );
44 : /* Allocate data space from heap, free old value (if any) */
45 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
46 0 : ulong const alloc_sz = sizeof(fd_account_meta_t)+result->account_header.data_len;
47 0 : ulong alloc_max;
48 0 : meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
49 0 : if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
50 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
51 0 : rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
52 0 : rec->val_max = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
53 0 : rec->val_sz = (uint)( alloc_sz & FD_FUNK_REC_VAL_MAX );
54 :
55 0 : meta->dlen = (uint)result->account_header.data_len;
56 0 : meta->slot = result->account_header.slot;
57 0 : memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
58 0 : meta->lamports = result->account_header.lamports;
59 0 : meta->executable = (uchar)result->account_header.executable;
60 :
61 0 : ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
62 0 : ctx->metrics.accounts_inserted++;
63 :
64 0 : if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( funk, prepare );
65 0 : return early_exit;
66 0 : }
67 :
68 : int
69 : fd_snapin_process_account_data_funk( fd_snapin_tile_t * ctx,
70 0 : fd_ssparse_advance_result_t * result ) {
71 0 : int early_exit = 0;
72 0 : if( FD_UNLIKELY( !ctx->acc_data ) ) {
73 0 : fd_snapin_send_duplicate_account_data( ctx, result->account_data.data, result->account_data.data_sz, &early_exit );
74 0 : return early_exit;
75 0 : }
76 :
77 0 : fd_memcpy( ctx->acc_data, result->account_data.data, result->account_data.data_sz );
78 0 : ctx->acc_data += result->account_data.data_sz;
79 0 : return 0;
80 0 : }
81 :
82 : /* streamlined_insert inserts an unfragmented account.
83 : Only used while loading a full snapshot, not an incremental. */
84 :
85 : static void
86 : streamlined_insert( fd_snapin_tile_t * ctx,
87 : fd_funk_rec_t * rec,
88 : uchar const * frame,
89 0 : ulong slot ) {
90 0 : ulong data_len = fd_ulong_load_8_fast( frame+0x08UL );
91 0 : ulong lamports = fd_ulong_load_8_fast( frame+0x30UL );
92 0 : ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
93 0 : uchar owner[32]; memcpy( owner, frame+0x40UL, 32UL );
94 0 : _Bool executable = !!frame[ 0x60UL ];
95 :
96 0 : fd_funk_t * funk = ctx->accdb_admin->funk;
97 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
98 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
99 0 : ulong const alloc_sz = sizeof(fd_account_meta_t)+data_len;
100 0 : ulong alloc_max;
101 0 : fd_account_meta_t * meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
102 0 : if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
103 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
104 0 : rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
105 0 : rec->val_max = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
106 0 : rec->val_sz = (uint)( alloc_sz & FD_FUNK_REC_VAL_MAX );
107 :
108 : /* Write metadata */
109 0 : meta->dlen = (uint)data_len;
110 0 : meta->slot = slot;
111 0 : memcpy( meta->owner, owner, sizeof(fd_pubkey_t) );
112 0 : meta->lamports = lamports;
113 0 : meta->executable = (uchar)executable;
114 :
115 : /* Write data */
116 0 : uchar * acc_data = (uchar *)( meta+1 );
117 0 : fd_memcpy( acc_data, frame+0x88UL, data_len );
118 :
119 0 : ctx->metrics.accounts_inserted++;
120 0 : }
121 :
122 : /* process_account_batch is a happy path performance optimization
123 : handling insertion of lots of small accounts.
124 :
125 : The main optimization implemented for funk is doing hash map memory
126 : accesses in parallel to amortize DRAM latency. */
127 :
128 : int
129 : fd_snapin_process_account_batch_funk( fd_snapin_tile_t * ctx,
130 : fd_ssparse_advance_result_t * result,
131 0 : buffered_account_batch_t * buffered_batch ) {
132 0 : int early_exit = 0;
133 0 : ulong start_idx = result ? 0 : buffered_batch->remaining_idx;
134 0 : fd_funk_t * funk = ctx->accdb_admin->funk;
135 0 : fd_funk_rec_map_t * rec_map = funk->rec_map;
136 0 : fd_funk_rec_t * rec_tbl = funk->rec_pool->ele;
137 0 : fd_funk_rec_map_shmem_private_chain_t * chain_tbl = fd_funk_rec_map_shmem_private_chain( rec_map->map, 0UL );
138 :
139 : /* Derive map chains */
140 0 : uint chain_idx[ FD_SSPARSE_ACC_BATCH_MAX ];
141 0 : ulong chain_mask = rec_map->map->chain_cnt-1UL;
142 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
143 0 : uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
144 0 : uchar const * pubkey = frame+0x10UL;
145 0 : ulong memo = fd_funk_rec_key_hash1( pubkey, rec_map->map->seed );
146 0 : chain_idx[ i ] = (uint)( memo&chain_mask );
147 0 : }
148 :
149 : /* Parallel load hash chain heads */
150 0 : uint map_node [ FD_SSPARSE_ACC_BATCH_MAX ];
151 0 : uint chain_cnt[ FD_SSPARSE_ACC_BATCH_MAX ];
152 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
153 0 : map_node [ i ] = chain_tbl[ chain_idx[ i ] ].head_cidx;
154 0 : chain_cnt[ i ] = (uint)chain_tbl[ chain_idx[ i ] ].ver_cnt;
155 0 : }
156 0 : uint chain_max = 0U;
157 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
158 0 : chain_max = fd_uint_max( chain_max, chain_cnt[ i ] );
159 0 : }
160 :
161 : /* Parallel walk hash chains */
162 0 : static fd_funk_rec_t dummy_rec = { .map_next = UINT_MAX };
163 0 : fd_funk_rec_t * rec[ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
164 0 : for( ulong j=0UL; j<chain_max; j++ ) {
165 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
166 0 : uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
167 0 : uchar const * pubkey = frame+0x10UL;
168 0 : int const has_node = j<chain_cnt[ i ];
169 0 : fd_funk_rec_t * node = has_node ? rec_tbl+map_node[ i ] : &dummy_rec;
170 0 : int const key_match = 0==memcmp( node->pair.key, pubkey, sizeof(fd_funk_rec_key_t) );
171 0 : if( has_node && key_match ) rec[ i ] = node;
172 0 : map_node[ i ] = node->map_next;
173 0 : }
174 0 : }
175 :
176 : /* Create map entries */
177 0 : ulong insert_limit = FD_SSPARSE_ACC_BATCH_MAX;
178 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
179 0 : ulong slot = result ? result->account_batch.slot : buffered_batch->slot;
180 0 : uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
181 0 : uchar const * pubkey = frame+0x10UL;
182 0 : ulong data_len = fd_ulong_load_8_fast( frame+0x08UL );
183 0 : ulong lamports = fd_ulong_load_8_fast( frame+0x30UL );
184 0 : ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
185 0 : _Bool executable = !!frame[ 0x60UL ];
186 0 : uchar const * data = frame+0x88UL;
187 0 : uchar owner[32]; memcpy( owner, frame+0x40UL, 32UL );
188 0 : fd_funk_rec_key_t key = FD_LOAD( fd_funk_rec_key_t, pubkey );
189 :
190 0 : fd_funk_rec_t * r = rec[ i ];
191 0 : if( FD_LIKELY( !r ) ) { /* optimize for new account */
192 0 : r = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 0, NULL );
193 0 : FD_TEST( r );
194 0 : memset( r, 0, sizeof(fd_funk_rec_t) );
195 0 : fd_funk_txn_xid_copy( r->pair.xid, ctx->xid );
196 0 : fd_funk_rec_key_copy( r->pair.key, &key );
197 0 : r->prev_idx = UINT_MAX;
198 0 : r->next_idx = UINT_MAX;
199 :
200 : /* Insert to hash map. In theory, a key could appear twice in the
201 : same batch. All accounts in a batch are guaranteed to be from
202 : the same slot though, so this is fine, assuming that accdb code
203 : gracefully handles duplicate hash map entries. */
204 0 : fd_funk_rec_map_shmem_private_chain_t * chain = &chain_tbl[ chain_idx[ i ] ];
205 0 : ulong ver_cnt = chain->ver_cnt;
206 0 : uint head_cidx = chain->head_cidx;
207 0 : chain->ver_cnt = fd_funk_rec_map_private_vcnt( fd_funk_rec_map_private_vcnt_ver( ver_cnt ), fd_funk_rec_map_private_vcnt_cnt( ver_cnt )+1UL );
208 0 : chain->head_cidx = (uint)( r-rec_tbl );
209 0 : r->map_next = head_cidx;
210 0 : rec[ i ] = r;
211 0 : } else { /* existing record for key found */
212 0 : fd_account_meta_t const * existing = fd_funk_val( r, funk->wksp );
213 0 : if( FD_UNLIKELY( !existing ) ) FD_LOG_HEXDUMP_NOTICE(( "r", r, sizeof(fd_funk_rec_t) ));
214 0 : FD_TEST( existing );
215 0 : if( existing->slot > slot ) {
216 0 : rec[ i ] = NULL; /* skip record if existing value is newer */
217 : /* send the skipped account to the subtracting hash tile */
218 0 : fd_snapin_send_duplicate_account( ctx, lamports, data, data_len, executable, owner, pubkey, 1, &early_exit );
219 0 : } else if( slot > existing->slot) {
220 : /* send the to-be-replaced account to the subtracting hash tile */
221 0 : fd_snapin_send_duplicate_account( ctx, existing->lamports, (uchar const *)existing + sizeof(fd_account_meta_t), existing->dlen, existing->executable, existing->owner, pubkey, 1, &early_exit );
222 0 : } else { /* slot==existing->slot */
223 0 : FD_TEST( 0 );
224 0 : }
225 :
226 0 : if( FD_LIKELY( early_exit ) ) {
227 : /* buffer account batch if not already buffered */
228 0 : if( FD_LIKELY( result && i<FD_SSPARSE_ACC_BATCH_MAX-1UL ) ) {
229 0 : FD_TEST( ctx->buffered_batch.batch_cnt==0UL );
230 0 : fd_memcpy( ctx->buffered_batch.batch, result->account_batch.batch, sizeof(uchar const*)*FD_SSPARSE_ACC_BATCH_MAX );
231 0 : ctx->buffered_batch.slot = result->account_batch.slot;
232 0 : ctx->buffered_batch.batch_cnt = result->account_batch.batch_cnt;
233 0 : ctx->buffered_batch.remaining_idx = i + 1UL;
234 0 : }
235 :
236 0 : insert_limit = i+1UL;
237 0 : break;
238 0 : }
239 0 : }
240 0 : }
241 :
242 : /* Actually insert accounts */
243 0 : for( ulong i=start_idx; i<insert_limit; i++ ) {
244 0 : uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
245 0 : ulong slot = result ? result->account_batch.slot : buffered_batch->slot;
246 0 : if( rec[ i ] ) {
247 0 : streamlined_insert( ctx, rec[ i ], frame, slot );
248 0 : }
249 0 : }
250 :
251 0 : if( FD_LIKELY( buffered_batch ) ) {
252 0 : if( FD_LIKELY( insert_limit==FD_SSPARSE_ACC_BATCH_MAX ) ) {
253 0 : buffered_batch->batch_cnt = 0UL;
254 0 : buffered_batch->remaining_idx = 0UL;
255 0 : } else {
256 0 : buffered_batch->remaining_idx = insert_limit;
257 0 : }
258 0 : }
259 :
260 0 : return early_exit;
261 0 : }
262 :
263 : void
264 : fd_snapin_read_account_funk( fd_snapin_tile_t * ctx,
265 : void const * acct_addr,
266 : fd_account_meta_t * meta,
267 : uchar * data,
268 0 : ulong data_max ) {
269 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
270 :
271 : /* Start a speculative database query.
272 : It is assumed that no conflicting database accesses take place
273 : while the account is being read from funk. */
274 :
275 0 : fd_accdb_peek_t peek_[1];
276 0 : fd_accdb_peek_t * peek = fd_accdb_peek( ctx->accdb, peek_, ctx->xid, acct_addr );
277 0 : if( FD_UNLIKELY( !peek ) ) return;
278 :
279 0 : ulong data_sz = fd_accdb_ref_data_sz( peek->acc );
280 0 : if( FD_UNLIKELY( data_sz>data_max ) ) {
281 0 : FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
282 0 : FD_LOG_WARNING(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
283 0 : acct_addr_b58, (ulong)meta->dlen, data_max ));
284 0 : }
285 :
286 0 : memcpy( meta->owner, fd_accdb_ref_owner( peek->acc ), sizeof(fd_pubkey_t) );
287 0 : meta->lamports = fd_accdb_ref_lamports( peek->acc );
288 0 : meta->slot = fd_accdb_ref_slot( peek->acc );
289 0 : meta->dlen = (uint)data_sz;
290 0 : meta->executable = !!fd_accdb_ref_exec_bit( peek->acc );
291 0 : fd_memcpy( data, fd_accdb_ref_data_const( peek->acc ), data_sz );
292 :
293 0 : FD_CRIT( fd_accdb_peek_test( peek ), "invalid read" );
294 0 : fd_accdb_peek_drop( peek );
295 0 : }
|