Line data Source code
1 : #include "fd_tpool.h"
2 :
/* fd_tpool_private_worker_cfg_t carries the boot parameters from
   fd_tpool_worker_push to a newly launched worker tile.  A pointer to
   this struct is smuggled through the argv argument of
   fd_tile_exec_new (see fd_tpool_private_worker). */

struct fd_tpool_private_worker_cfg {
  fd_tpool_t * tpool;      /* tpool this worker should join */
  ulong        tile_idx;   /* tile on which the worker runs */
  void *       scratch;    /* worker scratch region (unused if scratch_sz is 0) */
  ulong        scratch_sz; /* scratch region size in bytes; 0 means no scratch attach */
};

typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
11 :
/* This is not static to allow tile 0 to attach to this if desired. */
/* Per-thread storage for the scratch frame stack a worker passes to
   fd_scratch_attach when it has a non-zero scratch region. */

FD_TL ulong fd_tpool_private_scratch_frame[ FD_TPOOL_WORKER_SCRATCH_DEPTH ] __attribute((aligned(FD_SCRATCH_FMEM_ALIGN)));
15 :
/* fd_tpool_private_worker is the thread main for a tpool worker tile
   (launched via fd_tile_exec_new by fd_tpool_worker_push).  argc is
   repurposed to carry the worker's index in the tpool's worker table
   and argv is repurposed to point at the caller's
   fd_tpool_private_worker_cfg_t (valid until this thread publishes
   itself below).  The worker runs the state machine:

     BOOT -> IDLE -> (EXEC -> IDLE)* -> HALT -> BOOT -> exit

   where transitions to EXEC and HALT are driven by other threads
   writing worker->state and transitions back to IDLE/BOOT are done
   here.  Returns 0 on a clean shutdown. */

static int
fd_tpool_private_worker( int     argc,
                         char ** argv ) {
  ulong worker_idx = (ulong)(uint)argc;
  fd_tpool_private_worker_cfg_t * cfg = (fd_tpool_private_worker_cfg_t *)argv;

  fd_tpool_t * tpool      = cfg->tpool;
  ulong        tile_idx   = cfg->tile_idx;
  void *       scratch    = cfg->scratch;
  ulong        scratch_sz = cfg->scratch_sz;

  /* The worker's own state lives on this thread's stack; it is shared
     with other threads only via the pointer published into the tpool's
     worker table below. */

  fd_tpool_private_worker_t worker[1];
  FD_COMPILER_MFENCE();
  FD_VOLATILE( worker->state ) = FD_TPOOL_WORKER_STATE_BOOT;
  FD_COMPILER_MFENCE();

  worker->tile_idx   = (uint)tile_idx;
  worker->scratch    = scratch;
  worker->scratch_sz = scratch_sz;

  if( scratch_sz ) fd_scratch_attach( scratch, fd_tpool_private_scratch_frame, scratch_sz, FD_TPOOL_WORKER_SCRATCH_DEPTH );

  /* Mark IDLE and then publish this worker into the tpool's worker
     table.  The fences order the state write before the publication so
     observers never see an un-booted published worker.  Publication
     also tells fd_tpool_worker_push that cfg has been fully consumed. */

  FD_COMPILER_MFENCE();
  FD_VOLATILE( worker->state ) = FD_TPOOL_WORKER_STATE_IDLE;
  FD_COMPILER_MFENCE();
  FD_VOLATILE( fd_tpool_private_worker( tpool )[ worker_idx ] ) = worker;
  FD_COMPILER_MFENCE();

  for(;;) {

    /* We are IDLE ... see what we should do next */

    int state = FD_VOLATILE_CONST( worker->state );
    if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_EXEC ) ) {
      if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) break; /* HALT (or corrupt state) -> shut down */
      FD_SPIN_PAUSE();
      continue;
    }

    /* We are EXEC ... do the task and then transition to IDLE */

    fd_tpool_task_t task = FD_VOLATILE_CONST( worker->task );

    void * task_tpool  = FD_VOLATILE_CONST( worker->task_tpool  );
    ulong  task_t0     = FD_VOLATILE_CONST( worker->task_t0     ); ulong task_t1     = FD_VOLATILE_CONST( worker->task_t1     );
    void * task_args   = FD_VOLATILE_CONST( worker->task_args   );
    void * task_reduce = FD_VOLATILE_CONST( worker->task_reduce ); ulong task_stride = FD_VOLATILE_CONST( worker->task_stride );
    ulong  task_l0     = FD_VOLATILE_CONST( worker->task_l0     ); ulong task_l1     = FD_VOLATILE_CONST( worker->task_l1     );
    ulong  task_m0     = FD_VOLATILE_CONST( worker->task_m0     ); ulong task_m1     = FD_VOLATILE_CONST( worker->task_m1     );
    ulong  task_n0     = FD_VOLATILE_CONST( worker->task_n0     ); ulong task_n1     = FD_VOLATILE_CONST( worker->task_n1     );

    /* Exceptions must not escape the worker loop (this file is compiled
       as C++); swallow and keep the pool alive. */

    try {
      task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );
    } catch( ... ) {
      FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
    }

    FD_COMPILER_MFENCE();
    FD_VOLATILE( worker->state ) = FD_TPOOL_WORKER_STATE_IDLE;
    FD_COMPILER_MFENCE();
  }

  /* state is HALT, clean up and then reset back to BOOT */

  if( scratch_sz ) fd_scratch_detach( NULL );

  FD_COMPILER_MFENCE();
  FD_VOLATILE( worker->state ) = FD_TPOOL_WORKER_STATE_BOOT;
  FD_COMPILER_MFENCE();
  return 0;
}
87 :
/* Returns the byte alignment required of the memory region backing a
   tpool (see fd_tpool_init). */

