Line data Source code
1 : #include "fd_tpool.h"
2 :
3 : #if FD_HAS_THREADS
4 : #include <pthread.h>
5 : #endif
6 :
7 : struct fd_tpool_private_worker_cfg {
8 : fd_tpool_t * tpool;
9 : ulong tile_idx;
10 : };
11 :
12 : typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
13 :
/* fd_tpool_private_worker_ is the tile main launched by
   fd_tpool_worker_push on every worker tile.

   argc is the index this worker occupies in the tpool's worker table,
   and argv is really a pointer to the pusher's
   fd_tpool_private_worker_cfg_t.

   The worker keeps its descriptor (worker[1]) on its own stack and
   publishes a pointer to it in the worker table.  The pusher spins on
   that publication, so cfg is never read after it.  The worker then
   loops:
   - IDLE: wait until seq0 differs from seq1.
   - HALT: if the posted task is 0, exit the loop.
   - EXEC: otherwise run the task and acknowledge it by storing seq0
     into seq1.

   Returns 0 after halting. */
static int
fd_tpool_private_worker_( int     argc,
                          char ** argv ) {
  ulong                           worker_idx = (ulong)(uint)argc;
  fd_tpool_private_worker_cfg_t * cfg        = (fd_tpool_private_worker_cfg_t *)argv;

  /* Copy the boot config out before publishing; cfg lives on the
     pusher's stack. */
  fd_tpool_t * tpool    = cfg->tpool;
  ulong        tile_idx = cfg->tile_idx;

  /* We are BOOT */

  fd_tpool_private_worker_t worker[1];

  memset( worker, 0, sizeof(fd_tpool_private_worker_t) );

  worker->tile_idx = (uint) tile_idx;

# if FD_HAS_THREADS
  int sleeper = !!(tpool->opt & FD_TPOOL_OPT_SLEEP);

  pthread_mutex_t lock[1];
  pthread_cond_t  wake[1];

  /* In sleep mode the worker takes its mutex here and then holds it
     for its whole life, except while blocked inside pthread_cond_wait
     or after halting.  fd_tpool_private_wake locks this mutex before
     signaling, so a waker can only proceed once this worker is parked
     in pthread_cond_wait. */
  if( FD_UNLIKELY( sleeper ) ) {
    if( FD_UNLIKELY( pthread_mutex_init( lock, NULL ) ) ) FD_LOG_ERR(( "pthread_mutex_init failed" ));
    if( FD_UNLIKELY( pthread_cond_init ( wake, NULL ) ) ) FD_LOG_ERR(( "pthread_cond_init failed" ));
    if( FD_UNLIKELY( pthread_mutex_lock( lock ) ) ) FD_LOG_ERR(( "pthread_mutex_lock failed" ));
  }

  /* Published unconditionally; only dereferenced (by
     fd_tpool_private_wake) when the pool was created with
     FD_TPOOL_OPT_SLEEP. */
  worker->lock = (ulong)lock;
  worker->wake = (ulong)wake;
# endif

  FD_COMPILER_MFENCE();

  /* Publishing the descriptor is the "booted" signal the pusher spins
     on. */
  fd_tpool_private_worker( tpool )[ worker_idx ] = worker;

  ulong const * arg  = worker->arg;
  uint          seq1 = worker->seq1;

  for(;;) {

    /* We are IDLE ... see what we should do next */

# if FD_HAS_THREADS
    if( FD_UNLIKELY( sleeper ) && FD_UNLIKELY( pthread_cond_wait( wake, lock ) ) )
      FD_LOG_WARNING(( "pthread_cond_wait failed; attempting to continue" ));
# endif

    /* Read seq0 first, then the task payload.  Only compiler fences
       are used.  NOTE(review): this assumes the dispatcher's stores to
       task/arg_cnt become visible no later than its store to seq0 (true
       on strongly ordered targets such as x86) -- confirm for other
       targets. */
    FD_COMPILER_MFENCE();
    uint  seq0     = worker->seq0;
    FD_COMPILER_MFENCE();
    uint  _arg_cnt = worker->arg_cnt;
    ulong _task    = worker->task;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( seq0==seq1 ) ) { /* Got idle */
      FD_SPIN_PAUSE();
      continue;
    }

    if( FD_UNLIKELY( !_task ) ) break; /* Got halt */

    /* We are EXEC ... do the task and then transition to IDLE */

# ifdef __cplusplus
    try {
# endif

    if( _arg_cnt==UINT_MAX ) {

      /* arg_cnt==UINT_MAX marks a legacy fd_tpool_task_t whose 12
         arguments are packed into arg[0..11]. */
      fd_tpool_task_t task = (fd_tpool_task_t)_task;

      void * task_tpool  = (void *)arg[ 0];
      ulong  task_t0     = arg[ 1]; ulong task_t1     = arg[ 2];
      void * task_args   = (void *)arg[ 3];
      void * task_reduce = (void *)arg[ 4]; ulong task_stride = arg[ 5];
      ulong  task_l0     = arg[ 6]; ulong task_l1     = arg[ 7];
      ulong  task_m0     = arg[ 8]; ulong task_m1     = arg[ 9];
      ulong  task_n0     = arg[10]; ulong task_n1     = arg[11];

      task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );

    } else {

      /* Otherwise it is a v2 task taking (tpool, worker_idx, arg_cnt,
         arg). */
      fd_tpool_task_v2_t task = (fd_tpool_task_v2_t)_task;

      task( tpool, worker_idx, (ulong)_arg_cnt, arg );

    }

# ifdef __cplusplus
    } catch( ... ) {
      FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
    }
