Line data Source code
1 : #ifndef HEADER_fd_src_discof_repair_fd_fec_repair_h
2 : #define HEADER_fd_src_discof_repair_fd_fec_repair_h
3 :
4 : /* This provides APIs for orchestrating repair of FEC sets as they are
5 : received from the cluster.
6 :
7 : Concepts:
8 :
9 : - Blocks are aggregations of entries aka. microblocks which are
10 : groupings of txns and are constructed by the block producer (see
11 : fd_pack).
12 :
13 : - Entries are grouped into entry batches by the block producer (see
14 : fd_pack / fd_shredder).
15 :
16 : - Entry batches are divided into chunks known as shreds by the block
17 : producer (see fd_shredder).
18 :
19 : - Shreds are grouped into forward-error-correction sets (FEC sets) by
20 : the block producer (see fd_shredder).
21 :
22 : - Shreds are transmitted to the rest of the cluster via the Turbine
23 : protocol (see fd_shredder / fd_shred).
24 :
25 : - Once enough shreds within a FEC set are received to recover the
26 : entirety of the shred data encoded by that FEC set, the receiver
27 : can "complete" the FEC set (see fd_fec_resolver).
28 :
   - If shreds in the FEC set are missing such that it can't complete,
     the receiver can use the Repair protocol to request the missing
     shreds in the FEC set (see fd_fec_repair).

   - The current Repair protocol does not support requesting coding
     shreds.  As a result, some FEC sets might actually be complete
     (contain all of their data shreds) yet never be completed by the
     receiver.  Repair currently hacks around this by forcing
     completion, but the long-term solution is to add support for
     repairing coding shreds via Repair. */
38 :
39 : #include "../../disco/fd_disco_base.h"
40 : #include "../../ballet/reedsol/fd_reedsol.h"
41 : #include "../../tango/fseq/fd_fseq.h"
42 : #include "fd_fec_chainer.h"
43 :
44 :
/* FD_REPAIR_USE_HANDHOLDING: Set to non-zero at compile time to turn on
   additional runtime checks and logging.  If not defined before this
   header is included it defaults to 1 (enabled); define it to 0 to
   turn the extra checks off. */

#ifndef FD_REPAIR_USE_HANDHOLDING
#define FD_REPAIR_USE_HANDHOLDING 1
#endif
51 :
/* fd_fec_intra_idxs is a bit vec that tracks the received data shred
   idxs in the FEC set.  Bit i is relative to the FEC set's first data
   shred, ie. bit i set means data shred fec_set_idx+i was received.
   It has room for FD_REEDSOL_DATA_SHREDS_MAX bits. */

#define SET_NAME fd_fec_intra_idxs
#define SET_MAX  FD_REEDSOL_DATA_SHREDS_MAX
#include "../../util/tmpl/fd_set.c"
58 :
/* fd_fec_intra_t tracks an in-progress FEC set so that shreds missing
   "intra"-FEC set (ie. missing within a given FEC set) can be
   repaired.  This should roughly track the same set of in-progress FEC
   sets as fec_resolver.  Elements live in fd_fec_intra_pool and are
   indexed by fd_fec_intra_map. */

struct fd_fec_intra {
  ulong key;  /* map key. 32 msb = slot, 32 lsb = fec_set_idx */
  ulong prev; /* NOTE(review): intra elements are not placed in a dlist in this header; possibly vestigial */
  ulong next; /* internal use by map_chain */

  ulong  slot;        /* slot of the block this fec set is part of */
  ushort parent_off;  /* parent slot's offset from slot.  NOTE(review): not initialized by fd_fec_repair_insert */
  uint   fec_set_idx; /* index of the first data shred */
  long   ts;          /* fd_log_wallclock() when this entry was created, ie. upon receiving the first shred */
  ulong  recv_cnt;    /* count of shreds received so far data + coding (incremented on every insert, duplicates included) */
  uint   data_cnt;    /* count of total data shreds in the FEC set.  0 until a coding shred (which carries the count) is inserted */

  fd_ed25519_sig_t sig; /* Ed25519 sig identifier of the FEC.  Zeroed on creation by fd_fec_repair_insert */

  uint buffered_idx;  /* wmk of data shreds received contiguously, inclusive, relative to fec_set_idx.  Starts at UINT_MAX
                         (nothing buffered yet) so that buffered_idx+1 wraps to 0 */
  uint completes_idx; /* UINT_MAX unless this FEC contains a shred with a batch_complete or slot_complete flag (then
                         shred_idx - fec_set_idx), or a coding shred has been received (then data_cnt - 1) */