ulong
fd_tpool_align( void ) {
  return FD_TPOOL_ALIGN;
}
92 :
93 : ulong
94 3003093 : fd_tpool_footprint( ulong worker_max ) {
95 3003093 : if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
96 474180 : return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) +
97 474180 : sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
98 3003093 : }
99 :
/* fd_tpool_init formats mem (assumed to have fd_tpool_align alignment
   and fd_tpool_footprint( worker_max ) footprint) into a tpool that can
   support up to worker_max workers.  The caller's thread implicitly
   becomes worker 0.  Returns the tpool on success and NULL on failure
   (logs details).  Memory layout is:

     [ worker0 sentinel | fd_tpool_t header | worker ptr table ] */

fd_tpool_t *
fd_tpool_init( void * mem,
               ulong  worker_max ) {

  if( FD_UNLIKELY( !mem ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
    FD_LOG_WARNING(( "bad alignment" ));
    return NULL;
  }

  /* A zero footprint doubles as the worker_max range check */

  ulong footprint = fd_tpool_footprint( worker_max );
  if( FD_UNLIKELY( !footprint ) ) {
    FD_LOG_WARNING(( "bad worker_max" ));
    return NULL;
  }

  fd_memset( mem, 0, footprint );

  /* Worker 0 is the caller; it is permanently in the EXEC state (it is
     "executing" whatever the caller is currently running). */

  fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem;
  FD_COMPILER_MFENCE();
  FD_VOLATILE( worker0->state ) = FD_TPOOL_WORKER_STATE_EXEC;
  FD_COMPILER_MFENCE();

  fd_tpool_t * tpool = (fd_tpool_t *)(worker0+1);
  tpool->worker_max = worker_max;
  tpool->worker_cnt = 1UL;

  /* Publish worker0 into slot 0 after its state is initialized */

  FD_COMPILER_MFENCE();
  FD_VOLATILE( fd_tpool_private_worker( tpool )[0] ) = worker0;
  FD_COMPILER_MFENCE();

  return tpool;
}
137 :
138 : void *
139 3090 : fd_tpool_fini( fd_tpool_t * tpool ) {
140 :
141 3090 : if( FD_UNLIKELY( !tpool ) ) {
142 3 : FD_LOG_WARNING(( "NULL tpool" ));
143 3 : return NULL;
144 3 : }
145 :
146 3150 : while( fd_tpool_worker_cnt( tpool )>1UL )
147 63 : if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
148 0 : FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
149 0 : return NULL;
150 0 : }
151 :
152 3087 : return (void *)fd_tpool_private_worker0( tpool );
153 3087 : }
154 :
/* fd_tpool_worker_push launches a new worker on tile tile_idx and adds
   it to tpool, optionally giving it a scratch region of scratch_sz
   bytes.  tile_idx must be a valid tile other than 0 and the caller's
   own tile, and must not already host a worker of this tpool.  Returns
   tpool on success and NULL on failure (logs details).  Blocks until
   the new worker has finished booting. */