# endif

    FD_COMPILER_MFENCE();

    /* Acknowledge completion; seq0==seq1 means this worker is idle
       again. */
    worker->seq1 = seq0;
    seq1 = seq0;
  }

  /* We are HALT ... clean up and terminate */

# if FD_HAS_THREADS
  if( FD_UNLIKELY( sleeper ) ) {
    if( FD_UNLIKELY( pthread_mutex_unlock ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
    if( FD_UNLIKELY( pthread_cond_destroy ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_destroy failed; attempting to continue" ));
    if( FD_UNLIKELY( pthread_mutex_destroy( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_destroy failed; attempting to continue" ));
  }
# endif

  return 0;
}
129 :
#if FD_HAS_THREADS
/* fd_tpool_private_wake signals the condition variable a sleeping
   worker (FD_TPOOL_OPT_SLEEP pools) blocks on.

   The worker holds its mutex at all times except while parked in
   pthread_cond_wait.  Taking that mutex first therefore waits until
   the worker is parked, so the signal cannot slip past it.

   Any pthread failure is logged and the remaining steps are still
   attempted. */
void
fd_tpool_private_wake( fd_tpool_private_worker_t * worker ) {
  pthread_mutex_t * mtx = (pthread_mutex_t *)worker->lock;
  pthread_cond_t  * cv  = (pthread_cond_t  *)worker->wake;

  int rc_lock = pthread_mutex_lock( mtx );
  if( FD_UNLIKELY( rc_lock ) ) FD_LOG_WARNING(( "pthread_mutex_lock failed; attempting to continue" ));

  int rc_signal = pthread_cond_signal( cv );
  if( FD_UNLIKELY( rc_signal ) ) FD_LOG_WARNING(( "pthread_cond_signal failed; attempting to continue" ));

  int rc_unlock = pthread_mutex_unlock( mtx );
  if( FD_UNLIKELY( rc_unlock ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
}
#endif
140 :
141 : ulong
142 3105 : fd_tpool_align( void ) {
143 3105 : return FD_TPOOL_ALIGN;
144 3105 : }
145 :
146 : ulong
147 3003099 : fd_tpool_footprint( ulong worker_max ) {
148 3003099 : if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
149 474186 : return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) +
150 474186 : sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
151 3003099 : }
152 :
153 : fd_tpool_t *
154 : fd_tpool_init( void * mem,
155 : ulong worker_max,
156 3105 : ulong opt ) {
157 :
158 3105 : FD_COMPILER_MFENCE();
159 :
160 3105 : if( FD_UNLIKELY( !mem ) ) {
161 3 : FD_LOG_WARNING(( "NULL mem" ));
162 3 : return NULL;
163 3 : }
164 :
165 3102 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
166 3 : FD_LOG_WARNING(( "bad alignment" ));
167 3 : return NULL;
168 3 : }
169 :
170 3099 : ulong footprint = fd_tpool_footprint( worker_max );
171 3099 : if( FD_UNLIKELY( !footprint ) ) {
172 6 : FD_LOG_WARNING(( "bad worker_max" ));
173 6 : return NULL;
174 6 : }
175 :
176 3093 : fd_memset( mem, 0, footprint );
177 :
178 3093 : fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem;
179 :
180 3093 : worker0->seq0 = 1U;
181 3093 : worker0->seq1 = 0U;
182 :
183 3093 : fd_tpool_t * tpool = (fd_tpool_t *)(worker0+1);
184 :
185 3093 : tpool->opt = opt;
186 3093 : tpool->worker_max = (uint)worker_max;
187 3093 : tpool->worker_cnt = 1U;
188 :
189 3093 : FD_COMPILER_MFENCE();
190 3093 : fd_tpool_private_worker( tpool )[0] = worker0;
191 3093 : FD_COMPILER_MFENCE();
192 :
193 3093 : return tpool;
194 3099 : }
195 :
196 : void *
197 3096 : fd_tpool_fini( fd_tpool_t * tpool ) {
198 :
199 3096 : FD_COMPILER_MFENCE();
200 :
201 3096 : if( FD_UNLIKELY( !tpool ) ) {
202 3 : FD_LOG_WARNING(( "NULL tpool" ));
203 3 : return NULL;
204 3 : }
205 :
206 3156 : while( fd_tpool_worker_cnt( tpool )>1UL ) {
207 63 : if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
208 0 : FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
209 0 : return NULL;
210 0 : }
211 63 : }
212 :
213 3093 : return (void *)fd_tpool_private_worker0( tpool );
214 3093 : }
215 :
216 : fd_tpool_t *
217 : fd_tpool_worker_push( fd_tpool_t * tpool,
218 120 : ulong tile_idx ) {
219 :
220 120 : FD_COMPILER_MFENCE();
221 :
222 120 : if( FD_UNLIKELY( !tpool ) ) {
223 3 : FD_LOG_WARNING(( "NULL tpool" ));
224 3 : return NULL;
225 3 : }
226 :
227 117 : if( FD_UNLIKELY( !tile_idx ) ) {
228 3 : FD_LOG_WARNING(( "cannot push tile_idx 0" ));
229 3 : return NULL;
230 3 : }
231 :
232 114 : if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
233 3 : FD_LOG_WARNING(( "cannot push self" ));
234 3 : return NULL;
235 3 : }
236 :
237 111 : if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
238 3 : FD_LOG_WARNING(( "invalid tile_idx" ));
239 3 : return NULL;
240 3 : }
241 :
242 108 : fd_tpool_private_worker_t ** worker = fd_tpool_private_worker( tpool );
243 108 : ulong worker_cnt = (ulong)tpool->worker_cnt;
244 :
245 108 : if( FD_UNLIKELY( worker_cnt>=(ulong)tpool->worker_max ) ) {
246 3 : FD_LOG_WARNING(( "too many workers" ));
247 3 : return NULL;
248 3 : }
249 :
250 507 : for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ )
251 420 : if( worker[ worker_idx ]->tile_idx==tile_idx ) {
252 18 : FD_LOG_WARNING(( "tile_idx already added to tpool" ));
253 18 : return NULL;
254 18 : }
255 :
256 87 : fd_tpool_private_worker_cfg_t cfg[1];
257 :
258 87 : cfg->tpool = tpool;
259 87 : cfg->tile_idx = tile_idx;
260 :
261 87 : int argc = (int)(uint)worker_cnt;
262 87 : char ** argv = (char **)fd_type_pun( cfg );
263 :
264 87 : FD_COMPILER_MFENCE();
265 87 : worker[ worker_cnt ] = NULL;
266 87 : FD_COMPILER_MFENCE();
267 :
268 87 : if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker_, argc, argv ) ) ) {
269 3 : FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
270 3 : return NULL;
271 3 : }
272 :
273 3140 : while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE();
274 :
275 84 : tpool->worker_cnt = (uint)(worker_cnt + 1UL);
276 84 : return tpool;
277 87 : }
278 :
279 : fd_tpool_t *
280 93 : fd_tpool_worker_pop( fd_tpool_t * tpool ) {
281 :
282 93 : FD_COMPILER_MFENCE();
283 :
284 93 : if( FD_UNLIKELY( !tpool ) ) {
285 3 : FD_LOG_WARNING(( "NULL tpool" ));
286 3 : return NULL;
287 3 : }
288 :
289 90 : ulong worker_cnt = (ulong)tpool->worker_cnt;
290 90 : if( FD_UNLIKELY( worker_cnt<=1UL ) ) {
291 3 : FD_LOG_WARNING(( "no workers to pop" ));
292 3 : return NULL;
293 3 : }
294 :
295 : /* Testing for IDLE isn't strictly necessary given requirements to use
296 : this and this isn't being done atomically with the actually pop but
297 : does help catch obvious user errors. */
298 :
299 87 : if( FD_UNLIKELY( !fd_tpool_worker_idle( tpool, worker_cnt-1UL ) ) ) {
300 3 : FD_LOG_WARNING(( "worker to pop is not idle" ));
301 3 : return NULL;
302 3 : }
303 :
304 : /* Send HALT to the worker */
305 :
306 84 : fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
307 84 : uint seq0 = worker->seq0 + 1U;
308 84 : fd_tile_exec_t * exec = fd_tile_exec( worker->tile_idx );
309 :
310 84 : worker->task = 0UL;
311 84 : FD_COMPILER_MFENCE();
312 84 : worker->seq0 = seq0;
313 84 : FD_COMPILER_MFENCE();
314 84 : if( FD_UNLIKELY( tpool->opt & FD_TPOOL_OPT_SLEEP ) ) fd_tpool_private_wake( worker );
315 :
316 : /* Wait for the worker to shutdown */
317 :
318 84 : int ret;
319 84 : char const * err = fd_tile_exec_delete( exec, &ret );
320 84 : if( FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
321 84 : else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));
322 :
323 84 : tpool->worker_cnt = (uint)(worker_cnt - 1UL);
324 84 : return tpool;
325 87 : }
326 :
/* FD_TPOOL_EXEC_ALL_IMPL_HDR(style) opens the definition of
   fd_tpool_private_exec_all_<style>_node.  That function runs on the
   worker node_t0 and is responsible for the worker range
   [node_t0,node_t1) of the full range [t0,t1).

   While the node covers more than one worker, it:
   - picks a split point node_ts = node_t0 + fd_tpool_private_split(cnt);
   - dispatches the upper part [node_ts,node_t1) to worker node_ts
     (recursively, via the same function);
   - records node_ts so it can be waited on later;
   - keeps the lower part for itself.

   After the header, node_t1==node_t0+1 (or the range is empty) and the
   style-specific body runs as worker node_t0.  The body can use args,
   reduce, stride, l0, l1, _tpool, t0, t1, task, node_t0 and node_t1.

   wait_child has room for 16 dispatched children.  NOTE(review): this
   presumably holds because fd_tpool_private_split roughly halves the
   range, so 16 splits suffice for up to 65536 workers -- confirm. */
