Line data Source code
1 : #include "fd_tpool.h"
2 :
3 : #if FD_HAS_THREADS
4 : #include <pthread.h>
5 : #endif
6 :
7 : struct fd_tpool_private_worker_cfg {
8 : fd_tpool_t * tpool;
9 : ulong tile_idx;
10 : };
11 :
12 : typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
13 :
/* fd_tpool_private_worker is the entry point run on each tile that
   fd_tpool_worker_push adds to a tpool (launched via fd_tile_exec_new).
   The fd_tile argc/argv slots are repurposed: argc carries this
   worker's index in the tpool and argv actually points to a
   fd_tpool_private_worker_cfg_t (tpool + tile_idx) that lives on the
   pusher's stack.  The cfg is read before this worker publishes itself
   in the tpool's worker array, and the pusher spins until that slot
   becomes non-NULL, so the cfg is still valid when read here.

   Lifecycle: BOOT (set up state, publish) -> repeated IDLE / EXEC ->
   HALT (clean up, return 0).

   Dispatch protocol, as read by this function:
   - A new request is visible when worker->seq0 differs from seq1, the
     sequence number this worker last completed.
   - worker->task==0 requests HALT.
   - Otherwise, if worker->arg_cnt==UINT_MAX, task is a fd_tpool_task_t
     called with the 12 arguments unpacked from worker->arg[0..11].
   - Otherwise, task is a fd_tpool_task_v2_t called with
     (tpool, worker_idx, arg_cnt, arg).
   - Completion is reported by storing seq0 into worker->seq1.

   When the tpool has FD_TPOOL_OPT_SLEEP, the worker blocks on a
   condition variable before each poll instead of purely spinning.  It
   holds its mutex at all times except while inside pthread_cond_wait.
   A waker that locks the mutex before signaling (fd_tpool_private_wake)
   can therefore only signal while the worker is parked.

   The worker state (and, in sleep mode, the mutex / cond) live on this
   function's stack.  The pointer published in the tpool is valid only
   until this function returns after HALT. */