fd_tpool_t *
fd_tpool_worker_push( fd_tpool_t * tpool,
                      ulong        tile_idx,
                      void *       scratch,
                      ulong        scratch_sz ) {

  if( FD_UNLIKELY( !tpool ) ) {
    FD_LOG_WARNING(( "NULL tpool" ));
    return NULL;
  }

  if( FD_UNLIKELY( !tile_idx ) ) {
    FD_LOG_WARNING(( "cannot push tile_idx 0" ));
    return NULL;
  }

  if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
    FD_LOG_WARNING(( "cannot push self" ));
    return NULL;
  }

  if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
    FD_LOG_WARNING(( "invalid tile_idx" ));
    return NULL;
  }

  /* scratch is only validated when a scratch region was requested */

  if( FD_UNLIKELY( scratch_sz ) ) {
    if( FD_UNLIKELY( !scratch ) ) {
      FD_LOG_WARNING(( "NULL scratch" ));
      return NULL;
    }

    if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)scratch, FD_SCRATCH_SMEM_ALIGN ) ) ) {
      FD_LOG_WARNING(( "misaligned scratch" ));
      return NULL;
    }
  }

  fd_tpool_private_worker_t ** worker     = fd_tpool_private_worker( tpool );
  ulong                        worker_cnt = tpool->worker_cnt;

  if( FD_UNLIKELY( worker_cnt>=tpool->worker_max ) ) {
    FD_LOG_WARNING(( "too many workers" ));
    return NULL;
  }

  for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ )
    if( worker[ worker_idx ]->tile_idx==tile_idx ) {
      FD_LOG_WARNING(( "tile_idx already added to tpool" ));
      return NULL;
    }

  /* cfg lives on this stack; the spin-wait below guarantees the new
     worker has finished reading it before this function returns. */

  fd_tpool_private_worker_cfg_t cfg[1];

  cfg->tpool      = tpool;
  cfg->tile_idx   = tile_idx;
  cfg->scratch    = scratch;
  cfg->scratch_sz = scratch_sz;

  /* Smuggle the worker index and cfg through (argc,argv) */

  int     argc = (int)(uint)worker_cnt;
  char ** argv = (char **)fd_type_pun( cfg );

  /* Clear the slot so we can detect when the worker publishes itself */

  FD_COMPILER_MFENCE();
  FD_VOLATILE( worker[ worker_cnt ] ) = NULL;
  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker, argc, argv ) ) ) {
    FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
    return NULL;
  }

  /* Wait for the worker to boot and publish itself into its slot */

  while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE();

  tpool->worker_cnt = worker_cnt + 1UL;
  return tpool;
}
231 :
/* fd_tpool_worker_pop halts the most recently pushed worker, waits for
   its tile to shut down and removes it from tpool.  The worker must be
   idle.  Returns tpool on success and NULL on failure (logs details). */

