Line data Source code
1 : #include "fd_rpc_history.h"
2 : #include <unistd.h>
3 : #include <fcntl.h>
4 :
5 : #include "../../ballet/block/fd_microblock.h"
6 : #include "../../ballet/shred/fd_shred.h"
7 : #include "../../flamenco/runtime/fd_system_ids.h"
8 :
9 : struct fd_rpc_block {
10 : ulong slot;
11 : ulong next;
12 : fd_replay_notif_msg_t info;
13 : ulong file_offset;
14 : ulong file_size;
15 : };
16 :
17 : typedef struct fd_rpc_block fd_rpc_block_t;
18 :
19 : #define MAP_NAME fd_rpc_block_map
20 0 : #define MAP_T fd_rpc_block_t
21 : #define MAP_KEY_T ulong
22 0 : #define MAP_KEY slot
23 0 : #define MAP_KEY_EQ(k0,k1) ((*k0)==(*k1))
24 0 : #define MAP_KEY_HASH(key,seed) fd_ulong_hash(*key ^ seed)
25 : #include "../../util/tmpl/fd_map_giant.c"
26 :
27 : struct fd_rpc_txn {
28 : fd_rpc_txn_key_t sig;
29 : ulong next;
30 : ulong slot;
31 : ulong file_offset;
32 : ulong file_size;
33 : };
34 : typedef struct fd_rpc_txn fd_rpc_txn_t;
35 :
36 : FD_FN_PURE int
37 0 : fd_rpc_txn_key_equal( fd_rpc_txn_key_t const * k0, fd_rpc_txn_key_t const * k1 ) {
38 0 : for( ulong i = 0; i < FD_ED25519_SIG_SZ / sizeof( ulong ); ++i )
39 0 : if( k0->v[i] != k1->v[i] ) return 0;
40 0 : return 1;
41 0 : }
42 :
43 : FD_FN_PURE ulong
44 0 : fd_rpc_txn_key_hash( fd_rpc_txn_key_t const * k, ulong seed ) {
45 0 : ulong h = seed;
46 0 : for( ulong i = 0; i < FD_ED25519_SIG_SZ / sizeof( ulong ); ++i )
47 0 : h ^= k->v[i];
48 0 : return h;
49 0 : }
50 :
51 : #define MAP_NAME fd_rpc_txn_map
52 0 : #define MAP_T fd_rpc_txn_t
53 0 : #define MAP_KEY sig
54 : #define MAP_KEY_T fd_rpc_txn_key_t
55 0 : #define MAP_KEY_EQ(k0,k1) fd_rpc_txn_key_equal(k0,k1)
56 0 : #define MAP_KEY_HASH(key,seed) fd_rpc_txn_key_hash(key,seed)
57 : #include "../../util/tmpl/fd_map_giant.c"
58 :
59 : struct fd_rpc_acct_map_elem {
60 : fd_pubkey_t key;
61 : ulong next;
62 : ulong slot;
63 : ulong age;
64 : fd_rpc_txn_key_t sig; /* Transaction signature */
65 : };
66 : typedef struct fd_rpc_acct_map_elem fd_rpc_acct_map_elem_t;
67 : #define MAP_NAME fd_rpc_acct_map
68 : #define MAP_KEY_T fd_pubkey_t
69 0 : #define MAP_ELE_T fd_rpc_acct_map_elem_t
70 0 : #define MAP_KEY_HASH(key,seed) fd_hash( seed, key, sizeof(fd_pubkey_t) )
71 0 : #define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
72 : #define MAP_MULTI 1
73 : #include "../../util/tmpl/fd_map_chain.c"
74 : #define POOL_NAME fd_rpc_acct_map_pool
75 0 : #define POOL_T fd_rpc_acct_map_elem_t
76 : #include "../../util/tmpl/fd_pool.c"
77 :
78 : struct fd_rpc_history {
79 : fd_spad_t * spad;
80 : fd_rpc_block_t * block_map;
81 : ulong block_cnt;
82 : fd_rpc_txn_t * txn_map;
83 : fd_rpc_acct_map_t * acct_map;
84 : fd_rpc_acct_map_elem_t * acct_pool;
85 : ulong first_slot;
86 : ulong latest_slot;
87 : int file_fd;
88 : ulong file_totsz;
89 : };
90 :
91 : fd_rpc_history_t *
92 0 : fd_rpc_history_create(fd_rpcserver_args_t * args) {
93 0 : fd_spad_t * spad = args->spad;
94 0 : fd_rpc_history_t * hist = (fd_rpc_history_t *)fd_spad_alloc( spad, alignof(fd_rpc_history_t), sizeof(fd_rpc_history_t) );
95 0 : memset(hist, 0, sizeof(fd_rpc_history_t));
96 0 : hist->spad = spad;
97 :
98 0 : hist->first_slot = ULONG_MAX;
99 0 : hist->latest_slot = 0;
100 :
101 0 : hist->block_map = fd_rpc_block_map_join( fd_rpc_block_map_new( fd_spad_alloc( spad, fd_rpc_block_map_align(), fd_rpc_block_map_footprint(args->block_index_max) ), args->block_index_max, 0 ) );
102 :
103 0 : hist->txn_map = fd_rpc_txn_map_join( fd_rpc_txn_map_new( fd_spad_alloc( spad, fd_rpc_txn_map_align(), fd_rpc_txn_map_footprint(args->txn_index_max) ), args->txn_index_max, 0 ) );
104 :
105 0 : void * mem = fd_spad_alloc( spad, fd_rpc_acct_map_align(), fd_rpc_acct_map_footprint( args->acct_index_max/2 ) );
106 0 : hist->acct_map = fd_rpc_acct_map_join( fd_rpc_acct_map_new( mem, args->acct_index_max/2, 0 ) );
107 0 : mem = fd_spad_alloc( spad, fd_rpc_acct_map_pool_align(), fd_rpc_acct_map_pool_footprint( args->acct_index_max ) );
108 0 : hist->acct_pool = fd_rpc_acct_map_pool_join( fd_rpc_acct_map_pool_new( mem, args->acct_index_max ) );
109 :
110 0 : hist->file_fd = open( args->history_file, O_CREAT | O_RDWR | O_TRUNC, 0644 );
111 0 : if( hist->file_fd == -1 ) FD_LOG_ERR(( "unable to open rpc history file: %s", args->history_file ));
112 0 : hist->file_totsz = 0;
113 :
114 0 : return hist;
115 0 : }
116 :
117 : void
118 0 : fd_rpc_history_save(fd_rpc_history_t * hist, fd_replay_notif_msg_t * info) {
119 0 : FD_SPAD_FRAME_BEGIN( hist->spad ) {
120 0 : if( fd_rpc_block_map_is_full( hist->block_map ) ) return; /* Out of space */
121 :
122 0 : ulong blk_max = info->slot_exec.shred_cnt * FD_SHRED_MAX_SZ;
123 0 : uchar * blk_data = fd_spad_alloc( hist->spad, 1, blk_max );
124 0 : ulong blk_sz = 0;
125 : // if( fd_blockstore_slice_query( blockstore, info->slot_exec.slot, 0, (uint)(info->slot_exec.shred_cnt-1), blk_max, blk_data, &blk_sz) ) {
126 : // FD_LOG_WARNING(( "unable to read slot %lu block", info->slot_exec.slot ));
127 : // return;
128 : // }
129 :
130 0 : FD_LOG_NOTICE(( "saving slot %lu block", info->slot_exec.slot ));
131 :
132 0 : if( hist->first_slot == ULONG_MAX ) hist->first_slot = info->slot_exec.slot;
133 0 : hist->latest_slot = info->slot_exec.slot;
134 :
135 0 : fd_rpc_block_t * blk = fd_rpc_block_map_insert( hist->block_map, &info->slot_exec.slot );
136 0 : if( blk == NULL ) {
137 0 : FD_LOG_ERR(( "unable to save slot %lu block", info->slot_exec.slot ));
138 0 : return;
139 0 : }
140 0 : blk->info = *info;
141 :
142 0 : if( pwrite( hist->file_fd, blk_data, blk_sz, (long)hist->file_totsz ) != (ssize_t)blk_sz ) {
143 0 : FD_LOG_ERR(( "unable to write to rpc history file" ));
144 0 : }
145 0 : ulong base_offset = blk->file_offset = hist->file_totsz;
146 0 : blk->file_size = blk_sz;
147 0 : hist->file_totsz += blk_sz;
148 0 : hist->block_cnt ++;
149 :
150 0 : ulong blockoff = 0;
151 0 : while (blockoff < blk_sz) {
152 0 : if ( blockoff + sizeof(ulong) > blk_sz )
153 0 : return;
154 0 : ulong mcount = *(const ulong *)(blk_data + blockoff);
155 0 : blockoff += sizeof(ulong);
156 :
157 : /* Loop across microblocks */
158 0 : for (ulong mblk = 0; mblk < mcount; ++mblk) {
159 0 : if ( blockoff + sizeof(fd_microblock_hdr_t) > blk_sz )
160 0 : FD_LOG_ERR(("premature end of block"));
161 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)((const uchar *)blk_data + blockoff);
162 0 : blockoff += sizeof(fd_microblock_hdr_t);
163 :
164 : /* Loop across transactions */
165 0 : for ( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
166 0 : uchar txn_out[FD_TXN_MAX_SZ];
167 0 : ulong pay_sz = 0;
168 0 : const uchar* raw = (const uchar *)blk_data + blockoff;
169 0 : ulong txn_sz = fd_txn_parse_core(raw, fd_ulong_min(blk_sz - blockoff, FD_TXN_MTU), txn_out, NULL, &pay_sz);
170 0 : if ( txn_sz == 0 || txn_sz > FD_TXN_MAX_SZ ) {
171 0 : FD_LOG_ERR( ( "failed to parse transaction %lu in microblock %lu", txn_idx, mblk ) );
172 0 : }
173 0 : fd_txn_t * txn = (fd_txn_t *)txn_out;
174 :
175 : /* Loop across signatures */
176 0 : fd_ed25519_sig_t const * sigs = (fd_ed25519_sig_t const *)(raw + txn->signature_off);
177 0 : for ( uchar j = 0; j < txn->signature_cnt; j++ ) {
178 0 : if( fd_rpc_txn_map_is_full( hist->txn_map ) ) break; /* Out of space */
179 0 : fd_rpc_txn_key_t key;
180 0 : memcpy(&key, (const uchar*)&sigs[j], sizeof(key));
181 0 : fd_rpc_txn_t * ent = fd_rpc_txn_map_insert( hist->txn_map, &key );
182 0 : ent->file_offset = base_offset + blockoff;
183 0 : ent->file_size = pay_sz;
184 0 : ent->slot = info->slot_exec.slot;
185 0 : }
186 :
187 : /* Loop across accoounts */
188 0 : fd_rpc_txn_key_t sig0;
189 0 : memcpy(&sig0, (const uchar*)sigs, sizeof(sig0));
190 0 : fd_pubkey_t * accs = (fd_pubkey_t *)((uchar *)raw + txn->acct_addr_off);
191 0 : for( ulong i = 0UL; i < txn->acct_addr_cnt; i++ ) {
192 0 : if( !memcmp(&accs[i], fd_solana_vote_program_id.key, sizeof(fd_pubkey_t)) ) continue; /* Ignore votes */
193 0 : if( !fd_rpc_acct_map_pool_free( hist->acct_pool ) ) break;
194 0 : fd_rpc_acct_map_elem_t * ele = fd_rpc_acct_map_pool_ele_acquire( hist->acct_pool );
195 0 : ele->key = accs[i];
196 0 : ele->slot = info->slot_exec.slot;
197 0 : ele->sig = sig0;
198 0 : fd_rpc_acct_map_ele_insert( hist->acct_map, ele, hist->acct_pool );
199 0 : }
200 :
201 0 : blockoff += pay_sz;
202 0 : }
203 0 : }
204 0 : }
205 0 : if ( blockoff != blk_sz )
206 0 : FD_LOG_ERR(("garbage at end of block"));
207 :
208 0 : } FD_SPAD_FRAME_END;
209 0 : }
210 :
211 : ulong
212 0 : fd_rpc_history_first_slot(fd_rpc_history_t * hist) {
213 0 : return hist->first_slot;
214 0 : }
215 :
216 : ulong
217 0 : fd_rpc_history_latest_slot(fd_rpc_history_t * hist) {
218 0 : return hist->latest_slot;
219 0 : }
220 :
221 : fd_replay_notif_msg_t *
222 0 : fd_rpc_history_get_block_info(fd_rpc_history_t * hist, ulong slot) {
223 0 : fd_rpc_block_t * blk = fd_rpc_block_map_query( hist->block_map, &slot, NULL );
224 0 : if( !blk ) {
225 0 : return NULL;
226 0 : }
227 0 : return &blk->info;
228 0 : }
229 :
230 : fd_replay_notif_msg_t *
231 0 : fd_rpc_history_get_block_info_by_hash(fd_rpc_history_t * hist, fd_hash_t * h) {
232 0 : for( fd_rpc_block_map_iter_t i = fd_rpc_block_map_iter_init( hist->block_map );
233 0 : !fd_rpc_block_map_iter_done( hist->block_map, i );
234 0 : i = fd_rpc_block_map_iter_next( hist->block_map, i ) ) {
235 0 : fd_rpc_block_t * ele = fd_rpc_block_map_iter_ele( hist->block_map, i );
236 0 : if( fd_hash_eq( &ele->info.slot_exec.block_hash, h ) ) return &ele->info;
237 0 : }
238 0 : return NULL;
239 0 : }
240 :
241 : uchar *
242 0 : fd_rpc_history_get_block(fd_rpc_history_t * hist, ulong slot, ulong * blk_sz) {
243 0 : fd_rpc_block_t * blk = fd_rpc_block_map_query( hist->block_map, &slot, NULL );
244 0 : if( !blk ) {
245 0 : *blk_sz = ULONG_MAX;
246 0 : return NULL;
247 0 : }
248 0 : uchar * blk_data = fd_spad_alloc( hist->spad, 1, blk->file_size );
249 0 : if( pread( hist->file_fd, blk_data, blk->file_size, (long)blk->file_offset ) != (ssize_t)blk->file_size ) {
250 0 : FD_LOG_ERR(( "unable to read rpc history file" ));
251 0 : *blk_sz = ULONG_MAX;
252 0 : return NULL;
253 0 : }
254 0 : *blk_sz = blk->file_size;
255 0 : return blk_data;
256 0 : }
257 :
258 : uchar *
259 0 : fd_rpc_history_get_txn(fd_rpc_history_t * hist, fd_rpc_txn_key_t * sig, ulong * txn_sz, ulong * slot) {
260 0 : fd_rpc_txn_t * txn = fd_rpc_txn_map_query( hist->txn_map, sig, NULL );
261 0 : if( !txn ) {
262 0 : *txn_sz = ULONG_MAX;
263 0 : return NULL;
264 0 : }
265 0 : uchar * txn_data = fd_spad_alloc( hist->spad, 1, txn->file_size );
266 0 : if( pread( hist->file_fd, txn_data, txn->file_size, (long)txn->file_offset ) != (ssize_t)txn->file_size ) {
267 0 : FD_LOG_ERR(( "unable to read rpc history file" ));
268 0 : *txn_sz = ULONG_MAX;
269 0 : return NULL;
270 0 : }
271 0 : *txn_sz = txn->file_size;
272 0 : *slot = txn->slot;
273 0 : return txn_data;
274 0 : }
275 :
276 : const void *
277 0 : fd_rpc_history_first_txn_for_acct(fd_rpc_history_t * hist, fd_pubkey_t * acct, fd_rpc_txn_key_t * sig, ulong * slot) {
278 0 : fd_rpc_acct_map_elem_t const * ele = fd_rpc_acct_map_ele_query_const( hist->acct_map, acct, NULL, hist->acct_pool );
279 0 : if( ele == NULL ) return NULL;
280 0 : *sig = ele->sig;
281 0 : *slot = ele->slot;
282 0 : return ele;
283 0 : }
284 :
285 : const void *
286 0 : fd_rpc_history_next_txn_for_acct(fd_rpc_history_t * hist, fd_rpc_txn_key_t * sig, ulong * slot, const void * iter) {
287 0 : fd_rpc_acct_map_elem_t const * ele = (fd_rpc_acct_map_elem_t const *)iter;
288 0 : ele = fd_rpc_acct_map_ele_next_const( ele, NULL, hist->acct_pool );
289 0 : if( ele == NULL ) return NULL;
290 0 : *sig = ele->sig;
291 0 : *slot = ele->slot;
292 0 : return ele;
293 0 : }
|