Line data Source code
1 : #include "fd_tpool.h"
2 :
/* fd_tpool_private_worker_cfg_t carries the boot parameters from
   fd_tpool_worker_push to fd_tpool_private_worker (passed type punned
   through the tile argc/argv).  The pushing thread's stack copy must
   stay live until the worker publishes itself in the tpool worker
   array (the pusher spin-waits on that publication). */

struct fd_tpool_private_worker_cfg {
  fd_tpool_t * tpool;      /* tpool this worker will join */
  ulong        tile_idx;   /* tile on which the worker runs */
  void *       scratch;    /* worker scratch region, NULL if none */
  ulong        scratch_sz; /* scratch region size in bytes, 0 if none */
};

typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
11 :
/* Per-thread (FD_TL) scratch frame storage used when a worker attaches
   its scratch region via fd_scratch_attach in fd_tpool_private_worker.
   This is not static to allow tile 0 to attach to this if desired. */

FD_TL ulong fd_tpool_private_scratch_frame[ FD_TPOOL_WORKER_SCRATCH_DEPTH ] __attribute((aligned(FD_SCRATCH_FMEM_ALIGN)));
15 :
/* fd_tpool_private_worker is the entry point run on each pushed worker
   tile.  argc encodes the worker's index into the tpool worker array
   and argv is a type punned pointer to the pushing thread's
   fd_tpool_private_worker_cfg_t.  The worker boots, publishes itself,
   spins in an IDLE/EXEC loop until told to HALT, then cleans up and
   returns 0. */

static int
fd_tpool_private_worker( int     argc,
                         char ** argv ) {
  ulong worker_idx = (ulong)(uint)argc;
  fd_tpool_private_worker_cfg_t * cfg = (fd_tpool_private_worker_cfg_t *)argv;

  /* Unpack the boot config now; cfg points into the pushing thread's
     stack and is only guaranteed live until we publish ourselves in
     the tpool worker array below. */

  fd_tpool_t * tpool      = cfg->tpool;
  ulong        tile_idx   = cfg->tile_idx;
  void *       scratch    = cfg->scratch;
  ulong        scratch_sz = cfg->scratch_sz;

  /* The worker state object lives on this tile's stack; other tiles
     access it through the pointer published in the tpool worker
     array. */

  fd_tpool_private_worker_t worker[1];

  FD_COMPILER_MFENCE();
  worker->state = FD_TPOOL_WORKER_STATE_BOOT;
  FD_COMPILER_MFENCE();

  worker->tile_idx   = (uint)tile_idx;
  worker->scratch    = scratch;
  worker->scratch_sz = scratch_sz;

  /* Attach this tile's scratch region (if one was provided) using the
     per-thread frame storage declared above */

  if( scratch_sz ) fd_scratch_attach( scratch, fd_tpool_private_scratch_frame, scratch_sz, FD_TPOOL_WORKER_SCRATCH_DEPTH );

  FD_COMPILER_MFENCE();
  worker->state = FD_TPOOL_WORKER_STATE_IDLE;
  FD_COMPILER_MFENCE();

  /* Publish ourselves; fd_tpool_worker_push spins until this slot is
     non-NULL before returning.  (fd_tpool_private_worker( tpool ) here
     resolves to the header's accessor overload returning the worker
     pointer array, not this function — C++ overloading.) */

  fd_tpool_private_worker( tpool )[ worker_idx ] = worker;

  for(;;) {

    /* We are IDLE ... see what we should do next */

    FD_COMPILER_MFENCE();
    int state = worker->state;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_EXEC ) ) {
      /* Any state other than IDLE/EXEC (i.e. HALT) exits the loop */
      if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) break;
      FD_SPIN_PAUSE();
      continue;
    }

    /* We are EXEC ... do the task and then transition to IDLE */

    fd_tpool_task_t task = worker->task;

    void * task_tpool  = worker->task_tpool;
    ulong  task_t0     = worker->task_t0;     ulong task_t1     = worker->task_t1;
    void * task_args   = worker->task_args;
    void * task_reduce = worker->task_reduce; ulong task_stride = worker->task_stride;
    ulong  task_l0     = worker->task_l0;     ulong task_l1     = worker->task_l1;
    ulong  task_m0     = worker->task_m0;     ulong task_m1     = worker->task_m1;
    ulong  task_n0     = worker->task_n0;     ulong task_n1     = worker->task_n1;

    FD_COMPILER_MFENCE();

    /* This translation unit is C++ so a stray exception thrown by a
       task can be caught here instead of tearing down the tile */

    try {
      task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );
    } catch( ... ) {
      FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
    }

    FD_COMPILER_MFENCE();
    worker->state = FD_TPOOL_WORKER_STATE_IDLE;
  }

  /* state is HALT, clean up and then reset back to BOOT */

  if( scratch_sz ) fd_scratch_detach( NULL );

  FD_COMPILER_MFENCE();
  worker->state = FD_TPOOL_WORKER_STATE_BOOT;
  FD_COMPILER_MFENCE();

  return 0;
}
93 :
94 : ulong
95 3 : fd_tpool_align( void ) {
96 3 : return FD_TPOOL_ALIGN;
97 3 : }
98 :
99 : ulong
100 3003093 : fd_tpool_footprint( ulong worker_max ) {
101 3003093 : if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
102 474180 : return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) +
103 474180 : sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
104 3003093 : }
105 :
/* fd_tpool_init formats mem (assumed to have fd_tpool_align()
   alignment and fd_tpool_footprint( worker_max ) footprint) into a
   tpool that can hold up to worker_max workers.  Worker 0 represents
   the caller and is created here in the EXEC state.  Returns the tpool
   on success and NULL on failure (logs details). */