fd_tpool_t *
fd_tpool_worker_pop( fd_tpool_t * tpool ) {

  if( FD_UNLIKELY( !tpool ) ) {
    FD_LOG_WARNING(( "NULL tpool" ));
    return NULL;
  }

  ulong worker_cnt = tpool->worker_cnt;
  if( FD_UNLIKELY( worker_cnt<=1UL ) ) {
    FD_LOG_WARNING(( "no workers to pop" ));
    return NULL;
  }

  fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
  fd_tile_exec_t *            exec   = fd_tile_exec( worker->tile_idx );
  int volatile *              vstate = (int volatile *)&(worker->state);
  int                         state;

  /* Testing for IDLE isn't strictly necessary given requirements
     to use this but can help catch user errors.  Likewise,
     FD_ATOMIC_CAS isn't strictly necessary given correct operation but
     can more robustly catch such errors. */

# if FD_HAS_ATOMIC

  /* Atomically transition IDLE -> HALT; fails (and reports the observed
     state) if the worker was not idle */

  FD_COMPILER_MFENCE();
  state = FD_ATOMIC_CAS( vstate, FD_TPOOL_WORKER_STATE_IDLE, FD_TPOOL_WORKER_STATE_HALT );
  FD_COMPILER_MFENCE();
  if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) {
    FD_LOG_WARNING(( "worker to pop is not idle (%i-%s)", state, fd_tpool_worker_state_cstr( state ) ));
    return NULL;
  }

# else

  /* Non-atomic fallback: check-then-write (relies on the caller
     honoring the usage requirements; the check is best effort only) */

  FD_COMPILER_MFENCE();
  state = *vstate;
  FD_COMPILER_MFENCE();
  if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) {
    FD_LOG_WARNING(( "worker to pop is not idle (%i-%s)", state, fd_tpool_worker_state_cstr( state ) ));
    return NULL;
  }
  FD_COMPILER_MFENCE();
  *vstate = FD_TPOOL_WORKER_STATE_HALT;
  FD_COMPILER_MFENCE();

# endif

  /* Wait for the worker to shutdown */

  int          ret;
  char const * err = fd_tile_exec_delete( exec, &ret );
  if(      FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
  else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));

  tpool->worker_cnt = worker_cnt-1UL;
  return tpool;
}
291 :
/* FD_TPOOL_EXEC_ALL_IMPL_HDR/FTR bracket the per-style exec_all node
   functions below.  HDR opens the function and recursively halves the
   thread range [node_t0,node_t1): the upper half of each split is
   dispatched to another worker (running this same function) and
   remembered in wait_child; the loop exits once this node's range is a
   single thread.  The style-specific body that follows the HDR
   invocation then does this thread's share of the work.  FTR waits for
   the dispatched children in reverse dispatch order and closes the
   function (note HDR's opening brace is matched by FTR's closing
   brace).  wait_child has 16 entries because repeated halving of the
   thread range dispatches at most ~log2(tpool_cnt) children (the
   comment below notes the tpool_cnt<=65536 assumption). */

#define FD_TPOOL_EXEC_ALL_IMPL_HDR(style)                                                \
void                                                                                     \
fd_tpool_private_exec_all_##style##_node( void * _node_tpool,                            \
                                          ulong node_t0, ulong node_t1,                  \
                                          void * args,                                   \
                                          void * reduce, ulong stride,                   \
                                          ulong l0, ulong l1,                            \
                                          ulong _task, ulong _tpool,                     \
                                          ulong t0, ulong t1 ) {                         \
  fd_tpool_t * node_tpool = (fd_tpool_t * )_node_tpool;                                  \
  fd_tpool_task_t task = (fd_tpool_task_t)_task;                                         \
  ulong wait_cnt = 0UL;                                                                  \
  ushort wait_child[16]; /* Assumes tpool_cnt<=65536 */                                  \
  for(;;) {                                                                              \
    ulong node_t_cnt = node_t1 - node_t0;                                                \
    if( node_t_cnt<=1L ) break;                                                          \
    ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt );                      \
    fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node,        \
                   node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
    wait_child[ wait_cnt++ ] = (ushort)node_ts;                                          \
    node_t1 = node_ts;                                                                   \
  }