static int
fd_tpool_private_worker( int     argc,
                         char ** argv ) {
  ulong                           worker_idx = (ulong)(uint)argc;
  fd_tpool_private_worker_cfg_t * cfg        = (fd_tpool_private_worker_cfg_t *)argv;

  /* Copy what we need out of the pusher-owned cfg before publishing
     (the pusher may return, invalidating cfg, once we publish). */

  fd_tpool_t * tpool    = cfg->tpool;
  ulong        tile_idx = cfg->tile_idx;

  /* We are BOOT */

  fd_tpool_private_worker_t worker[1];

  memset( worker, 0, sizeof(fd_tpool_private_worker_t) );

  worker->tile_idx = (uint) tile_idx;

# if FD_HAS_THREADS
  int sleeper = !!(tpool->opt & FD_TPOOL_OPT_SLEEP);

  pthread_mutex_t lock[1];
  pthread_cond_t  wake[1];

  /* In sleep mode, take the mutex up front and keep it for the worker's
     whole life; it is only released while waiting in pthread_cond_wait
     below and at HALT. */

  if( FD_UNLIKELY( sleeper ) ) {
    if( FD_UNLIKELY( pthread_mutex_init( lock, NULL ) ) ) FD_LOG_ERR(( "pthread_mutex_init failed" ));
    if( FD_UNLIKELY( pthread_cond_init ( wake, NULL ) ) ) FD_LOG_ERR(( "pthread_cond_init failed" ));
    if( FD_UNLIKELY( pthread_mutex_lock( lock ) ) ) FD_LOG_ERR(( "pthread_mutex_lock failed" ));
  }

  /* Expose the mutex / cond addresses so wakers can find them */

  worker->lock = (ulong)lock;
  worker->wake = (ulong)wake;
# endif

  FD_COMPILER_MFENCE();

  /* Publish: worker state must be fully initialized before this store
     (fd_tpool_worker_push spins until this slot is non-NULL). */

  fd_tpool_private_worker( tpool )[ worker_idx ] = worker;

  ulong const * arg  = worker->arg;
  uint          seq1 = worker->seq1;

  for(;;) {

    /* We are IDLE ... see what we should do next */

#   if FD_HAS_THREADS
    /* Spurious or stale wakeups are tolerated: the seq0 check below just
       loops back here if there is nothing new. */
    if( FD_UNLIKELY( sleeper ) && FD_UNLIKELY( pthread_cond_wait( wake, lock ) ) )
      FD_LOG_WARNING(( "pthread_cond_wait failed; attempting to continue" ));
#   endif

    /* Read seq0 first, then the request payload, so the payload
       read corresponds to (at least) the request seq0 announced. */

    FD_COMPILER_MFENCE();
    uint  seq0     = worker->seq0;
    FD_COMPILER_MFENCE();
    uint  _arg_cnt = worker->arg_cnt;
    ulong _task    = worker->task;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( seq0==seq1 ) ) { /* Got idle */
      FD_SPIN_PAUSE();
      continue;
    }

    if( FD_UNLIKELY( !_task ) ) break; /* Got halt */

    /* We are EXEC ... do the task and then transition to IDLE */

    try {

      if( _arg_cnt==UINT_MAX ) {

        /* Legacy 12-argument task; arguments packed in arg[0..11] */

        fd_tpool_task_t task = (fd_tpool_task_t)_task;

        void * task_tpool  = (void *)arg[ 0];
        ulong  task_t0     =         arg[ 1]; ulong task_t1     = arg[ 2];
        void * task_args   = (void *)arg[ 3];
        void * task_reduce = (void *)arg[ 4]; ulong task_stride = arg[ 5];
        ulong  task_l0     =         arg[ 6]; ulong task_l1     = arg[ 7];
        ulong  task_m0     =         arg[ 8]; ulong task_m1     = arg[ 9];
        ulong  task_n0     =         arg[10]; ulong task_n1     = arg[11];

        task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );

      } else {

        /* v2 task: receives the arg array and its count directly */

        fd_tpool_task_v2_t task = (fd_tpool_task_v2_t)_task;

        task( tpool, worker_idx, (ulong)_arg_cnt, arg );

      }

    } catch( ... ) {
      /* A throwing task is logged; completion is still reported below */
      FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
    }

    FD_COMPILER_MFENCE();

    /* Report completion: the task's effects precede this store */

    worker->seq1 = seq0;
    seq1         = seq0;
  }

  /* We are HALT ... clean up and terminate */