  uint  shred_tile_idx; /* index of the shred tile this FEC set is part of */
  ulong deque_ele_idx;  /* pool index of this FEC's fd_fec_order_t in order_pool_lst[shred_tile_idx] (linked in that tile's dlist) */

  fd_fec_intra_idxs_t idxs[fd_fec_intra_idxs_word_cnt]; /* bit vec of rx'd data shred idxs, relative to fec_set_idx */
};
typedef struct fd_fec_intra fd_fec_intra_t;
87 :
/* fd_fec_intra_pool is the element store for every fd_fec_intra_t, and
   fd_fec_intra_map indexes those elements by key (slot << 32 |
   fec_set_idx).  Both are shared by all shred tiles. */

#define POOL_NAME fd_fec_intra_pool
#define POOL_T    fd_fec_intra_t
#include "../../util/tmpl/fd_pool.c"

#define MAP_NAME  fd_fec_intra_map
#define MAP_ELE_T fd_fec_intra_t
#include "../../util/tmpl/fd_map_chain.c"
95 :
/* fd_fec_order_t is an element of a per-shred-tile FIFO that records
   the order in which FEC sets were first inserted on that tile (see
   fd_fec_repair_insert).  When a tile's order pool is full, the head of
   its dlist (the oldest FEC set) is evicted. */

struct fd_fec_order {
  ulong key;  /* 32 msb slot, 32 lsb fec_set_idx */
  ulong prev; /* internal use by dlist */
  ulong next; /* internal use by dlist */
};
typedef struct fd_fec_order fd_fec_order_t;

#define POOL_NAME fd_fec_order_pool
#define POOL_T    fd_fec_order_t
#include "../../util/tmpl/fd_pool.c"

#define DLIST_NAME  fd_fec_order_dlist
#define DLIST_ELE_T fd_fec_order_t
#include "../../util/tmpl/fd_dlist.c"
110 :
/* fd_fec_repair_t is the top-level structure that tracks the
   in-progress FEC sets that may need intra-FEC-set repair.  It holds a
   single pool + map of fd_fec_intra_t shared by all shred tiles, and
   one pool + dlist of fd_fec_order_t per shred tile recording FIFO
   insertion order.  The first FEC set inserted on a tile is therefore
   also the first one evicted when that tile's order pool fills up. */

struct __attribute__((aligned(128UL))) fd_fec_repair {
  /* These two parameters are tightly coupled with fd_fec_resolver,
     because fec_intra aims to exactly mirror the in-progress FEC sets
     across all the fec_resolver tiles.  fec_max should be the number of
     in-progress FEC sets each of the fec_resolvers can hold, which is
     max_pending_shred_sets + 1 (NOTE(review): the fd_fec_repair_new
     docs say + 2; confirm which is correct).  fec_repair sizes its
     intra pool and map to hold all FECs across all fec_resolvers, ie.
     fec_max * shred_tile_cnt, rounded up to the nearest power of 2.
     Each per-tile order pool (and thus each dlist) only holds fec_max
     elements.  There is one per fec_resolver, so that each resolver's
     queue order is maintained. */

  ulong fec_max;
  ulong shred_tile_cnt;

  fd_fec_intra_t *     intra_pool;
  fd_fec_intra_map_t * intra_map;

