Line data Source code
1 : #ifndef HEADER_fd_src_util_tpool_fd_tpool_h
2 : #define HEADER_fd_src_util_tpool_fd_tpool_h
3 :
4 : /* tpool provides APIs to group sets of tiles together for ultra low
5 : overhead and high scalability launching of thread parallel jobs.
6 : There is a lot of nuance that most thread pool APIs and
7 : implementations get wrong. And understanding this nuance is very
8 : useful for seeing why these APIs and implementations are the way they are.
9 : So, a crash course:
10 :
11 : High performance thread parallelism crash course ********************
12 :
13 : Consider the simple and highly idealized case of a job with N
14 : independent tasks that each take an approximately uniform time
15 : tau_task to execute.
16 :
17 : A serial implementation of this job might look like:
18 :
19 : for( ulong n=0; n<N; n++ ) ... do task n ...;
20 :
21 : And the overall time to do such a job is:
22 :
23 : T_serial ~ tau_overhead + tau_task N
24 :
25 : where tau_overhead represents O(1) costs like the overhead to setup
26 : the loop that will do the N tasks.
27 :
28 : In a parallel implementation, we want to uniformly divide these tasks
29 : over P threads to speed this up. If we use the naive strategy that
30 : many codes and libraries use, we have something like (either
31 : explicitly or under the hood and ignoring error trapping):
32 :
33 : static void *
34 : task_block( void * _args ) {
35 : task_args_t * args = (task_args_t *)_args;
36 : ... unpack task args here, including
37 : ulong n0 = args->n0;
38 : ulong n1 = args->n1;
39 :
40 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
41 :
42 : return NULL;
43 : }
44 :
45 : ... in the main thread ...
46 :
47 : task_args_t * args = (task_args_t *)malloc( P*sizeof(task_args_t) );
48 : pthread_t * thread = (pthread_t *) malloc( P*sizeof(pthread_t) );
49 : for( ulong p=0; p<P; p++ ) args[p] = ... block p info ...;
50 : for( ulong p=1; p<P; p++ ) pthread_create( &thread[p], NULL, task_block, &args[p] );
51 : task_block( &args[0] );
52 : for( ulong p=1; p<P; p++ ) pthread_join( thread[p], NULL );
53 : free( thread );
54 : free( args );
55 :
56 : (Above, we used the main thread as one of the P threads in job
57 : execution such that it both dispatches and does a share of the work.
58 : This discussion still applies if we just have main do dispatch only.)
59 :
60 : Ugh ... this does parallelize the task over P threads but it is
61 : practically worthless. The overall time to execute is:
62 :
63 : T_parallel_dumb ~ tau_overhead_dumb
64 : + tau_partition_dumb P
65 : + tau_dispatch_dumb (P-1)
66 : + tau_task N / P
67 :
68 : tau_overhead_dumb represents the overhead from before, but with
69 : more dumb. That is, it is likely a lot larger than the serial
70 : version because of the allocations and need to wrap up the tasks in a
71 : thread dispatch function signature compatible way (pthread_create
72 : "start_routine" in this case). (There are usually additional
73 : overheads omitted from this simplified discussion.)
74 :
75 : tau_partition_dumb is the time spent by the main thread, per worker
76 : thread, partitioning the work for the worker threads. This is dumb
77 : because this work could be parallelized (i.e. done within the task
78 : itself). A second, less obvious, dumbness is that, in complex work
79 : distribution scenarios, it is often very difficult for the main
80 : thread to determine the optimal partition a worker thread should do
81 : (see the sgemm dispatch example below for something very hard to do
82 : centrally but very easy to do distributed). This ends up being a triple
83 : whammy of awful: the parallelization strategy is inefficient,
84 : computed inefficiently and computed by one thread. Amdahl
85 : bottlenecks for everyone!
86 :
87 : tau_dispatch_dumb is the time needed to start and stop a worker
88 : thread to run a task. This is dumb because things like
89 : pthread_create have a huge overhead and because the total overhead is
90 : linear in P. (In practice, this is even dumber than it appears
91 : because we didn't do anything fancy with the created threads to
92 : mitigate thread-core affinity effects, IPC effects, kernel scheduling
93 : effects, NUMA effects, TLB effects, ... neglected in this idealized
94 : example.)
95 :
96 : Thus, we had better have an infinite N if we are going to be able to
97 : usefully parallelize over many threads under the dumb parallelization
98 : strategies.
99 :
100 : But, in the real world, N is not infinite and often our goal is to
101 : reduce the time needed to do a finite amount of work. Throwing all
102 : the threads in the world at the job will not help because, while
103 : increasing the number of threads decreases the amount of time
104 : required to do the tasks hyperbolically, it _increases_ _linearly_
105 : the amount of time needed to set up the threads to do those tasks.
106 :
107 : This leads to the question: what is the maximum number of threads we
108 : can use profitably?
109 :
110 : This will be the P that minimizes T_parallel_dumb. Call this value
111 : P_max_useful_dumb. Given this point is a minimum of T_parallel_dumb,
112 : for P<P_max_useful_dumb / P>P_max_useful_dumb, using _more_ / _fewer_
113 : threads for a fixed size job is useful.
114 :
115 : From basic calculus, P_max_useful_dumb is where the derivative of
116 : T_parallel_dumb with respect to P is zero. With:
117 :
118 : dT_parallel_dumb / dP ~ tau_partition_dumb
119 : + tau_dispatch_dumb
120 : - tau_task N / P^2
121 :
122 : we can solve for dT_parallel_dumb / dP = 0. This yields:
123 :
124 : P_max_useful_dumb ~ sqrt( tau_task N / ( tau_partition_dumb + tau_dispatch_dumb ) )
125 :
126 : This does not even weak scale! Doubling N does not double the
127 : P_max_useful_dumb; it only allows it to increase by ~sqrt(2). Sad.
128 :
129 : And it is worse than that because tau_dispatch_dumb is typically
130 : massive and tau_task is often infinitesimal.
131 :
132 : For example, in parallelizing level 1 matrix operations (e.g. a
133 : vector add), N is often related to the number of scalar flops needed
134 : and thus tau_task is small O(1) times the marginal cost of doing a
135 : single flop, assuming the problem itself fits within cache. That is,
136 : tau_task is measured in picoseconds (e.g. wide vector SIMD double
137 : pumped FMA units on a ~3 GHz core).
138 :
139 : But tau_dispatch_dumb is measured in milliseconds (e.g.
140 : pthread_create + pthread_join with all their glorious O/S and kernel
141 : scheduler overheads).
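
   To make this concrete (illustrative numbers only, not measurements):
   plugging tau_task ~ 100 ps and tau_dispatch_dumb ~ 1 ms into the
   above gives

     P_max_useful_dumb ~ sqrt( 1e-10 N / 1e-3 ) ~ sqrt( N ) / 3162

   so N has to be on the order of tens of millions of tasks before even
   a second thread is useful under the dumb strategy.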
142 :
143 : This means we need a huge N before it is even worthwhile to consider
144 : parallelization under the dumb strategy. And once N is large enough,
145 : the problem transitions from being cacheable to uncacheable. And in
146 : that regime, memory bandwidth limits not considered here dominate
147 : (all those cores starved of useful work because of badly designed
148 : software and limited memory bandwidth). Embarrassing.
149 :
150 : All too common a story: "We parallelized [important thing] with
151 : [rando popular but naive thread library du jour]. It got slower.
152 : Guess it was already as efficient as it could be."
153 :
154 : Sigh ... fortunately, the math also gives the prescription of what we
155 : need to do (and maybe gives a hint that, contrary to beliefs floating
156 : around in blogs and whatnot, math is in fact critically important
157 : for coding).
158 :
159 : First, we need to parallelize the partition calculation by having
160 : each worker thread concurrently compute their slice of the job. We
161 : can do that by passing a reference to a description of the whole job
162 : along with just enough context to each thread that they can determine
163 : their slice (e.g. a thread index and number of threads). This
164 : eliminates tau_partition_dumb from the denominator above but it
165 : requires a better thread dispatcher function signature than used
166 : above.
167 :
168 : Second, we create all the threads we are going to use up front, pin
169 : them to dedicated isolated cores at high kernel schedule priority
170 : with optimized NUMA and TLB aware stacks and scratch spaces to
171 : massively reduce tau_dispatch_dumb overheads (and also fix some more
172 : subtle performance drains in practical usage at the same time).
173 : These threads will spin on memory in optimized ways to make the time
174 : to wake up a thread on the order of the time it takes to transfer a
175 : cache line to a core (tens to hundreds of ns).
176 :
177 : Together, these turn the above into something like:
178 :
179 : static void
180 : task_block( void * _args,
181 : ulong p,
182 : ulong P ) {
183 : job_args_t * args = (job_args_t *)_args;
184 : ... unpack job args here, including
185 : ulong N = args->N;
186 :
187 : // Compute the block of tasks to be done by this thread
188 : ulong n0;
189 : ulong n1;
190 : FD_TPOOL_PARTITION( 0,N,1, p,P, n0,n1 );
191 :
192 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
193 : }
194 :
195 : ... in the main thread ...
196 :
197 : job_args_t args[1] = { ... job info ... };
198 : for( ulong p=1; p<P; p++ ) tpool_exec( tpool,p, task_block,args, p,P );
199 : task_block( args, 0,P );
200 : for( ulong p=1; p<P; p++ ) tpool_wait( tpool,p );
201 :
202 : This is dramatically better in implementation as now we have:
203 :
204 : P_max_useful_okay ~ sqrt( tau_task N / tau_dispatch_smart )
205 :
206 : where tau_dispatch_smart << tau_dispatch_dumb. Additionally, the
207 : various overheads in the main thread are much lower.
208 :
209 : While this is able to use dramatically more threads profitably for a
210 : fixed job size, we still aren't able to weak scale. To do that, we
211 : need to parallelize the thread dispatch too.
212 :
213 : Third, we parallelize thread dispatch. The obvious but wrong
214 : solution is to use a single common memory location that all worker
215 : threads in the pool will wait on to start execution and a single
216 : common memory location that all workers will atomically increment at
217 : completion so the main thread only has to spin on one memory location
218 : to tell if the job is complete.
219 :
220 : This is wrong practically because, while it superficially looks like
221 : O(1) operations for the dispatch and wait, this is a cache coherence
222 : protocol nightmare beyond compare. There will be large constant O(P)
223 : messaging on the CPU's internal network-on-chip to distribute the
224 : information to/from all the worker threads. Such will often not be
225 : parallelized under the hood particularly well or particularly
226 : deterministically (especially the waiting part ... the main thread
227 : still needs to get P updates from the workers that will be serialized
228 : with lots of cache line ping-pong-ing on top of atomic operations
229 : under the hood as they funnel back toward the main thread).
230 :
231 : Instead, we can recursively divide the job to unambiguously
232 : parallelize task dispatch. This yields something like:
233 :
234 : static void
235 : task_node( tpool_t * tpool, // thread pool to use
236 : ulong p0, // Assumes p0>=0, caller is p0
237 : ulong p1, // Assumes p1>p0
238 : void * _args,
239 : ulong P ) { // Assumes P>=p1
240 :
241 : // At this point we are worker thread p0 and we are responsible
242 : // for dispatching work to workers [p0,p1)
243 :
244 : ulong p_cnt = p1-p0;
245 : if( p_cnt>1UL ) {
246 :
247 : // At this point, we have more than one thread available.
248 : // Split the workers approximately in half and have worker
249 : // thread ps dispatch work to threads [ps,p1) while we
250 : // concurrently dispatch work to threads [p0,ps).
251 :
252 : ulong ps = p0 + (p_cnt>>1);
253 : tpool_exec( tpool,ps, task_node, tpool,ps,p1, _args, P );
254 : task_node( tpool,p0,ps, _args, P );
255 : tpool_wait( tpool,ps );
256 : return;
257 : }
258 :
259 : // At this point, there is only one thread available
260 : // Do our slice of the job.
261 :
262 : job_args_t * args = (job_args_t *)_args;
263 : ... unpack job args here, including
264 : ulong N = args->N;
265 :
266 : // Compute block of tasks to be done by this thread
267 : ulong n0;
268 : ulong n1;
269 : FD_TPOOL_PARTITION( 0,N,1, p0,P, n0,n1 );
270 :
271 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
272 : }
273 :
274 : ... in the main thread ...
275 :
276 : job_args_t args[1] = { ... job info ... };
277 : task_node( tpool,0,P, args, P );
278 :
279 : This has an algorithmically different and cheaper cost model for
280 : thread parallelization:
281 :
282 : T_parallel_smart ~ tau_overhead_smart
283 : + tau_dispatch_smart ceil log2(P)
284 : + tau_task N / P
285 :
286 : With the magical power of calculus again, noting that
287 : log2(P)=ln(P)/ln(2) and ceil log2(P) ~ log2(P) asymptotically, we
288 : have:
289 :
290 : dT_parallel_smart / dP ~ tau_dispatch_smart / (P ln(2)) - tau_task N/P^2
291 :
292 : such that:
293 :
294 : P_max_useful_smart ~ tau_task ln(2) N / tau_dispatch_smart
295 :
296 : Now we can weak scale! If we double the amount of work, we can
297 : double the maximum useful threads we can apply.
298 :
299 : Since tau_overhead_smart ~ tau_overhead_serial << tau_overhead_dumb,
300 : tau_dispatch_smart << tau_dispatch_dumb and N isn't stuck inside a
301 : sqrt anymore, we also can profitably parallelize _orders_ _of_
302 : _magnitude_ smaller problems than before.
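
   To make this concrete (again, illustrative numbers only): keeping
   tau_task ~ 100 ps but using a spin-wake dispatch cost of
   tau_dispatch_smart ~ 100 ns, the above gives

     P_max_useful_smart ~ 1e-10 ln(2) N / 1e-7 ~ N / 1443

   that is, roughly one additional useful thread per ~1400 tasks,
   versus needing tens of millions of tasks before even a second
   thread was useful under the dumb strategy.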
303 :
304 : As a further bonus, the above dispatch pattern naturally supports
305 : much more complex parallelizations (see the sgemm example below) and
306 : naturally has good thread-NUMA topology oblivious and cache oblivious
307 : algorithmic properties (ideally the tpool threads themselves have
308 : good spatial locality for best results).
309 :
310 : Last, we can wrap all this up so that, in simple cases, all the user
311 : needs to do is:
312 :
313 : static void
314 : task_block( void * _args,
315 : ulong p,
316 : ulong P ) {
317 : job_args_t * args = (job_args_t *)_args;
318 : ... unpack job args here, including
319 : ulong N = args->N;
320 :
321 : // Compute block of tasks to be done by this thread
322 : ulong n0;
323 : ulong n1;
324 : FD_TPOOL_PARTITION( 0,N,1, p,P, n0,n1 );
325 :
326 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
327 : }
328 :
329 : ... in the main thread ...
330 :
331 : job_args_t args[1] = { ... args ... };
332 : tpool_exec_all( tpool,0,P, task_block,args );
333 :
334 : and, in the main thread, it will act like:
335 :
336 : job_args_t args[1] = { ... args ... };
337 : for( ulong p=0; p<P; p++ ) task_block( args, p,P );
338 :
339 : but have a performance characteristic like T_parallel_smart.
340 :
341 : The actual function signature used for a fd_tpool_task_t below was
342 : picked to be sufficient to tightly and uniformly partition an
343 : arbitrarily shaped level 3 BLAS sgemm A^T B matrix calculation over
344 : lots of cores in arbitrarily shaped NUMA topologies without a lot
345 : of overhead in the dispatch logic. For example (and this also shows
346 : a powerful use case for scratch memory too):
347 :
348 : // This is called by worker thread t0 and uses tpool worker threads
349 : // [t0,t1) to compute:
350 : //
351 : // C([0,l1-l0),[0,n1-n0)) = A([m0,m1),[l0,l1))' B([m0,m1),[n0,n1))
352 :
353 : void
354 : atb_node( fd_tpool_t * tpool, // tpool to use
355 : ulong t0, ulong t1, // Assumes t1>t0
356 : atb_args_t * args, // Location of A and B matrices and their column strides
357 : float * C, // (l1-l0)*(n1-n0) col-major matrix with col stride sC
358 : ulong sC, // Assumes sC>=(l1-l0), C(i,j) at C[ i + sC*j ]
359 : ulong l0, ulong l1, // Assumes l1>=l0
360 : ulong m0, ulong m1, // Assumes m1>=m0
361 : ulong n0, ulong n1 ) { // Assumes n1>=n0
362 :
363 : // At this point, we are worker thread t0 and we are responsible
364 : // for dispatching work to worker threads [t0,t1)
365 :
366 : ulong t_cnt = t1-t0;
367 : if( t_cnt>1UL ) { // ... and optionally if the task is big enough to be worth parallelizing
368 :
369 : // We need to split this task over more than one thread.
370 : //
371 : // Split the worker threads into two approximately equal sized
372 : // sets (left and right) and then proportionally split the
373 : // longest matrix range to try to make the subtasks as square
374 : // as possible (and thus improve compute data ratio /
375 : // cacheability of subtasks). Note that we do not just split
376 : // matrix ranges in half because such a split would result in
377 : // an increasingly bad load imbalance between the left and
378 : // right workers when the matrix range to split is large but
379 : // t_cnt is small and odd.
380 : //
381 : // We then execute in parallel the left range on worker threads
382 : // [t0,th) and the right range on worker threads [th,t1). Yes,
383 : // this uses recursion in the thread dispatch to get radically
384 : // better scalability, lower overhead and ability to map
385 : // arbitrary shaped problems onto arbitrarily shaped massively
386 : // parallel hardware with minimal load imbalance (see notes
387 : // above for more details about this).
388 :
389 : ulong t_cnt_left = t_cnt >> 1;
390 : ulong ts = t0 + t_cnt_left;
391 :
392 : ulong l_cnt = l1-l0;
393 : ulong m_cnt = m1-m0;
394 : ulong n_cnt = n1-n0;
395 :
396 : if( FD_UNLIKELY( (m_cnt>l_cnt) & (m_cnt>n_cnt) ) ) { // Split m range
397 :
398 : // Splitting along m is more onerous than splitting along l
399 : // or n because the left and right subtask outputs would end
400 : // up clobbering each other or, if being atomically too
401 : // clever by half, having a non-deterministic result due to an
402 : // indeterminant summation order and floating point
403 : // non-associativity. So we allocate a temporary matrix near
404 : // these cores to hold the right half partial reduction such
405 : // that the left and right can do their partial reductions
406 : // independently (and deterministically) and then do a
407 : // (deterministic) thread parallel reduction of the two
408 : // results on threads [t0,t1) afterward (a level of fine
409 : // grained parallelization few even think is possible on
410 : // commodity processors).
411 :
412 : ulong m_cnt_left = (m_cnt*t_cnt_left)/t_cnt; // handle overflow here
413 : ulong ms = m0 + m_cnt_left;
414 :
415 : fd_scratch_push();
416 :
417 : float * CR = (float *)fd_scratch_alloc( 0UL, l_cnt*n_cnt*sizeof(float) );
418 :
419 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,l_cnt, l0,l1, ms,m1, n0,n1 );
420 : atb_node( tpool,t0,ts, args, C,sC, l0,l1, m0,ms, n0,n1 );
421 : fd_tpool_wait( tpool,ts );
422 :
423 : // Do C([0,l_cnt),[0,n_cnt)) += CR([0,l_cnt),[0,n_cnt)) on threads [t0,t1) here
424 :
425 : fd_scratch_pop();
426 :
427 : } else if( FD_UNLIKELY( n_cnt>l_cnt ) ) { // Split n range
428 :
429 : ulong n_cnt_left = (n_cnt*t_cnt_left)/t_cnt; // handle overflow here
430 : ulong ns = n0 + n_cnt_left;
431 :
432 : float * CR = C + sC*n_cnt_left;
433 :
434 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,sC, l0,l1, m0,m1, ns,n1 );
435 : atb_node( tpool,t0,ts, args, C,sC, l0,l1, m0,m1, n0,ns );
436 : fd_tpool_wait( tpool,ts );
437 :
438 : } else { // Split l range
439 :
440 : ulong l_cnt_left = (l_cnt*t_cnt_left)/t_cnt; // handle overflow here
441 : ulong ls = l0 + l_cnt_left;
442 :
443 : float * CR = C + l_cnt_left;
444 :
445 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,sC, ls,l1, m0,m1, n0,n1 );
446 : atb_node( tpool,t0,ts, args, C,sC, l0,ls, m0,m1, n0,n1 );
447 : fd_tpool_wait( tpool,ts );
448 :
449 : }
450 :
451 : return;
452 : }
453 :
454 : // At this point, we are at a leaf node
455 : // Do C([0,l1-l0),[0,n1-n0)) = A([m0,m1),[l0,l1))' B([m0,m1),[n0,n1)) here
456 : }
457 :
458 : End of high performance thread parallelism crash course ************/
459 :
460 : #include "../tile/fd_tile.h"
461 : #include "../scratch/fd_scratch.h"
462 :
463 : /* FD_TPOOL_WORKER_STATE_* are possible states a tpool worker thread
464 : is in. Users practically should never see BOOT or HALT. */
465 :
466 79916957 : #define FD_TPOOL_WORKER_STATE_BOOT (0) /* Tile is booting */
467 29554834 : #define FD_TPOOL_WORKER_STATE_IDLE (1) /* Tile is idle */
468 31376983 : #define FD_TPOOL_WORKER_STATE_EXEC (2) /* Tile is executing a task */
469 6 : #define FD_TPOOL_WORKER_STATE_HALT (3) /* Tile is halting */
470 :
471 : /* FD_TPOOL_PARTITION partitions tasks indexed [task0,task1) over
472 : worker_cnt worker threads. On return,
473 : [worker_task0,worker_task1) gives the range of tasks to be done
474 : by worker worker_idx.
475 :
476 : The number of tasks assigned to a worker will be as uniform as possible, with
477 : the constraints that the tasks will be assigned to workers in
478 : monotonically increasing order and, for worker_idx<worker_cnt-1, the
479 : number of tasks assigned will be a multiple of lane_cnt in size.
480 : (That is, any final incomplete SIMD block will be assigned to worker
481 : worker_cnt-1.)
482 :
483 : Assumes task1>=task0, lane_cnt>0, worker_cnt>0 and
484 : worker_idx<worker_cnt. Performance will be fastest if lane_cnt
485 : and/or worker_cnt are an integer power-of-two (especially 1). This
486 : macro is robust. */
487 :
488 1544691749 : #define FD_TPOOL_PARTITION( task0, task1, lane_cnt, worker_idx, worker_cnt, worker_task0, worker_task1 ) do { \
489 1544691749 : ulong _ftp_task0 = (task0); \
490 1544691749 : ulong _ftp_task1 = (task1); \
491 1544691749 : ulong _ftp_lane_cnt = (lane_cnt); \
492 1544691749 : ulong _ftp_worker_idx = (worker_idx); \
493 1544691749 : ulong _ftp_worker_cnt = (worker_cnt); \
494 1544691749 : ulong _ftp_task_cnt = _ftp_task1 - _ftp_task0; \
495 1544691749 : ulong _ftp_block_cnt = _ftp_task_cnt / _ftp_lane_cnt; /* Num complete simd blocks, typically nop or fast shr */ \
496 1544691749 : ulong _ftp_block_rem = _ftp_task_cnt % _ftp_lane_cnt; /* Number of leftovers, typically nop or fast mask */ \
497 1544691749 : ulong _ftp_worker_block_min = _ftp_block_cnt / _ftp_worker_cnt; /* Min complete simd blocks for a worker */ \
498 1544691749 : ulong _ftp_worker_extra_cnt = _ftp_block_cnt % _ftp_worker_cnt; /* Num workers needing an extra complete simd block */ \
499 1544691749 : ulong _ftp_worker_task0 = _ftp_task0 \
500 1544691749 : + _ftp_lane_cnt*(_ftp_worker_block_min*_ftp_worker_idx + fd_ulong_min(_ftp_worker_idx,_ftp_worker_extra_cnt)); \
501 1544691749 : ulong _ftp_worker_task1 = _ftp_worker_task0 \
502 1544691749 : + _ftp_lane_cnt*(_ftp_worker_block_min + ((ulong)(_ftp_worker_idx<_ftp_worker_extra_cnt))) \
503 1544691749 : + fd_ulong_if( _ftp_worker_idx==(_ftp_worker_cnt-1UL), _ftp_block_rem, 0UL ); \
504 1544691749 : (worker_task0) = _ftp_worker_task0; \
505 1544691749 : (worker_task1) = _ftp_worker_task1; \
506 1544691749 : } while(0)
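
/* For example (a purely illustrative case): partitioning tasks [0,10)
   with lane_cnt 4 over 3 workers via

     ulong n0; ulong n1;
     FD_TPOOL_PARTITION( 0UL,10UL,4UL, worker_idx,3UL, n0,n1 );

   assigns [n0,n1)==[0,4) to worker 0, [4,8) to worker 1 and [8,10) to
   worker 2 (the final incomplete SIMD block going to the last
   worker). */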
507 :
508 : /* A fd_tpool_task_t is the function signature used for the entry
509 : point of a task. Users are free to repurpose these arguments however
510 : they see fit for individual executions but the number of
511 : arguments, types of argument and names used below reflect the intent
512 : described in the above crash course. Namely, this is adequate to map
513 : arbitrary level 3 BLAS dense matrix-matrix multiply with arbitrarily
514 : shaped matrices to an arbitrary topology set of tiles ultra quickly
515 : and ultra tightly. Bulk dispatchers like tpool_exec_all might apply
516 : some additional conventions to some of these arguments. */
517 :
518 : typedef void
519 : (*fd_tpool_task_t)( void * tpool,
520 : ulong t0, ulong t1,
521 : void * args,
522 : void * reduce, ulong stride,
523 : ulong l0, ulong l1,
524 : ulong m0, ulong m1,
525 : ulong n0, ulong n1 );
526 :
527 : /* A fd_tpool_t is an opaque handle of a thread pool */
528 :
529 : struct fd_tpool_private;
530 : typedef struct fd_tpool_private fd_tpool_t;
531 :
532 : /* Private APIs *******************************************************/
533 :
534 : /* These are exposed here to facilitate inlining various operations in
535 : high performance contexts. */
536 :
537 : struct __attribute__((aligned(128))) fd_tpool_private_worker {
538 : fd_tpool_task_t task;
539 : void * task_tpool;
540 : ulong task_t0; ulong task_t1;
541 : void * task_args;
542 : void * task_reduce; ulong task_stride;
543 : ulong task_l0; ulong task_l1;
544 : ulong task_m0; ulong task_m1;
545 : ulong task_n0; ulong task_n1;
546 : int state;
547 : uint tile_idx;
548 : void * scratch;
549 : ulong scratch_sz;
550 : };
551 :
552 : typedef struct fd_tpool_private_worker fd_tpool_private_worker_t;
553 :
554 : struct fd_tpool_private {
555 :
556 : /* This point is 128 aligned and preceded by the worker0 sentinel */
557 :
558 : ulong worker_max; /* Positive */
559 : ulong worker_cnt; /* in [1,worker_max] */
560 :
561 : /* worker_max element fd_tpool_private_worker_t * array here, indexed
562 : [0,worker_cnt). worker[0] points to worker0 above. Note that we
563 : cannot use a flex array here because strict C++17 doesn't support
564 : it and we use C++ in fd_tpool.cxx to handle people using C++
565 : libraries that throw exceptions that are uncaught ... sigh. */
566 :
567 : };
568 :
569 : FD_PROTOTYPES_BEGIN
570 :
571 : FD_STATIC_ASSERT( FD_TILE_MAX<65536UL, update_implementation );
572 :
573 : /* fd_tpool_private_worker0 returns a pointer in the local address space
574 : to the worker0 sentinel */
575 :
576 : FD_FN_CONST static inline fd_tpool_private_worker_t *
577 3087 : fd_tpool_private_worker0( fd_tpool_t const * tpool ) {
578 3087 : return ((fd_tpool_private_worker_t *)tpool)-1;
579 3087 : }
580 :
581 : /* fd_tpool_private_worker returns a pointer in the local address space
582 : to the first element of the tpool's worker array. */
583 :
584 : FD_FN_CONST static inline fd_tpool_private_worker_t **
585 58761325 : fd_tpool_private_worker( fd_tpool_t const * tpool ) {
586 58761325 : return (fd_tpool_private_worker_t **)(tpool+1);
587 58761325 : }
588 :
589 : FD_FN_CONST static inline ulong /* Returns number of elements to left side */
590 31532114 : fd_tpool_private_split( ulong n ) { /* Assumes n>1 */
591 : # if 0 /* Simple splitting */
592 : return n>>1;
593 : # else /* NUMA aware splitting */
594 : /* This split has the property that the left side >= the right and one
595 : of the splits is the largest power of two smaller than n. It
596 : results in building a balanced tree (the same as the simple split)
597 : but with all the leaf nodes concentrated toward the left when n
598 : isn't a power of two. This can yield a slight reduction of the
599 : number of messages that might have to cross a NUMA boundary in many
600 : common usage scenarios. */
601 31532114 : ulong tmp = 1UL << (fd_ulong_find_msb( n )-1);
602 31532114 : return fd_ulong_max( tmp, n-tmp );
603 31532114 : # endif
604 31532114 : }
605 :
606 : FD_PROTOTYPES_END
607 :
608 : /* End of private APIs ************************************************/
609 :
610 : /* FD_TPOOL_{ALIGN,FOOTPRINT} return the alignment and footprint
611 : required for a memory region to be used as a tpool. ALIGN will be an
612 : integer power of two of at most 4096 and FOOTPRINT will be a multiple
613 : of ALIGN. worker_max is assumed to be valid (e.g. in
614 : [1,FD_TILE_MAX]). These are provided to facilitate compile time
615 : construction and for consistency with other constructors. (FIXME:
616 : consider using FD_LAYOUT here.) */
617 :
618 474183 : #define FD_TPOOL_ALIGN (128UL)
619 : #define FD_TPOOL_FOOTPRINT( worker_max ) ( ( sizeof(fd_tpool_private_worker_t) /* worker0 sentinel */ \
620 : + sizeof(fd_tpool_t) /* tpool header */ \
621 : + ((ulong)(worker_max))*sizeof(fd_tpool_private_worker_t *) /* worker array */ \
622 : + FD_TPOOL_ALIGN-1UL ) & (~(FD_TPOOL_ALIGN-1UL)) )
623 :
624 : /* FD_TPOOL_WORKER_SCRATCH_DEPTH is the maximum number of scratch
625 : frames a worker thread scratch region can have. */
626 :
627 0 : #define FD_TPOOL_WORKER_SCRATCH_DEPTH (128UL)
628 :
629 : FD_PROTOTYPES_BEGIN
630 :
631 : /* fd_tpool_align returns FD_TPOOL_ALIGN. fd_tpool_footprint returns
632 : FD_TPOOL_FOOTPRINT( worker_max ) if worker_max is in [1,FD_TILE_MAX]
633 : or 0 otherwise. */
634 :
635 : FD_FN_CONST ulong fd_tpool_align( void );
636 : FD_FN_CONST ulong fd_tpool_footprint( ulong worker_max );
637 :
638 : /* fd_tpool_init formats a memory region mem with the appropriate
639 : alignment and footprint as a thread pool that can support up to
640 : worker_max worker threads. worker_max must be in [1,FD_TILE_MAX].
641 : Returns a handle for the tpool (this is not a simple cast of mem) on
642 : success and NULL on failure (logs details). On a success return,
643 : worker 0 will already exist. Many threads can temporarily assume the
644 : identity of worker 0 as the situation merits. Worker 0 is typically
645 : the thread that starts dispatches to other worker threads in an
646 : operation and can also flexibly participate in the tpool in bulk
647 : operations. This uses init/fini semantics instead of
648 : new/join/leave/delete semantics because thread pools aren't
649 : meaningfully sharable between processes / thread groups. */
650 :
651 : fd_tpool_t *
652 : fd_tpool_init( void * mem,
653 : ulong worker_max );
654 :
655 : /* fd_tpool_fini pops all worker threads pushed into the tpool and
656 : unformats the underlying memory region. This should be called at
657 : most once by a thread that was not pushed into the tpool (e.g. a
658 : "worker 0" thread) and no other operations on the tpool should be in
659 : progress when this is called or done after this is called. Returns
660 : the memory region used by the tpool on success (this is not a simple
661 : cast of tpool) and NULL on failure (logs details). */
662 :
663 : void *
664 : fd_tpool_fini( fd_tpool_t * tpool );
665 :
666 : /* fd_tpool_worker_push pushes tile tile_idx into the tpool. tile_idx
667 : 0, the calling tile, and tiles that have already been pushed into the
668 : tpool cannot be pushed. Further, tile_idx should be idle and no
669 : other tile operations should be done on it while it is a part of the
670 : tpool.
671 :
672 : If scratch_sz is non-zero, this will assume that tile tile_idx
673 : currently has no scratch memory attached to it and configure tile
674 : tile_idx to use the memory with appropriate alignment whose first byte
675 : in the local address space is scratch and that has scratch_sz bytes
676 : total for its scratch memory. Otherwise, it will use whatever
677 : scratch memory has already been configured for tile_idx (or leave it
678 : unattached to a scratch memory). Scratch memory used by a tile
679 : should not be used for any other purposes while the tile is part of
680 : the tpool.
681 :
682 : IMPORTANT SAFETY TIP! Since worker 0 identity is flexible, it is the
683 : caller's responsibility to attach/detach the scratch memory as
684 : appropriate for a "worker 0 thread" that is not pushed into the
685 : tpool.
686 :
687 : Returns tpool on success (tile tile_idx is part of the tpool and will
688 : be considered as executing from tile's point of view while a member
689 : of the tpool ... no tile operations can or should be done on it while
690 : it is part of the tpool) and NULL on failure (logs details). Reasons
691 : for failure include NULL tpool, bad tile_idx, tile was not idle, bad
692 : scratch region specified, etc.
693 :
694 : No other operations on tpool should be in process when this is called
695 : or started while this is running. */
696 :
697 : fd_tpool_t *
698 : fd_tpool_worker_push( fd_tpool_t * tpool,
699 : ulong tile_idx,
700 : void * scratch,
701 : ulong scratch_sz );
702 :
703 : /* fd_tpool_worker_pop pops the most recently pushed tpool worker
704 : thread. If the tile is attached to some scratch memory as part of
705 : its push, it will be detached from it here.
706 :
707 : Returns tpool on success (the tile is no longer part of the tpool and
708 : considered idle from tile's POV and can be used for other purposes)
709 : and NULL on failure (logs details). Reasons for failure include NULL
710 : tpool, bad tile_idx, tile was not idle, bad scratch region, etc.
711 :
712 : No other operations on the tpool should be in process when this is
713 : called or started while this is running. */
714 :
715 : fd_tpool_t *
716 : fd_tpool_worker_pop( fd_tpool_t * tpool );
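
/* A minimal lifecycle sketch (illustrative only; this assumes tiles 1,
   2 and 3 have already been booted by the fd_tile layer, omits error
   checking and does not attach any scratch memory to the workers):

     static uchar mem[ FD_TPOOL_FOOTPRINT( 4UL ) ]
       __attribute__((aligned(FD_TPOOL_ALIGN)));

     fd_tpool_t * tpool = fd_tpool_init( mem, 4UL ); // worker 0 exists on return
     for( ulong t=1UL; t<4UL; t++ ) fd_tpool_worker_push( tpool, t, NULL, 0UL );

     ... dispatch work via fd_tpool_exec / fd_tpool_exec_all_* ...

     fd_tpool_fini( tpool ); // pops any remaining workers, unformats mem
*/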
717 :
718 : /* Accessors. As these are used in high performance contexts, these do
719 : no input argument checking. Specifically, they assume tpool is valid
720 : and (if applicable) worker_idx in [0,worker_cnt). worker 0 is
721 : special. The tile_idx/scratch/scratch_sz for worker 0 are always
722 : returned as 0/NULL/0 here. */
723 :
724 6264 : FD_FN_PURE static inline ulong fd_tpool_worker_cnt( fd_tpool_t const * tpool ) { return tpool->worker_cnt; }
725 3114 : FD_FN_PURE static inline ulong fd_tpool_worker_max( fd_tpool_t const * tpool ) { return tpool->worker_max; }
726 :
727 : FD_FN_PURE static inline ulong
728 : fd_tpool_worker_tile_idx( fd_tpool_t const * tpool,
729 3093 : ulong worker_idx ) {
730 3093 : return (ulong)fd_tpool_private_worker( tpool )[ worker_idx ]->tile_idx;
731 3093 : }
732 :
733 : FD_FN_PURE static inline void *
734 : fd_tpool_worker_scratch( fd_tpool_t const * tpool,
735 3093 : ulong worker_idx ) {
736 3093 : return fd_tpool_private_worker( tpool )[ worker_idx ]->scratch;
737 3093 : }
738 :
739 : FD_FN_PURE static inline ulong
740 : fd_tpool_worker_scratch_sz( fd_tpool_t const * tpool,
741 3093 : ulong worker_idx ) {
742 3093 : return fd_tpool_private_worker( tpool )[ worker_idx ]->scratch_sz;
743 3093 : }
744 :
745 : /* fd_tpool_worker_state atomically observes the state of tpool thread
746 : worker_idx at some point in time between when the call was made and
747 : the call returns. As this is used in high performance contexts, does
748 : no input argument checking. Specifically, assumes tpool is valid and
749 : worker_idx is in [0,worker_cnt). Return value will be a
750 : FD_TPOOL_WORKER_STATE value (and, in correct usage, either IDLE or
751 : EXEC). worker 0 is special. The state here will always be EXEC. */
752 :
753 : static inline int
754 : fd_tpool_worker_state( fd_tpool_t const * tpool,
755 3099 : ulong worker_idx ) {
756 3099 : return FD_VOLATILE_CONST( fd_tpool_private_worker( tpool )[ worker_idx ]->state );
757 3099 : }
758 :
759 : /* fd_tpool_exec calls
760 :
761 : task( task_tpool,
762 : task_t0, task_t1,
763 : task_args,
764 : task_reduce, task_stride,
765 : task_l0, task_l1,
766 : task_m0, task_m1,
767 : task_n0, task_n1 );
768 :
769 : on tpool thread worker_idx. This will run concurrently with the
770 : caller. Uncaught exceptions in task will be logged by the remote
771 : thread (in principle, that is, assuming the uncaught exception did not
772 : leave the application in an unstable state, thread worker_idx will still
773 : be usable for additional fd_tpool_exec). As this is used in high
774 : performance contexts, does no input argument checking. Specifically,
775 : assumes tpool is valid, worker_idx in (0,worker_cnt) (yes, open on
776 : both ends), caller is not tpool thread worker_idx, task is valid.
777 : worker_idx 0 is special and is considered to always be in the EXEC
778 : state so we cannot call exec on it. */
779 :
780 : static inline void
781 : fd_tpool_exec( fd_tpool_t * tpool, ulong worker_idx,
782 : fd_tpool_task_t task,
783 : void * task_tpool,
784 : ulong task_t0, ulong task_t1,
785 : void * task_args,
786 : void * task_reduce, ulong task_stride,
787 : ulong task_l0, ulong task_l1,
788 : ulong task_m0, ulong task_m1,
789 31373887 : ulong task_n0, ulong task_n1 ) {
790 31373887 : fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_idx ];
791 31373887 : FD_COMPILER_MFENCE();
792 31373887 : FD_VOLATILE( worker->task ) = task;
793 31373887 : FD_VOLATILE( worker->task_tpool ) = task_tpool;
794 31373887 : FD_VOLATILE( worker->task_t0 ) = task_t0; FD_VOLATILE( worker->task_t1 ) = task_t1;
795 31373887 : FD_VOLATILE( worker->task_args ) = task_args;
796 31373887 : FD_VOLATILE( worker->task_reduce ) = task_reduce; FD_VOLATILE( worker->task_stride ) = task_stride;
797 31373887 : FD_VOLATILE( worker->task_l0 ) = task_l0; FD_VOLATILE( worker->task_l1 ) = task_l1;
798 31373887 : FD_VOLATILE( worker->task_m0 ) = task_m0; FD_VOLATILE( worker->task_m1 ) = task_m1;
799 31373887 : FD_VOLATILE( worker->task_n0 ) = task_n0; FD_VOLATILE( worker->task_n1 ) = task_n1;
800 31373887 : FD_COMPILER_MFENCE();
801 31373887 : FD_VOLATILE( worker->state ) = FD_TPOOL_WORKER_STATE_EXEC;
802 31373887 : FD_COMPILER_MFENCE();
803 31373887 : }
804 :
805 : /* fd_tpool_wait waits for the tpool thread worker_idx to leave the
806 : EXEC state. As this is used in high performance contexts, does no
807 : input argument checking. Specifically, assumes tpool is valid,
808 : worker_idx in (0,worker_cnt) (yes, open on both ends) and caller is
809 : not tpool thread worker_idx. worker_idx 0 is considered to always be
810 : in an exec state so we cannot call wait on it. */
811 :
812 : static inline void
813 : fd_tpool_wait( fd_tpool_t const * tpool,
814 29301566 : ulong worker_idx ) {
815 29301566 : int volatile * vstate = (int volatile *)&(fd_tpool_private_worker( tpool )[ worker_idx ]->state);
816 29301566 : int state;
817 752451970 : for(;;) {
818 752451970 : state = *vstate;
819 752451970 : if( FD_LIKELY( state!=FD_TPOOL_WORKER_STATE_EXEC ) ) break;
820 720225371 : FD_SPIN_PAUSE();
821 720225371 : }
822 29301566 : }
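
/* For example, a minimal exec/wait pairing (an illustrative sketch;
   my_task, x and cnt are hypothetical and most of the task arguments
   are simply unused here):

     static void
     my_task( void * tpool,  ulong t0, ulong t1,
              void * args,
              void * reduce, ulong stride,
              ulong  l0,     ulong l1,
              ulong  m0,     ulong m1,
              ulong  n0,     ulong n1 ) {
       (void)tpool; (void)t0; (void)t1; (void)reduce; (void)stride;
       (void)m0; (void)m1; (void)n0; (void)n1;
       double * x = (double *)args;
       for( ulong l=l0; l<l1; l++ ) x[l] *= 2.;
     }

     ... in a caller that is not tpool worker 1 ...

     fd_tpool_exec( tpool,1UL, my_task, NULL,0UL,0UL, x,NULL,0UL, 0UL,cnt, 0UL,0UL, 0UL,0UL );
     ... do other work concurrently here ...
     fd_tpool_wait( tpool,1UL );
*/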
823 :
824 : /* Assuming the tasks can be executed safely in any order and/or
825 : concurrently, fd_tpool_exec_all_rrobin, fd_tpool_exec_all_block and
826 : fd_tpool_exec_all_taskq are functionally equivalent to:
827 :
828 : for( ulong l=task_l0; l<task_l1; l++ )
829 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, l,l+1, t,t+1 );
830 :
831 : where t indicates to which tpool worker thread idx the particular
832 : task was assigned (and thus t is in [t0,t1), where thread t0 is the
833 : thread that did the dispatch). The rrobin variant stripes individual
834 : tasks deterministically over worker threads (e.g. worker thread t
835 : does tasks (t-t0)+0*(t1-t0),(t-t0)+1*(t1-t0),(t-t0)+2*(t1-t0), ... ).
836 : The block variant blocks individual tasks over worker threads.
837 :
838 : The taskq variant requires FD_HAS_ATOMIC support and assigns tasks to
839 : threads dynamically. Practically, this is only useful if there are a
840 : huge number of tasks to execute relative to the number of threads,
841 : the tasks to execute have highly non-uniform sizes, the cost to
842 : execute a task is much much greater than the cost of a single atomic
843 : memory operation, and thread-core affinity doesn't impact the time to
844 : execute a task ... conditions that, in total, are met far less
845 : frequently than most developers expect.
846 :
847 : fd_tpool_exec_all_batch is functionally equivalent to:
848 :
849 : for( ulong t=t0; t<t1; t++ ) {
850 : ulong batch_task_l0;
851 : ulong batch_task_l1;
852 : FD_TPOOL_PARTITION( task_l0,task_l1,1, t-t0,t1-t0, batch_task_l0,batch_task_l1 );
853 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, batch_task_l0,batch_task_l1, t,t+1 );
854 : }
855 :
856 : The batch assigned to a thread will be the same tasks as those
857 : assigned to a thread by fd_tpool_exec_all_block. The difference is
858 : that fd_tpool_exec_all_batch effectively merges all the calls to task
859 : that fd_tpool_exec_all_block would make in a block into a single call
860 : per thread.
861 :
862 : fd_tpool_exec_all_raw is functionally equivalent to:
863 :
864 : for( ulong t=t0; t<t1; t++ )
865 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, task_l0,task_l1, t,t+1 );
866 :
867 : This allows the caller to use their own partitioning strategies with
868 : minimal overhead.
869 :
870 : All these are executed thread parallel using the calling thread and
871 : tpool worker threads (t0,t1) (yes, open on both ends ... the caller
872 : masquerades as worker thread t0 if it isn't actually worker thread t0 as
873 : far as exec_all is concerned ... see safety tip above about scratch).
874 : The caller should not be any of worker threads (t0,t1) and worker
875 : threads (t0,t1) should be idle on entry and should not be dispatched
876 : to while an exec_all is running.
877 :
878 : As such, in all of these, a task knows automatically which worker
879 : thread is processing this (t), the range of tasks assigned to it
880 : (e.g. [batch_task_l0,batch_task_l1)), the entire range of workers
881 : [t0,t1) in use, the entire range of tasks [task_l0,task_l1) as well
882 : as the original values for task_tpool, task_args, task_reduce,
883 : task_stride.
884 :
885 : As this is used in high performance contexts, this does no input
886 : argument checking. Specifically, it assumes tpool is valid, task is
887 : valid, 0<=t0<t1<=worker_cnt, l0<=l1. */
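
/* For example, a sketch of a batch style dispatch (illustrative only;
   scale_task, x and cnt are hypothetical). Per the batch convention
   above, [m0,m1) is the slice of [task_l0,task_l1) assigned to the
   executing worker:

     static void
     scale_task( void * tpool,  ulong t0, ulong t1,
                 void * args,
                 void * reduce, ulong stride,
                 ulong  l0,     ulong l1,
                 ulong  m0,     ulong m1,
                 ulong  n0,     ulong n1 ) {
       (void)tpool; (void)t0; (void)t1; (void)reduce; (void)stride;
       (void)l0; (void)l1; (void)n0; (void)n1;
       float * x = (float *)args;
       for( ulong i=m0; i<m1; i++ ) x[i] *= 2.f;
     }

     ... in the caller (acting as worker t0, workers (t0,t1) idle) ...

     fd_tpool_exec_all_batch( tpool, t0,t1, scale_task, NULL, x, NULL,0UL, 0UL,cnt );
*/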
888 :
889 : #define FD_TPOOL_EXEC_ALL_DECL(style) \
890 : void \
891 : fd_tpool_private_exec_all_##style##_node( void * _node_tpool, \
892 : ulong node_t0, ulong node_t1, \
893 : void * args, \
894 : void * reduce, ulong stride, \
895 : ulong l0, ulong l1, \
896 : ulong _task, ulong _tpool, \
897 : ulong t0, ulong t1 ); \
898 : \
899 : static inline void \
900 : fd_tpool_exec_all_##style( fd_tpool_t * tpool, \
901 : ulong t0, ulong t1, \
902 : fd_tpool_task_t task, \
903 : void * task_tpool, \
904 : void * task_args, \
905 : void * task_reduce, ulong task_stride, \
906 2797440 : ulong task_l0, ulong task_l1 ) { \
907 2797440 : fd_tpool_private_exec_all_##style##_node( tpool, t0,t1, task_args, task_reduce,task_stride, task_l0,task_l1, \
908 2797440 : (ulong)task,(ulong)task_tpool, t0,t1 ); \
909 2797440 : }
910 :
911 : FD_TPOOL_EXEC_ALL_DECL(rrobin)
912 : FD_TPOOL_EXEC_ALL_DECL(block)
913 : FD_TPOOL_EXEC_ALL_DECL(batch)
914 : FD_TPOOL_EXEC_ALL_DECL(raw)
915 :
916 : #if FD_HAS_ATOMIC
917 : void
918 : fd_tpool_private_exec_all_taskq_node( void * _node_tpool,
919 : ulong node_t0, ulong node_t1,
920 : void * args,
921 : void * reduce, ulong stride,
922 : ulong l0, ulong l1,
923 : ulong _task, ulong _l_next,
924 : ulong t0, ulong t1 );
925 :
926 : static inline void
927 : fd_tpool_exec_all_taskq( fd_tpool_t * tpool,
928 : ulong t0, ulong t1,
929 : fd_tpool_task_t task,
930 : void * task_tpool,
931 : void * task_args,
932 : void * task_reduce, ulong task_stride,
933 300000 : ulong task_l0, ulong task_l1 ) {
934 300000 : ulong l_next[16] __attribute((aligned(128)));
935 300000 : FD_VOLATILE( l_next[0] ) = task_l0;
936 300000 : FD_VOLATILE( l_next[1] ) = (ulong)task_tpool;
937 300000 : FD_COMPILER_MFENCE();
938 300000 : fd_tpool_private_exec_all_taskq_node( tpool, t0,t1, task_args, task_reduce,task_stride, task_l0,task_l1,
939 300000 : (ulong)task,(ulong)l_next, t0,t1 );
940 300000 : }
941 : #endif
942 :
943 : #undef FD_TPOOL_EXEC_ALL_DECL
944 :
945 : /* FD_FOR_ALL provides some macros for writing CUDA-ish parallel-for
946 : kernels executed via tpool threads. Example usage:
947 :
948 : In a header file:
949 :
950 : // my_vec_op uses tpool threads [tpool_t0,tpool_t1) to do:
951 : //
952 : // for( long i=i0; i<i1; i++ ) z[i] = my_scalar_op( x[i], y[i] );
953 : //
954 : // where x, y and z are pointers to non-overlapping my_ele_t
955 : // arrays. The caller is assumed to be thread tpool_t0, threads
956 : // (tpool_t0,tpool_t1) are assumed to be idle and
957 : // (block_i1-block_i0) in [0,LONG_MAX / FD_TILE_MAX].
958 :
959 : FD_FOR_ALL_PROTO( my_vec_op );
960 :
961 : In a source file that uses my_vec_op:
962 :
963 : FD_FOR_ALL( my_vec_op, tpool, t0,t1, i0,i1, x,y,z );
964 :
965 : In the source file that implements my_vec_op:
966 :
967 : FD_FOR_ALL_BEGIN( my_vec_op, 1024L ) {
968 :
969 : ... At this point:
970 :
971 : - longs block_i0 and block_i1 give the range of elements
972 : [block_i0,block_i1) for this thread to process. Other
973 : threads will process other approximately equally sized
974 : disjoint ranges in parallel.
975 :
976 : - long block_cnt = block_i1 - block_i0.
977 :
978 : - long block_thresh is the parameter used to optimize
979 : thread dispatch and was specified by the 1024L above.
980 : Element ranges with more than this number of elements are
981 : considered worth splitting over more than one thread if
982 : possible. An ultra high performance deterministic
983 : parallelized tree dispatch is used for good scaling,
984 : cache temporal locality and cache spatial locality.
985 : Should be at least 1 and can be a run time evaluated
986 : expression. This can also be used to make sure different
987 : kernel dispatches consistently partition elements such
988 : that there is good cache reuse between different thread
989 : parallel kernels.
990 :
991 : - tpool is a handle of this thread's tpool and ulongs
992 : tpool_t0 and tpool_t1 give the range of threads
993 : [tpool_t0,tpool_t1) available to process this block.
994 : This thread is tpool_t0 and threads (tpool_t0,tpool_t1)
995 : are idle.
996 :
997 : - Even if the range (tpool_t0,tpool_t1) is not empty, it is
998 : almost certainly optimal to just have this thread process
999 : all elements in the block single threaded. That is, when
1000 : processing many elements (>>block_thresh*(t1-t0)),
1001 : (tpool_t0,tpool_t1) will be empty. When processing a
1002 : small numbers of elements, FD_FOR_ALL already concluded
1003 : there were too few elements to dispatch to
1004 : (tpool_t0,tpool_t1).
1005 :
1006 : - In this example, ulongs _a0 = (ulong)x, _a1 = (ulong)y,
1007 : _a2 = (ulong)z. ulongs _a3, _a4, _a5, and _a6 are zero.
1008 :
1009 : - _tpool, _block_i0, _block_i1, _reduce_cnt, _reduce_stack
1010 : are reserved.
1011 :
1012 : IMPORTANT SAFETY TIP! DO NOT RETURN FROM THIS BLOCK. (IF
1013 : ENDING A BLOCK EARLY, USE BREAK.)
1014 :
1015 : my_ele_t const * restrict x = (my_ele_t const *)_a0;
1016 : my_ele_t const * restrict y = (my_ele_t const *)_a1;
1017 : my_ele_t * restrict z = (my_ele_t *)_a2;
1018 :
1019 : for( long i=block_i0; i<block_i1; i++ ) z[i] = my_scalar_op( x[i], y[i] );
1020 :
1021 : } FD_FOR_ALL_END */
1022 :
1023 : #define FD_FOR_ALL_PROTO(op) \
1024 : void \
1025 : op( void * _tpool, /* Assumes valid */ \
1026 : ulong tpool_t0, ulong tpool_t1, /* Assumes t0<=t1, caller is t0, (t0,t1) idle */ \
1027 : void * _block_i0, void * _block_i1, /* Assumes block_cnt in [0,LONG_MAX/tpool_cnt] */ \
1028 : ulong _a0, ulong _a1, \
1029 : ulong _a2, ulong _a3, \
1030 : ulong _a4, ulong _a5, \
1031 : ulong _a6 )
1032 :
1033 : #define FD_FOR_ALL_BEGIN(op,BLOCK_THRESH) \
1034 : void \
1035 : op( void * _tpool, \
1036 : ulong tpool_t0, ulong tpool_t1, \
1037 : void * _block_i0, void * _block_i1, \
1038 : ulong _a0, ulong _a1, \
1039 : ulong _a2, ulong _a3, \
1040 : ulong _a4, ulong _a5, \
1041 14766701 : ulong _a6 ) { \
1042 14766701 : long block_thresh = (BLOCK_THRESH); \
1043 14766701 : fd_tpool_t * tpool = (fd_tpool_t *)_tpool; \
1044 14766701 : long block_i0 = (long)_block_i0; \
1045 14766701 : long block_i1 = (long)_block_i1; \
1046 14766701 : long block_cnt; \
1047 14766701 : ulong _reduce_cnt = 0UL; \
1048 14766701 : ushort _reduce_stack[ 16 ]; /* Assumes TILE_MAX<=65536 */ \
1049 24569053 : for(;;) { \
1050 24569053 : ulong tpool_cnt = tpool_t1 - tpool_t0; \
1051 24569053 : /**/ block_cnt = block_i1 - block_i0; \
1052 24569053 : if( FD_LIKELY( (tpool_cnt<=1UL) | (block_cnt<=block_thresh) ) ) break; \
1053 24569053 : ulong tpool_cs = fd_tpool_private_split( tpool_cnt ); \
1054 10989944 : ulong tpool_ts = tpool_t0 + tpool_cs; \
1055 10989944 : long block_is = block_i0 + (long)((tpool_cs*(ulong)block_cnt) / tpool_cnt); /* No overflow */ \
1056 10989944 : fd_tpool_exec( tpool,tpool_ts, op, tpool,tpool_ts,tpool_t1, (void *)block_is,(void *)block_i1, \
1057 10989944 : _a0,_a1,_a2,_a3,_a4,_a5,_a6 ); \
1058 10989944 : _reduce_stack[ _reduce_cnt++ ] = (ushort)tpool_ts; \
1059 10989944 : tpool_t1 = tpool_ts; \
1060 10989944 : block_i1 = block_is; \
1061 10989944 : } \
1062 14766701 : do
1063 :
1064 : #define FD_FOR_ALL_END \
1065 14766701 : while(0); \
1066 25717520 : while( _reduce_cnt ) fd_tpool_wait( tpool, (ulong)_reduce_stack[ --_reduce_cnt ] ); \
1067 8168425 : }
1068 :
1069 : #define FD_FOR_ALL_PRIVATE_F(...) too_few_arguments_passed_to_FD_FOR_ALL
1070 1897452 : #define FD_FOR_ALL_PRIVATE_0(op,tpool,t0,t1,i0,i1 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), 0UL, 0UL, 0UL, 0UL, 0UL, 0UL, 0UL )
1071 300000 : #define FD_FOR_ALL_PRIVATE_1(op,tpool,t0,t1,i0,i1,a0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), 0UL, 0UL, 0UL, 0UL, 0UL, 0UL )
1072 300000 : #define FD_FOR_ALL_PRIVATE_2(op,tpool,t0,t1,i0,i1,a0,a1 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), 0UL, 0UL, 0UL, 0UL, 0UL )
1073 300000 : #define FD_FOR_ALL_PRIVATE_3(op,tpool,t0,t1,i0,i1,a0,a1,a2 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), 0UL, 0UL, 0UL, 0UL )
1074 300000 : #define FD_FOR_ALL_PRIVATE_4(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), 0UL, 0UL, 0UL )
1075 300000 : #define FD_FOR_ALL_PRIVATE_5(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), 0UL, 0UL )
1076 300000 : #define FD_FOR_ALL_PRIVATE_6(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4,a5 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), (ulong)(a5), 0UL )
1077 300000 : #define FD_FOR_ALL_PRIVATE_7(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4,a5,a6) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), (ulong)(a5), (ulong)(a6) )
1078 : #define FD_FOR_ALL_PRIVATE_M(...) too_many_arguments_passed_to_FD_FOR_ALL
1079 2400060 : #define FD_FOR_ALL(...) FD_EXPAND_THEN_CONCAT2(FD_FOR_ALL_PRIVATE_,FD_VA_ARGS_SELECT(__VA_ARGS__,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,7,6,5,4,3,2,1,0,F,F,F,F,F))(__VA_ARGS__)
1080 :
1081 : /* FD_MAP_REDUCE extends FD_FOR_ALL to reduce results back to thread
1082 : tpool_t0. IMPORTANT SAFETY TIP! Adequate scratch memory must be
1083 : configured for threads used by a MAP_REDUCE.
1084 :
1085 : Example usage:
1086 :
1087 : In a header file:
1088 :
1089 : // my_hist_op uses tpool threads [tpool_t0,tpool_t1) to compute:
1090 : //
1091 : // long h[4] __attribute__((aligned(128)));
1092 : // for( long j=0L; j<4L; j++ ) h[j] = 0L;
1093 : // for( long i=i0; i<i1; i++ ) {
1094 : // long j = my_map_op( x[i] );
1095 : // h[j]++;
1096 : // }
1097 : //
1098 : // where x is a pointer to a my_ele_t array. The caller is
1099 : // assumed to be thread tpool_t0 and threads (tpool_t0,tpool_t1)
1100 : // are assumed to be idle. Requires thread local scratch space
1101 : // per tpool thread of ~4 ceil lg (tpool_t1-tpool_t0) ulong.
1102 :
1103 : FD_MAP_REDUCE_PROTO( my_hist_op );
1104 :
1105 : In the source file that uses my_hist_op:
1106 :
1107 : long h[4] __attribute__((aligned(128)));
1108 : FD_MAP_REDUCE( my_hist_op, tpool, t0,t1, i0,i1, x, h );
1109 :
1110 : In the source file that implements my_hist_op:
1111 :
1112 : FD_MAP_REDUCE_BEGIN( my_hist_op, 1L, 128UL, 4UL*sizeof(long) ) {
1113 :
1114 : ... At this point:
1115 :
1116 : - block_i0, block_i1, block_cnt, block_thresh, tpool,
1117 : tpool_t0, tpool_t1, _a0 through _a5 have the exact same
1118 : behaviors as FD_FOR_ALL. _tpool, _block_i0, _block_i1,
1119 : _reduce_cnt, _reduce_stack are similarly reserved.
1120 :
1121 : - Unlike FD_FOR_ALL, there is no _a6. Instead ulong _r0
1122 : holds the location where this thread should store its
1123 : local reduction.
1124 :
1125 : - ulong reduce_align and reduce_footprint give the alignment
1126 : and footprint of this region. In this example, these are
1127 : given by the 128UL and 4UL*sizeof(long) above. Like
1128 : block_thresh, these can be run-time evaluated
1129 : expressions. This assumes that the region passed by the
1130 : caller for the final results has a compatible alignment
1131 : and footprint.
1132 :
1133 : IMPORTANT SAFETY TIP! DO NOT RETURN FROM THIS BLOCK. (IF
1134 : ENDING A BLOCK EARLY, USE BREAK.)
1135 :
1136 : my_ele_t const * restrict x = (my_ele_t const *)_a0;
1137 : long * restrict h = (long *)_r0;
1138 :
1139 : for( long j=0L; j<4L; j++ ) h[j] = 0UL;
1140 : for( long i=block_i0; i<block_i1; i++ ) {
1141 : long j = my_map_op( x[i] );
1142 : h[j]++;
1143 : }
1144 :
1145 : } FD_MAP_END {
1146 :
1147 : ... At this point, the environment is as described above with the
1148 : following differences:
1149 :
1150 : - There are at least two threads in [tpool_t0,tpool_t1).
1151 :
1152 : - On entry, ulongs _r0 and _r1 point to the partials to reduce
1153 : (cast to ulongs). These will have an alignment and
1154 : footprint compatible with the above.
1155 :
1156 : - On exit, _r1 has been reduced into _r0. It is okay if
1157 : _r1 is clobbered in this process.
1158 :
1159 : - [block_i0,block_i1) give the range of elements covered by
1160 : the output of this reduction (though this is not typically
1161 : used).
1162 :
1163 : IMPORTANT SAFETY TIP! While this reduction is often
1164 : theoretically parallelizable and threads
1165 : [tpool_t0,tpool_t1) are available here for such,
1166 : parallelization of this can often be counterproductive
1167 : (especially if the amount to reduce is small, the reduction
1168 : operation is cheap and the arrays to reduce have poor
1169 : spatial locality for reduction here due to the mapping
1170 : phase above).
1171 :
1172 : IMPORTANT SAFETY TIP! DO NOT RETURN FROM THIS BLOCK. (IF
1173 : ENDING A BLOCK EARLY, USE BREAK.)
1174 :
1175 : long * restrict h0 = (long *)_r0;
1176 : long const * restrict h1 = (long const *)_r1;
1177 :
1178 : for( long j=0L; j<4L; j++ ) h0[j] += h1[j];
1179 :
1180 : } FD_REDUCE_END */
1181 :
1182 : #define FD_MAP_REDUCE_PROTO(op) \
1183 : void \
1184 : op( void * _tpool, /* Assumes valid */ \
1185 : ulong tpool_t0, ulong tpool_t1, /* Assumes t0<=t1, caller is t0, (t0,t1) idle */ \
1186 : void * _block_i0, void * _block_i1, /* Assumes block_cnt in [0,LONG_MAX/tpool_cnt] */ \
1187 : ulong _a0, ulong _a1, \
1188 : ulong _a2, ulong _a3, \
1189 : ulong _a4, ulong _a5, \
1190 : ulong _r0 )
1191 :
1192 : #define FD_MAP_REDUCE_BEGIN(op,BLOCK_THRESH,REDUCE_ALIGN,REDUCE_FOOTPRINT) \
1193 : void \
1194 : op( void * _tpool, \
1195 : ulong tpool_t0, ulong tpool_t1, \
1196 : void * _block_i0, void * _block_i1, \
1197 : ulong _a0, ulong _a1, \
1198 : ulong _a2, ulong _a3, \
1199 : ulong _a4, ulong _a5, \
1200 13862824 : ulong _r0 ) { \
1201 13862824 : long block_thresh = (BLOCK_THRESH); \
1202 13862824 : ulong reduce_align = (REDUCE_ALIGN); \
1203 13862824 : ulong reduce_footprint = (REDUCE_FOOTPRINT); \
1204 13862824 : fd_tpool_t * tpool = (fd_tpool_t *)_tpool; \
1205 13862824 : long block_i0 = (long)_block_i0; \
1206 13862824 : long block_i1 = (long)_block_i1; \
1207 13862824 : long block_cnt; \
1208 13862824 : ulong _reduce_cnt = 0UL; \
1209 13862824 : struct { \
1210 13862824 : uint ts; /* ts is thread that needs to complete before reduction can proceed */ \
1211 13862824 : uint t1; /* [t0,t1) is range of threads available for reduction */ \
1212 13862824 : ulong r1; /* r1 is the scratch memory used for the reduction */ \
1213 13862824 : long i1; /* [i0,i1) is range the reduction output covers */ \
1214 13862824 : } _reduce_stack[ 16 ]; /* Assumes TILE_MAX<65536 (yes strictly less) */ \
1215 13862824 : fd_scratch_push(); \
1216 22203421 : for(;;) { \
1217 22203421 : ulong tpool_cnt = tpool_t1 - tpool_t0; \
1218 22203421 : /**/ block_cnt = block_i1 - block_i0; \
1219 22203421 : if( FD_LIKELY( (tpool_cnt<=1UL) | (block_cnt<=block_thresh) ) ) break; \
1220 22203421 : ulong tpool_cs = fd_tpool_private_split( tpool_cnt ); \
1221 10031023 : ulong tpool_ts = tpool_t0 + tpool_cs; \
1222 10031023 : long block_is = block_i0 + (long)((tpool_cs*(ulong)block_cnt) / tpool_cnt); /* No overflow */ \
1223 10031023 : ulong _r1 = (ulong)fd_scratch_alloc( reduce_align, reduce_footprint ); \
1224 10031023 : fd_tpool_exec( tpool, tpool_ts, op, tpool,tpool_ts,tpool_t1, (void *)block_is,(void *)block_i1, \
1225 10031023 : _a0,_a1,_a2,_a3,_a4,_a5,_r1 ); \
1226 10031023 : _reduce_stack[ _reduce_cnt ].ts = (uint)tpool_ts; \
1227 10031023 : _reduce_stack[ _reduce_cnt ].t1 = (uint)tpool_t1; \
1228 10031023 : _reduce_stack[ _reduce_cnt ].i1 = block_i1; \
1229 10031023 : _reduce_stack[ _reduce_cnt ].r1 = _r1; \
1230 10031023 : _reduce_cnt++; \
1231 10031023 : tpool_t1 = tpool_ts; \
1232 10031023 : block_i1 = block_is; \
1233 10031023 : } \
1234 13862824 : do
1235 :
1236 : #define FD_MAP_END \
1237 13862824 : while(0); \
1238 23908715 : while( _reduce_cnt ) { \
1239 10045891 : --_reduce_cnt; \
1240 10045891 : ulong _r1 = _reduce_stack[ _reduce_cnt ].r1; \
1241 10045891 : block_i1 = _reduce_stack[ _reduce_cnt ].i1; \
1242 10045891 : tpool_t1 = (ulong)_reduce_stack[ _reduce_cnt ].t1; \
1243 10045891 : block_cnt = block_i1 - block_i0; \
1244 10045891 : fd_tpool_wait( tpool, (ulong)_reduce_stack[ _reduce_cnt ].ts ); \
1245 10045891 : (void)_r0; (void)_r1; \
1246 10045891 : do
1247 :
1248 : #define FD_REDUCE_END \
1249 10045891 : while(0); \
1250 10045891 : } \
1251 13862824 : fd_scratch_pop(); \
1252 7249547 : }
1253 :
1254 : #define FD_MAP_REDUCE_PRIVATE_F(...) too_few_arguments_passed_to_FD_MAP_REDUCE
1255 1897440 : #define FD_MAP_REDUCE_PRIVATE_0(op,tpool,t0,t1,i0,i1, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), 0UL, 0UL, 0UL, 0UL, 0UL, 0UL, (ulong)(r0) )
1256 300000 : #define FD_MAP_REDUCE_PRIVATE_1(op,tpool,t0,t1,i0,i1,a0, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), 0UL, 0UL, 0UL, 0UL, 0UL, (ulong)(r0) )
1257 300000 : #define FD_MAP_REDUCE_PRIVATE_2(op,tpool,t0,t1,i0,i1,a0,a1, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), 0UL, 0UL, 0UL, 0UL, (ulong)(r0) )
1258 300000 : #define FD_MAP_REDUCE_PRIVATE_3(op,tpool,t0,t1,i0,i1,a0,a1,a2, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), 0UL, 0UL, 0UL, (ulong)(r0) )
1259 300000 : #define FD_MAP_REDUCE_PRIVATE_4(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), 0UL, 0UL, (ulong)(r0) )
1260 300000 : #define FD_MAP_REDUCE_PRIVATE_5(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), 0UL, (ulong)(r0) )
1261 300000 : #define FD_MAP_REDUCE_PRIVATE_6(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4,a5,r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), (ulong)(a5), (ulong)(r0) )
1262 : #define FD_MAP_REDUCE_PRIVATE_M(...) too_many_arguments_passed_to_FD_MAP_REDUCE
1263 2100048 : #define FD_MAP_REDUCE(...) FD_EXPAND_THEN_CONCAT2(FD_MAP_REDUCE_PRIVATE_,FD_VA_ARGS_SELECT(__VA_ARGS__,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,6,5,4,3,2,1,0,F,F,F,F,F,F))(__VA_ARGS__)
1264 :
1265 : /* fd_tpool_worker_state_cstr converts an FD_TPOOL_WORKER_STATE_* code
1266 : into a human readable cstr. The lifetime of the returned pointer is
1267 : infinite. The returned pointer is always to a non-NULL cstr. */
1268 :
1269 : FD_FN_CONST char const *
1270 : fd_tpool_worker_state_cstr( int state );
1271 :
1272 : FD_PROTOTYPES_END
1273 :
1274 : #endif /* HEADER_fd_src_util_tpool_fd_tpool_h */
|