# if FD_HAS_THREADS
  if( FD_UNLIKELY( sleeper ) ) {
    if( FD_UNLIKELY( pthread_mutex_unlock ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
    if( FD_UNLIKELY( pthread_cond_destroy ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_destroy failed; attempting to continue" ));
    if( FD_UNLIKELY( pthread_mutex_destroy( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_destroy failed; attempting to continue" ));
  }
# endif

  return 0;
}
125 :
#if FD_HAS_THREADS
/* fd_tpool_private_wake signals the condition variable of a worker
   running in FD_TPOOL_OPT_SLEEP mode.  The signal is sent while holding
   the worker's mutex.  Failures are logged and otherwise ignored
   (best effort). */
void
fd_tpool_private_wake( fd_tpool_private_worker_t * worker ) {
  pthread_mutex_t * mtx  = (pthread_mutex_t *)worker->lock;
  pthread_cond_t  * cond = (pthread_cond_t  *)worker->wake;

  int rc;

  rc = pthread_mutex_lock( mtx );
  if( FD_UNLIKELY( rc ) ) FD_LOG_WARNING(( "pthread_mutex_lock failed; attempting to continue" ));

  rc = pthread_cond_signal( cond );
  if( FD_UNLIKELY( rc ) ) FD_LOG_WARNING(( "pthread_cond_signal failed; attempting to continue" ));

  rc = pthread_mutex_unlock( mtx );
  if( FD_UNLIKELY( rc ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
}
#endif
136 :
137 : ulong
138 3099 : fd_tpool_align( void ) {
139 3099 : return FD_TPOOL_ALIGN;
140 3099 : }
141 :
142 : ulong
143 3003093 : fd_tpool_footprint( ulong worker_max ) {
144 3003093 : if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
145 474180 : return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) +
146 474180 : sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
147 3003093 : }
148 :
149 : fd_tpool_t *
150 : fd_tpool_init( void * mem,
151 : ulong worker_max,
152 3099 : ulong opt ) {
153 :
154 3099 : FD_COMPILER_MFENCE();
155 :
156 3099 : if( FD_UNLIKELY( !mem ) ) {
157 3 : FD_LOG_WARNING(( "NULL mem" ));
158 3 : return NULL;
159 3 : }
160 :
161 3096 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
162 3 : FD_LOG_WARNING(( "bad alignment" ));
163 3 : return NULL;
164 3 : }
165 :
166 3093 : ulong footprint = fd_tpool_footprint( worker_max );
167 3093 : if( FD_UNLIKELY( !footprint ) ) {
168 6 : FD_LOG_WARNING(( "bad worker_max" ));
169 6 : return NULL;
170 6 : }
171 :
172 3087 : fd_memset( mem, 0, footprint );
173 :
174 3087 : fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem;
175 :
176 3087 : worker0->seq0 = 1U;
177 3087 : worker0->seq1 = 0U;
178 :
179 3087 : fd_tpool_t * tpool = (fd_tpool_t *)(worker0+1);
180 :
181 3087 : tpool->opt = opt;
182 3087 : tpool->worker_max = (uint)worker_max;
183 3087 : tpool->worker_cnt = 1U;
184 :
185 3087 : FD_COMPILER_MFENCE();
186 3087 : fd_tpool_private_worker( tpool )[0] = worker0;
187 3087 : FD_COMPILER_MFENCE();
188 :
189 3087 : return tpool;
190 3093 : }
191 :
192 : void *
193 3090 : fd_tpool_fini( fd_tpool_t * tpool ) {
194 :
195 3090 : FD_COMPILER_MFENCE();
196 :
197 3090 : if( FD_UNLIKELY( !tpool ) ) {
198 3 : FD_LOG_WARNING(( "NULL tpool" ));
199 3 : return NULL;
200 3 : }
201 :
202 3150 : while( fd_tpool_worker_cnt( tpool )>1UL ) {
203 63 : if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
204 0 : FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
205 0 : return NULL;
206 0 : }
207 63 : }
208 :
209 3087 : return (void *)fd_tpool_private_worker0( tpool );
210 3087 : }
211 :
/* fd_tpool_worker_push starts a new worker for tpool on tile tile_idx
   and appends it at worker index worker_cnt.

   Returns tpool on success.  Returns NULL after logging a warning if:
   - tpool is NULL;
   - tile_idx is 0, is the caller's own tile, or is not < fd_tile_cnt();
   - the tpool already has worker_max workers;
   - tile_idx is already a worker of tpool;
   - the tile could not be started.

   On success, this returns only after the new worker has published its
   state pointer in the tpool's worker array. */

fd_tpool_t *
fd_tpool_worker_push( fd_tpool_t * tpool,
                      ulong        tile_idx ) {

  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !tpool ) ) {
    FD_LOG_WARNING(( "NULL tpool" ));
    return NULL;
  }

  if( FD_UNLIKELY( !tile_idx ) ) {
    FD_LOG_WARNING(( "cannot push tile_idx 0" ));
    return NULL;
  }

  if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
    FD_LOG_WARNING(( "cannot push self" ));
    return NULL;
  }

  if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
    FD_LOG_WARNING(( "invalid tile_idx" ));
    return NULL;
  }

  fd_tpool_private_worker_t ** worker     = fd_tpool_private_worker( tpool );
  ulong                        worker_cnt = (ulong)tpool->worker_cnt;

  if( FD_UNLIKELY( worker_cnt>=(ulong)tpool->worker_max ) ) {
    FD_LOG_WARNING(( "too many workers" ));
    return NULL;
  }

  /* Reject duplicates.  Worker 0's tile_idx field is left 0 by
     fd_tpool_init's memset, so (since tile_idx 0 was rejected above)
     worker 0 never matches here. */

  for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ )
    if( worker[ worker_idx ]->tile_idx==tile_idx ) {
      FD_LOG_WARNING(( "tile_idx already added to tpool" ));
      return NULL;
    }

  /* The boot cfg lives on this stack frame.  That is safe because we
     do not return (on success) until the worker has published itself,
     which fd_tpool_private_worker does only after copying cfg out. */

  fd_tpool_private_worker_cfg_t cfg[1];

  cfg->tpool    = tpool;
  cfg->tile_idx = tile_idx;

  /* Smuggle the new worker's index through argc and the cfg through
     argv */

  int     argc = (int)(uint)worker_cnt;
  char ** argv = (char **)fd_type_pun( cfg );

  /* Clear the slot so the spin below detects the worker's publication */

  FD_COMPILER_MFENCE();
  worker[ worker_cnt ] = NULL;
  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker, argc, argv ) ) ) {
    FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
    return NULL;
  }

  /* Wait for the worker to finish BOOT and publish its state */

  while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE();

  tpool->worker_cnt = (uint)(worker_cnt + 1UL);
  return tpool;
}
274 :
275 : fd_tpool_t *
276 93 : fd_tpool_worker_pop( fd_tpool_t * tpool ) {
277 :
278 93 : FD_COMPILER_MFENCE();
279 :
280 93 : if( FD_UNLIKELY( !tpool ) ) {
281 3 : FD_LOG_WARNING(( "NULL tpool" ));
282 3 : return NULL;
283 3 : }
284 :
285 90 : ulong worker_cnt = (ulong)tpool->worker_cnt;
286 90 : if( FD_UNLIKELY( worker_cnt<=1UL ) ) {
287 3 : FD_LOG_WARNING(( "no workers to pop" ));
288 3 : return NULL;
289 3 : }
290 :
291 : /* Testing for IDLE isn't strictly necessary given requirements to use
292 : this and this isn't being done atomically with the actually pop but
293 : does help catch obvious user errors. */
294 :
295 87 : if( FD_UNLIKELY( !fd_tpool_worker_idle( tpool, worker_cnt-1UL ) ) ) {
296 3 : FD_LOG_WARNING(( "worker to pop is not idle" ));
297 3 : return NULL;
298 3 : }
299 :
300 : /* Send HALT to the worker */
301 :
302 84 : fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
303 84 : uint seq0 = worker->seq0 + 1U;
304 84 : fd_tile_exec_t * exec = fd_tile_exec( worker->tile_idx );
305 :
306 84 : worker->task = 0UL;
307 84 : FD_COMPILER_MFENCE();
308 84 : worker->seq0 = seq0;
309 84 : FD_COMPILER_MFENCE();
310 84 : if( FD_UNLIKELY( tpool->opt & FD_TPOOL_OPT_SLEEP ) ) fd_tpool_private_wake( worker );
311 :
312 : /* Wait for the worker to shutdown */
313 :
314 84 : int ret;
315 84 : char const * err = fd_tile_exec_delete( exec, &ret );
316 84 : if( FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
317 84 : else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));
318 :
319 84 : tpool->worker_cnt = (uint)(worker_cnt - 1UL);
320 84 : return tpool;
321 87 : }
322 :
/* FD_TPOOL_EXEC_ALL_IMPL_HDR(style) opens the definition of the node
   function fd_tpool_private_exec_all_##style##_node.

   A node owns the thread range [node_t0,node_t1) out of the full range
   [t0,t1).  It repeatedly:
   - splits its range at node_ts = node_t0 + fd_tpool_private_split(
     node_t1-node_t0 );
   - dispatches the upper part [node_ts,node_t1) to worker node_ts via
     fd_tpool_exec (that worker runs this same node function on it);
   - records node_ts in wait_child;
   - keeps the lower part.
   It stops when at most one thread remains in its range.  The
   style-specific code following the macro then does this node's own
   share of the work, using task = (fd_tpool_task_t)_task.

   wait_child has room for 16 children.  NOTE(review): this presumes
   fd_tpool_private_split roughly halves the range, so a tpool with
   <=65536 threads needs at most 16 splits per node (ushort also caps
   node_ts at 65535).  Confirm against fd_tpool_private_split.

   Every use must be followed by the style body and then
   FD_TPOOL_EXEC_ALL_IMPL_FTR. */