#define FD_TPOOL_EXEC_ALL_IMPL_HDR(style)                                                         \
void                                                                                              \
fd_tpool_private_exec_all_##style##_node( void * _node_tpool,                                      \
                                          ulong  node_t0, ulong node_t1,                           \
                                          void * args,                                             \
                                          void * reduce,  ulong stride,                            \
                                          ulong  l0,      ulong l1,                                \
                                          ulong  _task,   ulong _tpool,                            \
                                          ulong  t0,      ulong t1 ) {                             \
  fd_tpool_t *    node_tpool = (fd_tpool_t *   )_node_tpool;                                        \
  fd_tpool_task_t task       = (fd_tpool_task_t)_task;                                             \
  ulong           wait_cnt   = 0UL;                                                                \
  ushort          wait_child[16]; /* Assumes tpool_cnt<=65536 */                                   \
  for(;;) {                                                                                        \
    ulong node_t_cnt = node_t1 - node_t0;                                                          \
    if( node_t_cnt<=1L ) break; /* this node is down to a single worker */                         \
    ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt );                                \
    /* hand [node_ts,node_t1) to worker node_ts, keep [node_t0,node_ts) */                         \
    fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node,                  \
                   node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
    wait_child[ wait_cnt++ ] = (ushort)node_ts;                                                    \
    node_t1 = node_ts;                                                                             \
  }

