Line data Source code
1 : #include <stdio.h>
2 : #include <stdlib.h>
3 : #include <signal.h>
4 : #include <errno.h>
5 : #include <unistd.h>
6 : #include <netdb.h>
7 : #include <sys/socket.h>
8 : #include <netinet/in.h>
9 : #include <arpa/inet.h>
10 : #include "fd_geyser.h"
11 : #include "../../funk/fd_funk_filemap.h"
12 : #include "../../tango/mcache/fd_mcache.h"
13 : #include "../../flamenco/runtime/fd_acc_mgr.h"
14 : #include "../../util/wksp/fd_wksp_private.h"
15 : #include "../topo/fd_topo.h"
16 :
17 : #define SHAM_LINK_CONTEXT fd_geyser_t
18 : #define SHAM_LINK_STATE fd_replay_notif_msg_t
19 : #define SHAM_LINK_NAME replay_sham_link
20 : #include "sham_link.h"
21 :
22 : #define SHAM_LINK_CONTEXT fd_geyser_t
23 : #define SHAM_LINK_STATE fd_stake_ci_t
24 : #define SHAM_LINK_NAME stake_sham_link
25 : #include "sham_link.h"
26 :
27 : struct fd_geyser {
28 : fd_funk_t * funk;
29 : fd_blockstore_t * blockstore;
30 : int blockstore_fd;
31 : fd_stake_ci_t * stake_ci;
32 : replay_sham_link_t * rep_notify;
33 : stake_sham_link_t * stake_notify;
34 :
35 : void * fun_arg;
36 : fd_geyser_execute_fun execute_fun; /* Slot numbers, bank hash */
37 : fd_geyser_block_fun block_fun; /* Raw block data, additional metadata */
38 : fd_geyser_entry_fun entry_fun; /* Every entry/microblock */
39 : fd_geyser_txn_fun txn_fun; /* executed individual transaction */
40 : fd_geyser_block_done_fun block_done_fun; /* Called after block specific updates are done */
41 :
42 : fd_geyser_acct_fun acct_fun; /* Account written */
43 : };
44 :
45 : ulong
46 0 : fd_geyser_footprint( void ) {
47 0 : ulong l = FD_LAYOUT_INIT;
48 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
49 0 : l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
50 0 : l = FD_LAYOUT_APPEND( l, replay_sham_link_align(), replay_sham_link_footprint() );
51 0 : l = FD_LAYOUT_APPEND( l, stake_sham_link_align(), stake_sham_link_footprint() );
52 0 : return FD_LAYOUT_FINI( l, 1UL );
53 0 : }
54 :
55 : ulong
56 0 : fd_geyser_align( void ) {
57 0 : return alignof(fd_geyser_t);
58 0 : }
59 :
60 : void *
61 0 : fd_geyser_new( void * mem, fd_geyser_args_t * args ) {
62 0 : FD_SCRATCH_ALLOC_INIT( l, mem );
63 0 : fd_geyser_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
64 0 : void * stake_ci_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
65 0 : void * rep_notify_mem = FD_SCRATCH_ALLOC_APPEND( l, replay_sham_link_align(), replay_sham_link_footprint() );
66 0 : void * stake_notify_mem = FD_SCRATCH_ALLOC_APPEND( l, stake_sham_link_align(), stake_sham_link_footprint() );
67 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
68 0 : FD_TEST( scratch_top <= (ulong)mem + fd_geyser_footprint() );
69 :
70 0 : self->funk = fd_funk_open_file( args->funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
71 0 : if( self->funk == NULL ) {
72 0 : FD_LOG_ERR(( "failed to join a funky" ));
73 0 : }
74 :
75 0 : fd_wksp_t * wksp = fd_wksp_attach( args->blockstore_wksp );
76 0 : if( FD_UNLIKELY( !wksp ) )
77 0 : FD_LOG_ERR(( "unable to attach to \"%s\"\n\tprobably does not exist or bad permissions", args->blockstore_wksp ));
78 0 : fd_wksp_tag_query_info_t info;
79 0 : ulong tag = 1;
80 0 : if( fd_wksp_tag_query( wksp, &tag, 1, &info, 1 ) <= 0 ) {
81 0 : FD_LOG_ERR(( "workspace \"%s\" does not contain a blockstore", args->blockstore_wksp ));
82 0 : }
83 0 : void * shmem = fd_wksp_laddr_fast( wksp, info.gaddr_lo );
84 0 : self->blockstore = fd_blockstore_join( shmem );
85 0 : if( self->blockstore == NULL ) {
86 0 : FD_LOG_ERR(( "failed to join a blockstore" ));
87 0 : }
88 0 : self->blockstore_fd = args->blockstore_fd;
89 0 : FD_LOG_NOTICE(( "blockstore has slot root=%lu", self->blockstore->smr ));
90 0 : fd_wksp_mprotect( wksp, 1 );
91 :
92 0 : fd_pubkey_t identity_key[1]; /* Just the public key */
93 0 : memset( identity_key, 0xa5, sizeof(fd_pubkey_t) );
94 0 : self->stake_ci = fd_stake_ci_join( fd_stake_ci_new( stake_ci_mem, identity_key ) );
95 :
96 0 : self->rep_notify = replay_sham_link_new( rep_notify_mem, "fd1_replay_notif.wksp" );
97 0 : self->stake_notify = stake_sham_link_new( stake_notify_mem, "fd1_stake_out.wksp" );
98 :
99 0 : replay_sham_link_start( self->rep_notify );
100 0 : stake_sham_link_start( self->stake_notify );
101 :
102 0 : self->execute_fun = args->execute_fun;
103 0 : self->block_fun = args->block_fun;
104 0 : self->block_done_fun = args->block_done_fun;
105 0 : self->entry_fun = args->entry_fun;
106 0 : self->txn_fun = args->txn_fun;
107 0 : self->acct_fun = args->acct_fun;
108 0 : self->fun_arg = args->fun_arg;
109 :
110 0 : return mem;
111 0 : }
112 :
113 : fd_geyser_t *
114 0 : fd_geyser_join( void * mem ) {
115 0 : FD_SCRATCH_ALLOC_INIT( l, mem );
116 0 : return FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_geyser_t), sizeof(fd_geyser_t) );
117 0 : }
118 :
119 : void *
120 0 : fd_geyser_leave( fd_geyser_t * self ) {
121 0 : return self;
122 0 : }
123 :
/* fd_geyser_delete returns mem unchanged.  Nothing that fd_geyser_new
   opened, attached, joined or started is released here. */