  fd_fec_order_t *       * order_pool_lst;  /* List[shred_tile_cnt] of pointers to the per-tile order pools */
  fd_fec_order_dlist_t * * order_dlist_lst; /* List[shred_tile_cnt] of dlists; each maintains insertion order of FEC sets in one FEC resolver */
};
typedef struct fd_fec_repair fd_fec_repair_t;
142 :
143 : FD_PROTOTYPES_BEGIN
144 :
145 : /* Constructors */
146 :
147 : /* fd_fec_repair_{align,footprint} return the required alignment and
148 : footprint of a memory region suitable for use as fec_repair with up to
149 : slice_max slices and block_max blocks. */
150 :
151 : FD_FN_CONST static inline ulong
152 0 : fd_fec_repair_align( void ) {
153 0 : return alignof(fd_fec_repair_t);
154 0 : }
155 :
156 : FD_FN_CONST static inline ulong
157 0 : fd_fec_repair_footprint( ulong fec_max, uint shred_tile_cnt ) {
158 0 : ulong total_fecs_pow2 = fd_ulong_pow2_up( fec_max * shred_tile_cnt );
159 :
160 0 : FD_TEST( fd_fec_intra_map_footprint( total_fecs_pow2 ) > 0 );
161 0 : FD_TEST( fd_fec_intra_pool_footprint( total_fecs_pow2 ) > 0 );
162 :
163 0 : ulong footprint =
164 0 : FD_LAYOUT_APPEND(
165 0 : FD_LAYOUT_APPEND(
166 0 : FD_LAYOUT_APPEND(
167 0 : FD_LAYOUT_APPEND(
168 0 : FD_LAYOUT_APPEND(
169 0 : FD_LAYOUT_INIT,
170 0 : alignof(fd_fec_repair_t), sizeof(fd_fec_repair_t) ),
171 0 : fd_fec_intra_pool_align (), fd_fec_intra_pool_footprint ( total_fecs_pow2 ) ),
172 0 : fd_fec_intra_map_align (), fd_fec_intra_map_footprint ( total_fecs_pow2 ) ),
173 0 : alignof(ulong), sizeof(fd_fec_order_t*) * shred_tile_cnt ),
174 0 : alignof(ulong), sizeof(fd_fec_order_dlist_t*) * shred_tile_cnt );
175 :
176 0 : for( ulong i = 0UL; i < shred_tile_cnt; i++ ) {
177 0 : footprint = FD_LAYOUT_APPEND( footprint, fd_fec_order_pool_align(), fd_fec_order_pool_footprint( fec_max ) );
178 0 : footprint = FD_LAYOUT_APPEND( footprint, fd_fec_order_dlist_align(), fd_fec_order_dlist_footprint() );
179 0 : }
180 :
181 0 : return FD_LAYOUT_FINI(footprint, fd_fec_repair_align());
182 0 : }
183 :
/* fd_fec_repair_new formats an unused memory region for use as a
   fec_repair.  shmem is a non-NULL pointer to this region in the local
   address space with the required footprint and alignment.

   fec_max is a very specific number.  It should be the maximum number
   of pending FECs each fec_resolver can hold (usually
   max_pending_shred_sets + 2; NOTE(review): the fd_fec_repair_t field
   comment says + 1, confirm which is correct).  The fec_intra map is
   then sized to hold shred_tile_cnt * fec_max.  That product will
   almost never be a power of 2, but the map chain cnt must be one.  So
   the intra pool is sized to the smallest power of 2 >=
   shred_tile_cnt * fec_max, and the number of live fec_intras is still
   limited to match the fec_resolvers via the per-tile fec ordering
   dlists.  seed presumably seeds the intra map's hash (confirm in the
   implementation). */

void *
fd_fec_repair_new( void * shmem, ulong fec_max, uint shred_tile_cnt, ulong seed );
198 :
/* fd_fec_repair_join joins the caller to the fec_repair.  fec_repair
   points to the first byte of the memory region backing the fec_repair
   in the caller's address space.

   Returns a pointer in the local address space to fec_repair on
   success. */

fd_fec_repair_t *
fd_fec_repair_join( void * fec_repair );
207 :
/* fd_fec_repair_leave leaves a current local join.  Returns a pointer
   to the underlying shared memory region on success and NULL on failure
   (logs details).  Reasons for failure include fec_repair being NULL. */

void *
fd_fec_repair_leave( fd_fec_repair_t const * fec_repair );
214 :
/* fd_fec_repair_delete unformats a memory region used as a fec_repair.
   Assumes nobody is joined to the region.  Returns a pointer to the
   underlying shared memory region or NULL if used obviously in error
   (e.g. fec_repair is obviously not a fec_repair ... logs details).
   The ownership of the memory region is transferred to the caller. */