fd_tpool_t *
fd_tpool_init( void * mem,
               ulong  worker_max ) {

  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !mem ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
    FD_LOG_WARNING(( "bad alignment" ));
    return NULL;
  }

  /* A zero footprint indicates worker_max is out of range */

  ulong footprint = fd_tpool_footprint( worker_max );
  if( FD_UNLIKELY( !footprint ) ) {
    FD_LOG_WARNING(( "bad worker_max" ));
    return NULL;
  }

  fd_memset( mem, 0, footprint );

  /* Memory layout: [ worker0 state | tpool header | worker ptr array ] */

  fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem;

  /* Worker 0 is the caller; it never sits in the worker IDLE loop so
     it is marked EXEC for its whole lifetime */

  FD_COMPILER_MFENCE();
  worker0->state = FD_TPOOL_WORKER_STATE_EXEC;
  FD_COMPILER_MFENCE();

  fd_tpool_t * tpool = (fd_tpool_t *)(worker0+1);
  tpool->worker_max = worker_max;
  tpool->worker_cnt = 1UL;

  FD_COMPILER_MFENCE();
  fd_tpool_private_worker( tpool )[0] = worker0;
  FD_COMPILER_MFENCE();

  return tpool;
}
146 :
147 : void *
148 3090 : fd_tpool_fini( fd_tpool_t * tpool ) {
149 :
150 3090 : FD_COMPILER_MFENCE();
151 :
152 3090 : if( FD_UNLIKELY( !tpool ) ) {
153 3 : FD_LOG_WARNING(( "NULL tpool" ));
154 3 : return NULL;
155 3 : }
156 :
157 3150 : while( fd_tpool_worker_cnt( tpool )>1UL ) {
158 63 : if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
159 0 : FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
160 0 : return NULL;
161 0 : }
162 63 : }
163 :
164 3087 : return (void *)fd_tpool_private_worker0( tpool );
165 3087 : }
166 :
/* fd_tpool_worker_push boots a worker on tile tile_idx, optionally
   assigning it the scratch region [scratch,scratch+scratch_sz), and
   adds it to the tpool.  tile_idx must be a valid tile that is neither
   tile 0, the caller's tile, nor already in the tpool.  Returns tpool
   on success and NULL on failure (logs details). */