void *
fd_geyser_delete( void * mem ) {
  void * released = mem;
  return released;
}
128 :
129 : static void
130 0 : fd_geyser_scan_txns( fd_geyser_t * ctx, ulong slotn, uchar * data, ulong sz ) {
131 0 : ulong blockoff = 0;
132 0 : while( blockoff < sz ) {
133 0 : if( blockoff + sizeof( ulong ) > sz ) FD_LOG_ERR(( "premature end of block" ));
134 0 : ulong mcount = FD_LOAD( ulong, (const uchar *)data + blockoff );
135 0 : blockoff += sizeof( ulong );
136 :
137 : /* Loop across microblocks */
138 0 : for( ulong mblk = 0; mblk < mcount; ++mblk ) {
139 0 : if( blockoff + sizeof( fd_microblock_hdr_t ) > sz ) {
140 0 : FD_LOG_WARNING(( "premature end of block" ));
141 0 : return;
142 0 : }
143 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( (const uchar *)data + blockoff );
144 0 : blockoff += sizeof( fd_microblock_hdr_t );
145 :
146 0 : if( ctx->entry_fun != NULL ) {
147 0 : (*ctx->entry_fun)( slotn, hdr, ctx->fun_arg );
148 0 : }
149 :
150 : /* Loop across transactions */
151 0 : for( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
152 0 : uchar txn_out[FD_TXN_MAX_SZ];
153 0 : uchar const * raw = (uchar const *)data + blockoff;
154 0 : ulong pay_sz = 0;
155 0 : ulong txn_sz = fd_txn_parse_core( (uchar const *)raw,
156 0 : fd_ulong_min( sz - blockoff, FD_TXN_MTU ),
157 0 : txn_out,
158 0 : NULL,
159 0 : &pay_sz );
160 0 : if( txn_sz == 0 || txn_sz > FD_TXN_MTU ) {
161 0 : FD_LOG_WARNING(( "failed to parse transaction %lu in microblock %lu in slot %lu. txn size: %lu",
162 0 : txn_idx,
163 0 : mblk,
164 0 : slotn,
165 0 : txn_sz ));
166 0 : return;
167 0 : }
168 0 : fd_txn_t const * txn = (fd_txn_t const *)txn_out;
169 :
170 0 : if( ctx->txn_fun != NULL ) {
171 0 : (*ctx->txn_fun)( slotn, txn, raw, txn_sz, ctx->fun_arg );
172 0 : }
173 :
174 0 : blockoff += pay_sz;
175 0 : }
176 0 : }
177 0 : }
178 0 : }
179 :
180 : void
181 0 : fd_geyser_replay_block( fd_geyser_t * ctx, ulong slotn ) {
182 0 : if( ctx->block_fun != NULL || ctx->entry_fun != NULL || ctx->txn_fun != NULL ) {
183 0 : FD_SCRATCH_SCOPE_BEGIN {
184 0 : fd_block_map_t meta[1];
185 0 : fd_block_rewards_t rewards[1];
186 0 : fd_hash_t parent_hash;
187 0 : uchar * blk_data;
188 0 : ulong blk_sz;
189 0 : if( fd_blockstore_block_data_query_volatile( ctx->blockstore, ctx->blockstore_fd, slotn, fd_scratch_virtual(), &parent_hash, meta, rewards, &blk_data, &blk_sz ) ) {
190 0 : FD_LOG_WARNING(( "failed to retrieve block for slot %lu", slotn ));
191 0 : return;
192 0 : }
193 0 : if( ctx->block_fun != NULL ) {
194 0 : (*ctx->block_fun)( slotn, meta, &parent_hash, blk_data, blk_sz, ctx->fun_arg );
195 0 : }
196 0 : if( ctx->entry_fun != NULL || ctx->txn_fun != NULL ) {
197 0 : fd_geyser_scan_txns( ctx, slotn, blk_data, blk_sz );
198 0 : }
199 0 : } FD_SCRATCH_SCOPE_END;
200 0 : }
201 0 : if( ctx->block_done_fun != NULL ) {
202 0 : ( *ctx->block_done_fun ) ( slotn, ctx->fun_arg );
203 0 : }
204 0 : }
205 :
206 : static void
207 0 : replay_sham_link_during_frag( fd_geyser_t * ctx, fd_replay_notif_msg_t * state, void const * msg, int sz ) {
208 0 : (void)ctx;
209 0 : FD_TEST( sz == (int)sizeof(fd_replay_notif_msg_t) );
210 0 : fd_memcpy(state, msg, sizeof(fd_replay_notif_msg_t));
211 0 : }
212 :
213 : static void
214 0 : replay_sham_link_after_frag(fd_geyser_t * ctx, fd_replay_notif_msg_t * msg) {
215 0 : (void)ctx;
216 0 : if( msg->type == FD_REPLAY_SLOT_TYPE ) {
217 0 : ulong slotn = msg->slot_exec.slot;
218 0 : if( ctx->execute_fun != NULL ) {
219 0 : ( *ctx->execute_fun ) ( msg, ctx->fun_arg );
220 0 : }
221 0 : fd_geyser_replay_block( ctx, slotn );
222 :
223 0 : } else if( msg->type == FD_REPLAY_ACCTS_TYPE ) {
224 0 : if( ctx->acct_fun != NULL ) {
225 0 : for( uint i = 0; i < msg->accts.accts_cnt; ++i ) {
226 0 : FD_SCRATCH_SCOPE_BEGIN {
227 0 : fd_pubkey_t addr;
228 0 : fd_memcpy(&addr, msg->accts.accts[i].id, 32U );
229 0 : fd_funk_rec_key_t key = fd_acc_funk_key( &addr );
230 0 : ulong datalen;
231 0 : void * data = fd_funk_rec_query_xid_safe( ctx->funk, &key, &msg->accts.funk_xid, fd_scratch_virtual(), &datalen );
232 0 : if( data ) {
233 0 : fd_account_meta_t const * meta = fd_type_pun_const( data );
234 0 : (*ctx->acct_fun)( msg->accts.funk_xid.ul[0], msg->accts.sig, &addr, meta, (uchar*)data + meta->hlen, meta->dlen, ctx->fun_arg );
235 0 : }
236 0 : } FD_SCRATCH_SCOPE_END;
237 0 : }
238 0 : }
239 0 : }
240 0 : }
241 :
242 : static void
243 0 : stake_sham_link_during_frag( fd_geyser_t * ctx, fd_stake_ci_t * state, void const * msg, int sz ) {
244 0 : (void)ctx; (void)sz;
245 0 : fd_stake_ci_stake_msg_init( state, msg );
246 0 : }
247 :
248 : static void
249 0 : stake_sham_link_after_frag(fd_geyser_t * ctx, fd_stake_ci_t * state) {
250 0 : (void)ctx;
251 0 : fd_stake_ci_stake_msg_fini( state );
252 0 : }
253 :
254 : void
255 0 : fd_geyser_poll( fd_geyser_t * self ) {
256 0 : fd_replay_notif_msg_t msg;
257 0 : replay_sham_link_poll( self->rep_notify, self, &msg );
258 :
259 0 : stake_sham_link_poll( self->stake_notify, self, self->stake_ci );
260 0 : }
|