#define FD_TPOOL_EXEC_ALL_IMPL_HDR(style)                                                  \
void                                                                                       \
fd_tpool_private_exec_all_##style##_node( void * _node_tpool,                              \
                                          ulong  node_t0, ulong node_t1,                   \
                                          void * args,                                     \
                                          void * reduce,  ulong stride,                    \
                                          ulong  l0,      ulong l1,                        \
                                          ulong  _task,   ulong _tpool,                    \
                                          ulong  t0,      ulong t1 ) {                     \
  fd_tpool_t *    node_tpool = (fd_tpool_t *   )_node_tpool;                               \
  fd_tpool_task_t task       = (fd_tpool_task_t)_task;                                     \
  ulong           wait_cnt   = 0UL;                                                        \
  ushort          wait_child[16];  /* Assumes tpool_cnt<=65536 */                          \
  for(;;) {                                                                                \
    ulong node_t_cnt = node_t1 - node_t0;                                                  \
    if( node_t_cnt<=1L ) break;                                                            \
    ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt );                        \
    fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node,          \
                   node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
    wait_child[ wait_cnt++ ] = (ushort)node_ts;                                            \
    node_t1 = node_ts;                                                                     \
  }

/* FD_TPOOL_EXEC_ALL_IMPL_FTR closes a node function opened by
   FD_TPOOL_EXEC_ALL_IMPL_HDR.  It waits for every child dispatched by
   the HDR, most recently dispatched first. */