fd_tpool_t *
fd_tpool_worker_push( fd_tpool_t * tpool,
                      ulong        tile_idx,
                      void *       scratch,
                      ulong        scratch_sz ) {

  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !tpool ) ) {
    FD_LOG_WARNING(( "NULL tpool" ));
    return NULL;
  }

  if( FD_UNLIKELY( !tile_idx ) ) {
    FD_LOG_WARNING(( "cannot push tile_idx 0" ));
    return NULL;
  }

  if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
    FD_LOG_WARNING(( "cannot push self" ));
    return NULL;
  }

  if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
    FD_LOG_WARNING(( "invalid tile_idx" ));
    return NULL;
  }

  /* Scratch is optional; if provided it must be non-NULL and aligned */

  if( FD_UNLIKELY( scratch_sz ) ) {
    if( FD_UNLIKELY( !scratch ) ) {
      FD_LOG_WARNING(( "NULL scratch" ));
      return NULL;
    }

    if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)scratch, FD_SCRATCH_SMEM_ALIGN ) ) ) {
      FD_LOG_WARNING(( "misaligned scratch" ));
      return NULL;
    }
  }

  fd_tpool_private_worker_t ** worker     = fd_tpool_private_worker( tpool );
  ulong                        worker_cnt = tpool->worker_cnt;

  if( FD_UNLIKELY( worker_cnt>=tpool->worker_max ) ) {
    FD_LOG_WARNING(( "too many workers" ));
    return NULL;
  }

  /* Reject a tile that is already backing a worker in this tpool */

  for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ )
    if( worker[ worker_idx ]->tile_idx==tile_idx ) {
      FD_LOG_WARNING(( "tile_idx already added to tpool" ));
      return NULL;
    }

  /* Boot config lives on our stack; the worker copies it out before
     publishing itself and we spin below until that publication, so it
     stays live long enough */

  fd_tpool_private_worker_cfg_t cfg[1];

  cfg->tpool      = tpool;
  cfg->tile_idx   = tile_idx;
  cfg->scratch    = scratch;
  cfg->scratch_sz = scratch_sz;

  /* Smuggle the worker index / cfg pointer through the tile argc/argv */

  int     argc = (int)(uint)worker_cnt;
  char ** argv = (char **)fd_type_pun( cfg );

  /* Clear the slot the new worker will publish itself into */

  FD_COMPILER_MFENCE();
  worker[ worker_cnt ] = NULL;
  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker, argc, argv ) ) ) {
    FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
    return NULL;
  }

  /* Wait for the worker to finish booting and publish itself */

  while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE();

  tpool->worker_cnt = worker_cnt + 1UL;
  return tpool;
}
245 :
/* fd_tpool_worker_pop halts the most recently pushed worker, waits for
   its tile to shut down and removes it from the tpool.  The worker
   must be idle.  Returns tpool on success and NULL on failure (logs
   details). */

fd_tpool_t *
fd_tpool_worker_pop( fd_tpool_t * tpool ) {

  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !tpool ) ) {
    FD_LOG_WARNING(( "NULL tpool" ));
    return NULL;
  }

  ulong worker_cnt = tpool->worker_cnt;
  if( FD_UNLIKELY( worker_cnt<=1UL ) ) { /* worker 0 is the caller and cannot be popped */
    FD_LOG_WARNING(( "no workers to pop" ));
    return NULL;
  }

  fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
  fd_tile_exec_t *            exec   = fd_tile_exec( worker->tile_idx );
  int volatile *              vstate = (int volatile *)&(worker->state);
  int                         state;

  /* Testing for IDLE isn't strictly necessary given requirements to use
     this but can help catch user errors.  Likewise, FD_ATOMIC_CAS isn't
     strictly necessary given correct operation but can more robustly
     catch such errors. */

# if FD_HAS_ATOMIC

  /* Atomically transition IDLE->HALT; fails (and leaves the worker
     untouched) if a task is still running or the state is otherwise
     not IDLE */

  FD_COMPILER_MFENCE();
  state = FD_ATOMIC_CAS( vstate, FD_TPOOL_WORKER_STATE_IDLE, FD_TPOOL_WORKER_STATE_HALT );
  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) {
    FD_LOG_WARNING(( "worker to pop is not idle (%i-%s)", state, fd_tpool_worker_state_cstr( state ) ));
    return NULL;
  }

# else

  /* No atomic support: non-atomic check-then-set (callers are already
     required to not dispatch to a worker being popped) */

  FD_COMPILER_MFENCE();
  state = *vstate;
  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) {
    FD_LOG_WARNING(( "worker to pop is not idle (%i-%s)", state, fd_tpool_worker_state_cstr( state ) ));
    return NULL;
  }

  FD_COMPILER_MFENCE();
  *vstate = FD_TPOOL_WORKER_STATE_HALT;
  FD_COMPILER_MFENCE();

# endif

  /* Wait for the worker to shutdown */

  int ret;
  char const * err = fd_tile_exec_delete( exec, &ret );
  if(      FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
  else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));

  tpool->worker_cnt = worker_cnt-1UL;
  return tpool;
}
310 :
/* FD_TPOOL_EXEC_ALL_IMPL_HDR(style) opens the definition of the
   internal task fd_tpool_private_exec_all_<style>_node.  Each node
   owns the thread range [node_t0,node_t1) within the overall dispatch
   range [t0,t1): while the node covers more than one thread, it splits
   off the upper part to a child node (dispatched via fd_tpool_exec,
   giving a recursive tree fan-out) and keeps the lower part, recording
   each child's first thread in wait_child.  The code placed between
   HDR and FTR then executes this node's share of the work.
   FD_TPOOL_EXEC_ALL_IMPL_FTR fans back in by waiting on the children
   in LIFO order and closes the definition.  The user task pointer and
   original tpool argument are smuggled through the ulong _task/_tpool
   slots of the task signature. */