void *
fd_fec_repair_delete( void * fec_repair );
224 :
225 : // /* fd_fec_repair_ele_query returns a pointer to the in-progress FEC keyed
226 : // by slot and fec_set_idx. Returns NULL if not found. */
227 :
228 : FD_FN_PURE static inline fd_fec_intra_t *
229 0 : fd_fec_repair_query( fd_fec_repair_t * fec_repair, ulong slot, uint fec_set_idx ) {
230 0 : ulong key = slot << 32 | (ulong)fec_set_idx;
231 0 : return fd_fec_intra_map_ele_query( fec_repair->intra_map, &key, NULL, fec_repair->intra_pool );
232 0 : }
233 :
234 : // /* fd_fec_repair_ele_insert inserts and returns a new in-progress FEC set
235 : // keyed by slot and fec_set_idx into the map. Returns NULL if the map
236 : // is full. */
237 :
238 : static inline void
239 0 : fd_fec_repair_remove( fd_fec_repair_t * fec_repair, ulong key ) {
240 0 : FD_LOG_NOTICE(( "remove %lu %u", key >> 32, (uint)key ));
241 0 : fd_fec_intra_t * fec = fd_fec_intra_map_ele_query( fec_repair->intra_map, &key, NULL, fec_repair->intra_pool );
242 0 : FD_TEST( fec );
243 0 :
244 0 : uint shred_tile_idx = fec->shred_tile_idx;
245 0 : ulong deque_ele_idx = fec->deque_ele_idx;
246 0 :
247 0 : fd_fec_intra_t * ele = fd_fec_intra_map_ele_remove( fec_repair->intra_map, &key, NULL, fec_repair->intra_pool ); /* cannot fail */
248 0 : fd_fec_intra_pool_ele_release( fec_repair->intra_pool, ele ); /* cannot fail, hopefully */
249 0 :
250 0 : /* Queue removal */
251 0 :
252 0 : fd_fec_order_dlist_t * fec_order_dlist = fec_repair->order_dlist_lst[shred_tile_idx];
253 0 : fd_fec_order_t * fec_order_pool = fec_repair->order_pool_lst[shred_tile_idx];
254 0 :
255 0 : fd_fec_order_dlist_idx_remove( fec_order_dlist, deque_ele_idx, fec_order_pool );
256 0 : fd_fec_order_pool_idx_release( fec_order_pool, deque_ele_idx );
257 0 : }
258 :
259 : static inline fd_fec_intra_t *
260 : fd_fec_repair_insert( fd_fec_repair_t * fec_repair,
261 : ulong slot,
262 : uint fec_set_idx,
263 : uint shred_idx_or_data_cnt,
264 : int completes,
265 : int is_code,
266 0 : uint shred_tile_idx ) {
267 0 : FD_TEST( shred_tile_idx < fec_repair->shred_tile_cnt );
268 0 : FD_LOG_NOTICE(( "insert %lu %u", slot, fec_set_idx ));
269 :
270 0 : ulong key = slot << 32 | (ulong)fec_set_idx;
271 0 : fd_fec_intra_t * fec = fd_fec_intra_map_ele_query( fec_repair->intra_map, &key, NULL, fec_repair->intra_pool );
272 0 : if( FD_UNLIKELY( !fec ) ) {
273 :
274 : /* Check if the fec_resolver of shred_tile_idx has evicted any
275 : incomplete FECs. Deque ordering insertion */
276 :
277 0 : fd_fec_order_dlist_t * fec_order_dlist = fec_repair->order_dlist_lst[shred_tile_idx];
278 0 : fd_fec_order_t * fec_order_pool = fec_repair->order_pool_lst[shred_tile_idx];
279 :
280 0 : if( !fd_fec_order_pool_free( fec_order_pool ) ) {
281 : /* fec_resolver must have evicted something from their free list. */
282 0 : fd_fec_order_t * pop_ele = fd_fec_order_dlist_ele_pop_head( fec_order_dlist, fec_order_pool );
283 0 : fd_fec_order_pool_ele_release( fec_order_pool, pop_ele );
284 :
285 0 : fd_fec_intra_t * ele = fd_fec_intra_map_ele_remove( fec_repair->intra_map, &pop_ele->key, NULL, fec_repair->intra_pool ); /* cannot fail */
286 0 : fd_fec_intra_pool_ele_release( fec_repair->intra_pool, ele ); /* cannot fail, hopefully */
287 : //FD_LOG_WARNING(( "shred_tile:%u overflowing, popping from queue, slot %lu, fec %u", shred_tile_idx, pop_ele->key >> 32, (uint)pop_ele->key ));
288 0 : }
289 0 : fd_fec_order_t * fec_order = fd_fec_order_pool_ele_acquire( fec_order_pool );
290 0 : fec_order->key = key;
291 0 : fd_fec_order_dlist_ele_push_tail( fec_order_dlist, fec_order, fec_order_pool ); /* cannot fail */
292 :
293 : /* Map insertion */
294 :
295 0 : if( FD_UNLIKELY( !fd_fec_intra_pool_free( fec_repair->intra_pool ) ) ) { /* we definitely should have a free element */
296 0 : FD_LOG_ERR(( "fec_repair pool full. Almost certainly signifies fec_repair corruption, as the size of fec_order_pool <= fec_intra_pool." ));
297 0 : }
298 :
299 0 : fec = fd_fec_intra_pool_ele_acquire( fec_repair->intra_pool );
300 : //FD_LOG_INFO(("Inserting shred into fec repair map, slot %lu, fec %u. %lu/%lu eles used. On tile %u, queue usage %lu/%lu", slot, fec_set_idx,
301 : //fd_fec_intra_pool_used( fec_repair->intra_pool ), fd_fec_intra_pool_max( fec_repair->intra_pool ),
302 : //shred_tile_idx, fd_fec_order_pool_used( fec_order_pool ), fd_fec_order_pool_max( fec_order_pool ) ));
303 :
304 0 : fec->key = key;
305 0 : fec->slot = slot;
306 0 : fec->fec_set_idx = fec_set_idx;
307 0 : fec->ts = fd_log_wallclock();
308 0 : fec->recv_cnt = 0;
309 0 : fec->data_cnt = 0;
310 0 : fec->completes_idx = UINT_MAX;
311 0 : fec->buffered_idx = UINT_MAX;
312 0 : fec->shred_tile_idx = shred_tile_idx;
313 0 : fec->deque_ele_idx = fd_fec_order_pool_idx( fec_order_pool, fec_order );
314 0 : memset( fec->sig, 0, sizeof(fd_ed25519_sig_t));
315 0 : fd_fec_intra_idxs_null( fec->idxs );
316 0 : fd_fec_intra_map_ele_insert( fec_repair->intra_map, fec, fec_repair->intra_pool ); /* cannot fail */
317 0 : }
318 :
319 0 : if( FD_UNLIKELY( is_code ) ) {
320 0 : fec->data_cnt = shred_idx_or_data_cnt;
321 0 : fec->completes_idx = fec->data_cnt - 1;
322 0 : } else {
323 0 : uint shred_idx = shred_idx_or_data_cnt;
324 0 : fd_fec_intra_idxs_insert( fec->idxs, shred_idx - fec_set_idx );
325 0 : if( FD_UNLIKELY( completes ) ) fec->completes_idx = shred_idx - fec_set_idx;
326 0 : }
327 :
328 :
329 0 : fec->recv_cnt++;
330 : /* advanced buffered if possible */
331 0 : for( uint i = fec->buffered_idx + 1; i <= fec->completes_idx; i++ ) {
332 0 : if( fd_fec_intra_idxs_test( fec->idxs, i ) ) {
333 0 : fec->buffered_idx = i;
334 0 : } else {
335 0 : break;
336 0 : }
337 0 : }
338 :
339 0 : return fec;
340 0 : }
341 :
/* check_blind_fec_completed and check_set_blind_fec_completed are
   implemented outside this header.  Judging by their names and the
   "forcing completion" workaround described at the top of this file,
   they presumably check whether the FEC set (slot, fec_set_idx) can be
   considered complete without the fec_resolver completing it ("blind"
   completion), consulting fec_chainer.  The _set_ variant presumably
   also records that completion.  It takes a non-const fec_repair, so it
   may modify fec_repair state.  NOTE(review): confirm semantics and
   return values against the implementation. */
int
check_blind_fec_completed( fd_fec_repair_t const * fec_repair,
                           fd_fec_chainer_t *      fec_chainer,
                           ulong                   slot,
                           uint                    fec_set_idx );
int
check_set_blind_fec_completed( fd_fec_repair_t *  fec_repair,
                               fd_fec_chainer_t * fec_chainer,
                               ulong              slot,
                               uint               fec_set_idx );
352 :
353 :
354 : // /* fd_fec_repair_ele_query removes an in-progress FEC set from the map.
355 : // Returns NULL if no fec set keyed by slot and fec_set_idx is found. */
356 :
357 : // static inline void
358 : // fd_fec_repair_ele_remove( fd_fec_repair_t * fec_repair, ulong slot, uint fec_set_idx ) {
359 : // ulong key = slot << 32 | (ulong)fec_set_idx;
360 : // fd_fec_intra_t * fec = fd_fec_repair_ele_map_query( fec_repair->map, key, NULL );
361 : // FD_TEST( fec );
362 : // fd_fec_repair_ele_map_remove( fec_repair->map, fec ); /* cannot fail */
363 : // }
364 : FD_PROTOTYPES_END
365 :
366 : #endif /* HEADER_fd_src_discof_repair_fd_fec_repair_h */
|