#define FD_TPOOL_EXEC_ALL_IMPL_FTR                                                       \
  while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] );        \
}
318 :
/* rrobin: elements are dealt round-robin over the t1-t0 threads of the
   exec_all; this thread (offset node_t0-t0) handles elements
   l0+(node_t0-t0), l0+(node_t0-t0)+(t1-t0), ... below l1, one element
   per task call. */

FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin)
  ulong m_stride = t1-t0;
  ulong m = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
  while( m<l1 ) {
    task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
    m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
  }
FD_TPOOL_EXEC_ALL_IMPL_FTR
327 :
/* block: [l0,l1) is partitioned into t1-t0 contiguous approximately
   equal blocks; this thread processes its block [m0,m1) one element per
   task call. */

FD_TPOOL_EXEC_ALL_IMPL_HDR(block)
  ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
  for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
FD_TPOOL_EXEC_ALL_IMPL_FTR
332 :
#if FD_HAS_ATOMIC
/* taskq: threads dynamically claim the next unprocessed element via an
   atomic CAS on a shared counter (self load balancing).  Here _tpool is
   repurposed to point at a two-ulong header: l_next[0] is the shared
   next-element counter and l_next[1] holds the actual tpool argument to
   pass to task. */

FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq)
  ulong * l_next = (ulong *)_tpool;
  FD_COMPILER_MFENCE();
  void * tpool = (void *)FD_VOLATILE_CONST( l_next[1] );
  FD_COMPILER_MFENCE();
  for(;;) {

    /* Note that we use an ATOMIC_CAS here instead of an
       ATOMIC_FETCH_AND_ADD to avoid overflow risks by having threads
       increment l0 into the tail.  ATOMIC_FETCH_AND_ADD could be used
       if there is no requirement to the effect that l1+FD_TILE_MAX does
       not overflow. */

    FD_COMPILER_MFENCE();
    ulong m0 = FD_VOLATILE_CONST( *l_next );
    FD_COMPILER_MFENCE();
    if( FD_UNLIKELY( m0>=l1 ) ) break; /* all elements claimed */
    ulong m1 = m0+1UL;
    if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) {
      FD_SPIN_PAUSE(); /* lost the race to another thread; retry */
      continue;
    }

    task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
  }
FD_TPOOL_EXEC_ALL_IMPL_FTR
#endif
361 :
/* batch: like block but the whole contiguous range [m0,m1) for this
   thread is handed to a single task call. */

FD_TPOOL_EXEC_ALL_IMPL_HDR(batch)
  ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
  task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
FD_TPOOL_EXEC_ALL_IMPL_FTR
366 :
/* raw: no partitioning; every thread's task call receives the full
   range [l0,l1) (the task itself decides how to divide work). */

FD_TPOOL_EXEC_ALL_IMPL_HDR(raw)
  task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
FD_TPOOL_EXEC_ALL_IMPL_FTR
370 :
371 : #undef FD_TPOOL_EXEC_ALL_IMPL_FTR
372 : #undef FD_TPOOL_EXEC_ALL_IMPL_HDR
373 :
374 : char const *
375 18 : fd_tpool_worker_state_cstr( int state ) {
376 18 : switch( state ) {
377 3 : case FD_TPOOL_WORKER_STATE_BOOT: return "boot";
378 3 : case FD_TPOOL_WORKER_STATE_IDLE: return "idle";
379 6 : case FD_TPOOL_WORKER_STATE_EXEC: return "exec";
380 3 : case FD_TPOOL_WORKER_STATE_HALT: return "halt";
381 3 : default: break;
382 18 : }
383 3 : return "unknown";
384 18 : }
|