#define FD_TPOOL_EXEC_ALL_IMPL_HDR(style)                                                          \
void                                                                                               \
fd_tpool_private_exec_all_##style##_node( void * _node_tpool,                                      \
                                          ulong  node_t0, ulong node_t1,                           \
                                          void * args,                                             \
                                          void * reduce,  ulong stride,                            \
                                          ulong  l0,      ulong l1,                                \
                                          ulong  _task,   ulong _tpool,                            \
                                          ulong  t0,      ulong t1 ) {                             \
  fd_tpool_t *    node_tpool = (fd_tpool_t *   )_node_tpool;                                       \
  fd_tpool_task_t task       = (fd_tpool_task_t)_task;                                             \
  ulong wait_cnt = 0UL;                                                                            \
  ushort wait_child[16]; /* Assumes tpool_cnt<=65536 */                                            \
  for(;;) {                                                                                        \
    ulong node_t_cnt = node_t1 - node_t0;                                                          \
    if( node_t_cnt<=1L ) break;                                                                    \
    ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt );                                \
    fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node,                  \
                   node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
    wait_child[ wait_cnt++ ] = (ushort)node_ts;                                                    \
    node_t1 = node_ts;                                                                             \
  }

#define FD_TPOOL_EXEC_ALL_IMPL_FTR                                                                 \
  while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] );                  \
}
337 :
/* rrobin: thread i of the dispatch processes elements l0+i, l0+i+cnt,
   l0+i+2*cnt, ... of [l0,l1) where cnt = t1-t0 */

FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin)
  ulong m_stride = t1-t0;
  ulong m = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
  while( m<l1 ) {
    task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
    m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
  }
FD_TPOOL_EXEC_ALL_IMPL_FTR
346 :
/* block: each thread processes one approximately equal contiguous
   sub-range [m0,m1) of [l0,l1), element by element */

FD_TPOOL_EXEC_ALL_IMPL_HDR(block)
  ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
  for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
FD_TPOOL_EXEC_ALL_IMPL_FTR
351 :
#if FD_HAS_ATOMIC
/* taskq: threads dynamically claim one element at a time from a shared
   counter.  The _tpool slot here points to a two-ulong array packed by
   the dispatcher: element 0 is the shared next-element counter and
   element 1 is the actual tpool argument for the user task. */

FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq)
  ulong * l_next = (ulong *)_tpool;
  void *  tpool  = (void *)l_next[1];
  for(;;) {

    /* Note that we use an ATOMIC_CAS here instead of an
       ATOMIC_FETCH_AND_ADD to avoid overflow risks by having threads
       increment l0 into the tail.  ATOMIC_FETCH_AND_ADD could be used
       if there is no requirement to the effect that l1+FD_TILE_MAX does
       not overflow. */

    FD_COMPILER_MFENCE();
    ulong m0 = *l_next;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( m0>=l1 ) ) break; /* all elements claimed */
    ulong m1 = m0+1UL;
    if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) {
      /* lost the claim race; retry */
      FD_SPIN_PAUSE();
      continue;
    }

    task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
  }
FD_TPOOL_EXEC_ALL_IMPL_FTR
#endif
379 :
/* batch: like block but each thread makes a single task call covering
   its whole contiguous sub-range [m0,m1) */

FD_TPOOL_EXEC_ALL_IMPL_HDR(batch)
  ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
  task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
FD_TPOOL_EXEC_ALL_IMPL_FTR
384 :
/* raw: no partitioning; every thread calls the task over the full
   [l0,l1) range (the task partitions for itself) */

FD_TPOOL_EXEC_ALL_IMPL_HDR(raw)
  task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
FD_TPOOL_EXEC_ALL_IMPL_FTR
388 :
389 : #undef FD_TPOOL_EXEC_ALL_IMPL_FTR
390 : #undef FD_TPOOL_EXEC_ALL_IMPL_HDR
391 :
392 : char const *
393 18 : fd_tpool_worker_state_cstr( int state ) {
394 18 : switch( state ) {
395 3 : case FD_TPOOL_WORKER_STATE_BOOT: return "boot";
396 3 : case FD_TPOOL_WORKER_STATE_IDLE: return "idle";
397 6 : case FD_TPOOL_WORKER_STATE_EXEC: return "exec";
398 3 : case FD_TPOOL_WORKER_STATE_HALT: return "halt";
399 3 : default: break;
400 18 : }
401 3 : return "unknown";
402 18 : }
|