#define FD_TPOOL_EXEC_ALL_IMPL_FTR                                                         \
  while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] );          \
}
349 :
350 772615 : FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin)
351 772615 : ulong m_stride = t1-t0;
352 772615 : ulong m = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
353 67873859 : while( m<l1 ) {
354 67101244 : task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
355 67101244 : m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
356 67101244 : }
357 772615 : FD_TPOOL_EXEC_ALL_IMPL_FTR
358 :
359 773040 : FD_TPOOL_EXEC_ALL_IMPL_HDR(block)
360 773040 : ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
361 70792979 : for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
362 773040 : FD_TPOOL_EXEC_ALL_IMPL_FTR
363 :
#if FD_HAS_ATOMIC
/* Task queue: threads dynamically claim items one at a time from a
   shared counter until it reaches l1.  In this style _tpool is not the
   tpool handle.  It points to a ulong array where l_next[0] is the
   shared next-item counter and l_next[1] holds the tpool pointer that
   is passed to task. */
FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq)
  ulong * l_next = (ulong *)_tpool;
  void  * tpool  = (void *)l_next[1];
  for(;;) {

    /* Note that we use an ATOMIC_CAS here instead of an
       ATOMIC_FETCH_AND_ADD.  With fetch-and-add, each thread could
       push the counter past l1 on its final attempt, so the counter
       could overflow.  The CAS only ever advances it to at most l1.
       ATOMIC_FETCH_AND_ADD could be used if callers were required to
       ensure l1+FD_TILE_MAX does not overflow. */

    FD_COMPILER_MFENCE();
    ulong m0 = *l_next;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( m0>=l1 ) ) break; /* queue drained */
    ulong m1 = m0+1UL;
    if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) { /* another thread claimed m0; retry */
      FD_SPIN_PAUSE();
      continue;
    }

    task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
  }
FD_TPOOL_EXEC_ALL_IMPL_FTR
#endif
391 :
/* Batch: like block, but this node's whole FD_TPOOL_PARTITION range
   [m0,m1) of [l0,l1) is handed to a single task call. */
FD_TPOOL_EXEC_ALL_IMPL_HDR(batch)
  ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
  task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
FD_TPOOL_EXEC_ALL_IMPL_FTR
396 :
/* Raw: every node makes one task call with the full item range
   [l0,l1) as its m range, plus its own [node_t0,node_t1).  After the
   HDR's splitting loop, that range spans at most one thread.  The task
   itself decides which items to process. */
FD_TPOOL_EXEC_ALL_IMPL_HDR(raw)
  task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
FD_TPOOL_EXEC_ALL_IMPL_FTR

/* The generator macros are private to this file */
#undef FD_TPOOL_EXEC_ALL_IMPL_FTR
#undef FD_TPOOL_EXEC_ALL_IMPL_HDR
|