/* FD_TPOOL_EXEC_ALL_IMPL_FTR closes the function opened by
   FD_TPOOL_EXEC_ALL_IMPL_HDR.  It waits for every child this node
   dispatched, most recently dispatched first. */
#define FD_TPOOL_EXEC_ALL_IMPL_FTR                                                \
  while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] ); \
}
353 :
354 1024576 : FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin)
355 1024576 : ulong m_stride = t1-t0;
356 1024576 : ulong m = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
357 47851274 : while( m<l1 ) {
358 46826698 : task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
359 46826698 : m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
360 46826698 : }
361 1024576 : FD_TPOOL_EXEC_ALL_IMPL_FTR
362 :
363 1014550 : FD_TPOOL_EXEC_ALL_IMPL_HDR(block)
364 1014550 : ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
365 47939472 : for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
366 1014550 : FD_TPOOL_EXEC_ALL_IMPL_FTR
367 :
#if FD_HAS_ATOMIC
/* Task queue: workers dynamically claim items one at a time.

   Here _tpool is treated as a pointer to a ulong array:
   - l_next[0] is the shared index of the next unclaimed item;
   - l_next[1] holds the tpool pointer handed to task.

   Each claim is a CAS of l_next[0] from m0 to m0+1.  The winner runs
   item [m0,m0+1); a loser pauses and retries.  A worker stops once the
   shared index reaches l1. */
FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq)
  ulong * l_next = (ulong *)_tpool;
  void *  tpool  = (void *)l_next[1];
  for(;;) {

    /* Note that we use an ATOMIC_CAS here instead of an
       ATOMIC_FETCH_AND_ADD to avoid overflow risks by having threads
       increment l0 into the tail.  ATOMIC_FETCH_AND_ADD could be used
       if there is no requirement to the effect that l1+FD_TILE_MAX does
       not overflow. */

    FD_COMPILER_MFENCE();
    ulong m0 = *l_next;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( m0>=l1 ) ) break; /* all items claimed */
    ulong m1 = m0+1UL;
    if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) { /* another worker claimed m0 first */
      FD_SPIN_PAUSE();
      continue;
    }

    task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
  }
FD_TPOOL_EXEC_ALL_IMPL_FTR
#endif
395 :
396 1025784 : FD_TPOOL_EXEC_ALL_IMPL_HDR(batch)
397 1025784 : ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
398 1025784 : task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
399 1025784 : FD_TPOOL_EXEC_ALL_IMPL_FTR
400 :
401 7446362 : FD_TPOOL_EXEC_ALL_IMPL_HDR(raw)
402 7446362 : task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
403 7446362 : FD_TPOOL_EXEC_ALL_IMPL_FTR
404 :
/* The node-function generators are private to this translation unit. */
#undef FD_TPOOL_EXEC_ALL_IMPL_FTR
#undef FD_TPOOL_EXEC_ALL_IMPL_HDR
|