Line data Source code
1 :
2 : #include "fd_geyser.h"
3 :
4 : #include "../../funk/fd_funk_filemap.h"
5 : #include "../../tango/mcache/fd_mcache.h"
6 : #include "../../flamenco/runtime/fd_acc_mgr.h"
7 : #include "../../util/wksp/fd_wksp_private.h"
8 : #include "../../disco/topo/fd_topo.h"
9 :
10 : #include <stdio.h>
11 : #include <stdlib.h>
12 : #include <signal.h>
13 : #include <errno.h>
14 : #include <unistd.h>
15 : #include <netdb.h>
16 : #include <sys/socket.h>
17 : #include <netinet/in.h>
18 : #include <arpa/inet.h>
19 :
20 : #define SHAM_LINK_CONTEXT fd_geyser_t
21 : #define SHAM_LINK_STATE fd_replay_notif_msg_t
22 : #define SHAM_LINK_NAME replay_sham_link
23 : #include "sham_link.h"
24 :
25 : #define SHAM_LINK_CONTEXT fd_geyser_t
26 : #define SHAM_LINK_STATE fd_stake_ci_t
27 : #define SHAM_LINK_NAME stake_sham_link
28 : #include "sham_link.h"
29 :
30 : struct fd_geyser {
31 : fd_funk_t * funk;
32 : fd_blockstore_t blockstore_ljoin;
33 : fd_blockstore_t * blockstore;
34 : int blockstore_fd;
35 : fd_stake_ci_t * stake_ci;
36 : replay_sham_link_t * rep_notify;
37 : stake_sham_link_t * stake_notify;
38 :
39 : void * fun_arg;
40 : fd_geyser_execute_fun execute_fun; /* Slot numbers, bank hash */
41 : fd_geyser_block_fun block_fun; /* Raw block data, additional metadata */
42 : fd_geyser_entry_fun entry_fun; /* Every entry/microblock */
43 : fd_geyser_txn_fun txn_fun; /* executed individual transaction */
44 : fd_geyser_block_done_fun block_done_fun; /* Called after block specific updates are done */
45 :
46 : fd_geyser_acct_fun acct_fun; /* Account written */
47 : };
48 :
49 : ulong
50 0 : fd_geyser_footprint( void ) {
51 0 : ulong l = FD_LAYOUT_INIT;
52 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
53 0 : l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
54 0 : l = FD_LAYOUT_APPEND( l, replay_sham_link_align(), replay_sham_link_footprint() );
55 0 : l = FD_LAYOUT_APPEND( l, stake_sham_link_align(), stake_sham_link_footprint() );
56 0 : return FD_LAYOUT_FINI( l, 1UL );
57 0 : }
58 :
59 : ulong
60 0 : fd_geyser_align( void ) {
61 0 : return alignof(fd_geyser_t);
62 0 : }
63 :
64 : void *
65 0 : fd_geyser_new( void * mem, fd_geyser_args_t * args ) {
66 0 : FD_SCRATCH_ALLOC_INIT( l, mem );
67 0 : fd_geyser_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
68 0 : void * stake_ci_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
69 0 : void * rep_notify_mem = FD_SCRATCH_ALLOC_APPEND( l, replay_sham_link_align(), replay_sham_link_footprint() );
70 0 : void * stake_notify_mem = FD_SCRATCH_ALLOC_APPEND( l, stake_sham_link_align(), stake_sham_link_footprint() );
71 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
72 0 : FD_TEST( scratch_top <= (ulong)mem + fd_geyser_footprint() );
73 :
74 0 : self->funk = fd_funk_open_file( args->funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
75 0 : if( self->funk == NULL ) {
76 0 : FD_LOG_ERR(( "failed to join a funky" ));
77 0 : }
78 :
79 0 : fd_wksp_t * wksp = fd_wksp_attach( args->blockstore_wksp );
80 0 : if( FD_UNLIKELY( !wksp ) )
81 0 : FD_LOG_ERR(( "unable to attach to \"%s\"\n\tprobably does not exist or bad permissions", args->blockstore_wksp ));
82 0 : fd_wksp_tag_query_info_t info;
83 0 : ulong tag = 1;
84 0 : if( fd_wksp_tag_query( wksp, &tag, 1, &info, 1 ) <= 0 ) {
85 0 : FD_LOG_ERR(( "workspace \"%s\" does not contain a blockstore", args->blockstore_wksp ));
86 0 : }
87 0 : void * shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
88 0 : self->blockstore = fd_blockstore_join( &self->blockstore_ljoin, shmem );
89 0 : if( self->blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) {
90 0 : FD_LOG_ERR(( "failed to join a blockstore" ));
91 0 : }
92 0 : self->blockstore_fd = args->blockstore_fd;
93 0 : fd_wksp_mprotect( wksp, 1 );
94 :
95 0 : fd_pubkey_t identity_key[1]; /* Just the public key */
96 0 : memset( identity_key, 0xa5, sizeof(fd_pubkey_t) );
97 0 : self->stake_ci = fd_stake_ci_join( fd_stake_ci_new( stake_ci_mem, identity_key ) );
98 :
99 0 : self->rep_notify = replay_sham_link_new( rep_notify_mem, "fd1_replay_notif.wksp" );
100 0 : self->stake_notify = stake_sham_link_new( stake_notify_mem, "fd1_stake_out.wksp" );
101 :
102 0 : replay_sham_link_start( self->rep_notify );
103 0 : stake_sham_link_start( self->stake_notify );
104 :
105 0 : self->execute_fun = args->execute_fun;
106 0 : self->block_fun = args->block_fun;
107 0 : self->block_done_fun = args->block_done_fun;
108 0 : self->entry_fun = args->entry_fun;
109 0 : self->txn_fun = args->txn_fun;
110 0 : self->acct_fun = args->acct_fun;
111 0 : self->fun_arg = args->fun_arg;
112 :
113 0 : return mem;
114 0 : }
115 :
116 : fd_geyser_t *
117 0 : fd_geyser_join( void * mem ) {
118 0 : FD_SCRATCH_ALLOC_INIT( l, mem );
119 0 : return FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
120 0 : }
121 :
122 : void *
123 0 : fd_geyser_leave( fd_geyser_t * self ) {
124 0 : return self;
125 0 : }
126 :
127 : void *
128 0 : fd_geyser_delete( void * mem ) {
129 0 : return mem;
130 0 : }
131 :
132 : static void
133 0 : fd_geyser_scan_txns( fd_geyser_t * ctx, ulong slotn, uchar * data, ulong sz ) {
134 0 : ulong blockoff = 0;
135 0 : while( blockoff < sz ) {
136 0 : if( blockoff + sizeof( ulong ) > sz ) FD_LOG_ERR(( "premature end of block" ));
137 0 : ulong mcount = FD_LOAD( ulong, (const uchar *)data + blockoff );
138 0 : blockoff += sizeof( ulong );
139 :
140 : /* Loop across microblocks */
141 0 : for( ulong mblk = 0; mblk < mcount; ++mblk ) {
142 0 : if( blockoff + sizeof( fd_microblock_hdr_t ) > sz ) {
143 0 : FD_LOG_WARNING(( "premature end of block" ));
144 0 : return;
145 0 : }
146 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( (const uchar *)data + blockoff );
147 0 : blockoff += sizeof( fd_microblock_hdr_t );
148 :
149 0 : if( ctx->entry_fun != NULL ) {
150 0 : (*ctx->entry_fun)( slotn, hdr, ctx->fun_arg );
151 0 : }
152 :
153 : /* Loop across transactions */
154 0 : for( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
155 0 : uchar txn_out[FD_TXN_MAX_SZ];
156 0 : uchar const * raw = (uchar const *)data + blockoff;
157 0 : ulong pay_sz = 0;
158 0 : ulong txn_sz = fd_txn_parse_core( (uchar const *)raw,
159 0 : fd_ulong_min( sz - blockoff, FD_TXN_MTU ),
160 0 : txn_out,
161 0 : NULL,
162 0 : &pay_sz );
163 0 : if( txn_sz == 0 || txn_sz > FD_TXN_MTU ) {
164 0 : FD_LOG_WARNING(( "failed to parse transaction %lu in microblock %lu in slot %lu. txn size: %lu",
165 0 : txn_idx,
166 0 : mblk,
167 0 : slotn,
168 0 : txn_sz ));
169 0 : return;
170 0 : }
171 0 : fd_txn_t const * txn = (fd_txn_t const *)txn_out;
172 :
173 0 : if( ctx->txn_fun != NULL ) {
174 0 : (*ctx->txn_fun)( slotn, txn, raw, txn_sz, ctx->fun_arg );
175 0 : }
176 :
177 0 : blockoff += pay_sz;
178 0 : }
179 0 : }
180 0 : }
181 0 : }
182 :
183 : void
184 0 : fd_geyser_replay_block( fd_geyser_t * ctx, ulong slotn ) {
185 0 : if( ctx->block_fun != NULL || ctx->entry_fun != NULL || ctx->txn_fun != NULL ) {
186 0 : FD_SCRATCH_SCOPE_BEGIN {
187 0 : fd_block_info_t meta[1];
188 0 : fd_block_rewards_t rewards[1];
189 0 : fd_hash_t parent_hash;
190 0 : uchar * blk_data;
191 0 : ulong blk_sz;
192 0 : if( fd_blockstore_block_data_query_volatile( ctx->blockstore, ctx->blockstore_fd, slotn, fd_scratch_virtual(), &parent_hash, meta, rewards, &blk_data, &blk_sz ) ) {
193 0 : FD_LOG_WARNING(( "failed to retrieve block for slot %lu", slotn ));
194 0 : return;
195 0 : }
196 0 : if( ctx->block_fun != NULL ) {
197 0 : (*ctx->block_fun)( slotn, meta, &parent_hash, blk_data, blk_sz, ctx->fun_arg );
198 0 : }
199 0 : if( ctx->entry_fun != NULL || ctx->txn_fun != NULL ) {
200 0 : fd_geyser_scan_txns( ctx, slotn, blk_data, blk_sz );
201 0 : }
202 0 : } FD_SCRATCH_SCOPE_END;
203 0 : }
204 0 : if( ctx->block_done_fun != NULL ) {
205 0 : ( *ctx->block_done_fun ) ( slotn, ctx->fun_arg );
206 0 : }
207 0 : }
208 :
209 : static void
210 0 : replay_sham_link_during_frag( fd_geyser_t * ctx, fd_replay_notif_msg_t * state, void const * msg, int sz ) {
211 0 : (void)ctx;
212 0 : FD_TEST( sz == (int)sizeof(fd_replay_notif_msg_t) );
213 0 : fd_memcpy(state, msg, sizeof(fd_replay_notif_msg_t));
214 0 : }
215 :
216 : static void
217 0 : replay_sham_link_after_frag(fd_geyser_t * ctx, fd_replay_notif_msg_t * msg) {
218 0 : if( msg->type == FD_REPLAY_SLOT_TYPE ) {
219 0 : ulong slotn = msg->slot_exec.slot;
220 0 : if( ctx->execute_fun != NULL ) {
221 0 : ( *ctx->execute_fun ) ( msg, ctx->fun_arg );
222 0 : }
223 0 : fd_geyser_replay_block( ctx, slotn );
224 :
225 0 : } else if( msg->type == FD_REPLAY_ACCTS_TYPE ) {
226 0 : if( ctx->acct_fun != NULL ) {
227 0 : for( uint i = 0; i < msg->accts.accts_cnt; ++i ) {
228 0 : FD_SCRATCH_SCOPE_BEGIN {
229 0 : fd_pubkey_t addr;
230 0 : fd_memcpy(&addr, msg->accts.accts[i].id, 32U );
231 0 : fd_funk_rec_key_t key = fd_acc_funk_key( &addr );
232 0 : ulong datalen;
233 0 : void * data = fd_funk_rec_query_xid_safe( ctx->funk, &key, &msg->accts.funk_xid, fd_scratch_virtual(), &datalen );
234 0 : if( data ) {
235 0 : fd_account_meta_t const * meta = fd_type_pun_const( data );
236 0 : (*ctx->acct_fun)( msg->accts.funk_xid.ul[0], msg->accts.sig, &addr, meta, (uchar*)data + meta->hlen, meta->dlen, ctx->fun_arg );
237 0 : }
238 0 : } FD_SCRATCH_SCOPE_END;
239 0 : }
240 0 : }
241 0 : }
242 0 : }
243 :
244 : static void
245 0 : stake_sham_link_during_frag( fd_geyser_t * ctx, fd_stake_ci_t * state, void const * msg, int sz ) {
246 0 : (void)ctx; (void)sz;
247 0 : fd_stake_ci_stake_msg_init( state, msg );
248 0 : }
249 :
250 : static void
251 0 : stake_sham_link_after_frag(fd_geyser_t * ctx, fd_stake_ci_t * state) {
252 0 : (void)ctx;
253 0 : fd_stake_ci_stake_msg_fini( state );
254 0 : }
255 :
256 : void
257 0 : fd_geyser_poll( fd_geyser_t * self ) {
258 0 : fd_replay_notif_msg_t msg;
259 0 : replay_sham_link_poll( self->rep_notify, self, &msg );
260 :
261 0 : stake_sham_link_poll( self->stake_notify, self, self->stake_ci );
262 0 : }
|