Line data Source code
1 : #ifndef HEADER_fd_src_util_tpool_fd_tpool_h
2 : #define HEADER_fd_src_util_tpool_fd_tpool_h
3 :
4 : /* tpool provides APIs to group sets of tiles together for ultra low
5 : overhead and high scalability launching of thread parallel jobs.
6 : There is a lot of nuance that most thread pool APIs and
7 : implementations get wrong. And understanding this nuance explains
8 : why the APIs and implementations are the way they are.
9 : So, a crash course:
10 :
11 : High performance thread parallelism crash course ********************
12 :
13 : Consider the simple and highly idealized case of a job with N
14 : independent tasks that each take an approximately uniform time
15 : tau_task to execute.
16 :
17 : A serial implementation of this job might look like:
18 :
19 : for( ulong n=0; n<N; n++ ) ... do task n ...;
20 :
21 : And the overall time to do such a job is:
22 :
23 : T_serial ~ tau_overhead + tau_task N
24 :
25 : where tau_overhead represents O(1) costs like the overhead to setup
26 : the loop that will do the N tasks.
27 :
28 : In a parallel implementation, we want to uniformly divide these tasks
29 : over P threads to speed this up. If we use the naive strategy that
30 : many codes and libraries use, we have something like (either
31 : explicitly or under the hood and ignoring error trapping):
32 :
33 : static void *
34 : task_block( void * _args ) {
35 : task_args_t * args = (task_args_t *)_args;
36 : ... unpack task args here, including
37 : ulong n0 = args->n0;
38 : ulong n1 = args->n1;
39 :
40 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
41 :
42 : return NULL;
43 : }
44 :
45 : ... in the main thread ...
46 :
47 : task_args_t * args = (task_args_t *)malloc( P*sizeof(task_args_t) );
48 : pthread_t * thread = (pthread_t *) malloc( P*sizeof(pthread_t) );
49 : for( ulong p=0; p<P; p++ ) args[p] = ... block p info ...;
50 : for( ulong p=1; p<P; p++ ) pthread_create( &thread[p], NULL, task_block, &args[p] );
51 : task_block( &args[0] );
52 : for( ulong p=1; p<P; p++ ) pthread_join( thread[p], NULL );
53 : free( thread );
54 : free( args );
55 :
56 : (Above, we used the main thread as one of the P threads in job
57 : execution such that it both dispatches and does a share of the work.
58 : This discussion still applies if we just have main do dispatch only.)
59 :
60 : Ugh ... this does parallelize the job over P threads but it is
61 : practically worthless. The overall time to execute is:
62 :
63 : T_parallel_dumb ~ tau_overhead_dumb
64 : + tau_partition_dumb P
65 : + tau_dispatch_dumb (P-1)
66 : + tau_task N / P
67 :
68 : tau_overhead_dumb represents the overhead from before, but
69 : with more dumb. That is, it is likely a lot larger than the serial
70 : version because of the allocations and need to wrap up the tasks in a
71 : thread dispatch function signature compatible way (pthread_create
72 : "start_routine" in this case). (There are usually additional
73 : overheads omitted from this simplified discussion.)
74 :
75 : tau_partition_dumb is the time spent by the main thread partitioning
76 : the work, per worker thread. This is dumb
77 : because this work could be parallelized (i.e. done within the task
78 : itself). A second, less obvious, dumbness is that, in complex work
79 : distribution scenarios, it is often very difficult for the main
80 : thread to determine the optimal partition a worker thread should do
81 : (see sgemm dispatch example below of something very hard to do
82 : centrally but very easy to do distributed). This ends up being a
83 : triple whammy of awful: the parallelization strategy is inefficient,
84 : computed inefficiently and computed by one thread. Amdahl
85 : bottlenecks for everyone!
86 :
87 : tau_dispatch_dumb is the time needed to start and stop a worker
88 : thread to run a task. This is dumb because things like
89 : pthread_create have a huge overhead and because the total overhead is
90 : linear in P. (In practice, this is even dumber than it appears
91 : because we didn't do anything fancy with the created threads to
92 : mitigate thread-core affinity effects, IPC effects, kernel scheduling
93 : effects, NUMA effects, TLB effects, ... neglected in this idealized
94 : example.)
95 :
96 : Thus, we better have an infinite N if we are going to be able to
97 : usefully parallelize over many threads under the dumb parallelization
98 : strategies.
99 :
100 : But, in the real world, N is not infinite and often our goal is to
101 : reduce the time needed to do a finite amount of work. Throwing all
102 : the threads in the world at the job will not help because, while
103 : increasing the number of threads decreases the amount of time
104 : required to do the tasks hyperbolically, it _increases_ _linearly_
105 : the amount of time needed to set up the threads to do those tasks.
106 :
107 : This leads to the question: what is the maximum number of threads we
108 : can use profitably?
109 :
110 : This will be the P that minimizes T_parallel_dumb. Call this value
111 : P_max_useful_dumb. Given this point is a minimum of T_parallel_dumb,
112 : for P<P_max_useful_dumb / P>P_max_useful_dumb, using _more_ / _fewer_
113 : threads for a fixed size job is useful.
114 :
115 : From basic calculus, P_max_useful_dumb is where the derivative of
116 : T_parallel_dumb with respect to P is zero. With:
117 :
118 : dT_parallel_dumb / dP ~ tau_partition_dumb
119 : + tau_dispatch_dumb
120 : - tau_task N / P^2
121 :
122 : we can solve for dT_parallel_dumb / dP = 0. This yields:
123 :
124 : P_max_useful_dumb ~ sqrt( tau_task N / ( tau_partition_dumb + tau_dispatch_dumb ) )
125 :
126 : This does not even weak scale! Doubling N does not double
127 : P_max_useful_dumb; it only increases it by a factor of ~sqrt(2). Sad.
128 :
129 : And it is worse than that because tau_dispatch_dumb is typically
130 : massive and tau_task is often infinitesimal.
131 :
132 : For example, in parallelizing level 1 BLAS operations (e.g. a
133 : vector add), N is often related to the number of scalar flops needed
134 : and thus tau_task is small O(1) times the marginal cost of doing a
135 : single flop, assuming the problem itself fits within cache. That is,
136 : tau_task is measured in picoseconds (e.g. wide vector SIMD double
137 : pumped FMA units on a ~3 GHz core).
138 :
139 : But tau_dispatch_dumb is measured in milliseconds (e.g.
140 : pthread_create + pthread_join with all their glorious O/S and kernel
141 : scheduler overheads).
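
   To make this concrete with illustrative round numbers (assumed for
   the sake of example, not measured values): suppose tau_task ~ 1 ns,
   N ~ 10^6 tasks and tau_partition_dumb + tau_dispatch_dumb ~ 1 ms =
   10^6 ns. Then:

     P_max_useful_dumb ~ sqrt( (1 ns)(10^6) / (10^6 ns) ) ~ 1

   That is, for a million nanosecond scale tasks, the dumb strategy
   cannot profitably use even a second thread.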
142 :
143 : This means we need a huge N before it is even worthwhile to consider
144 : parallelization under the dumb strategy. And once N is large enough,
145 : the problem transitions from being cacheable to uncacheable. And in
146 : that regime, memory bandwidth limits not considered here dominate
147 : (all those cores starved of useful work because of badly designed
148 : software and limited memory bandwidth). Embarrassing.
149 :
150 : All too common a story: "We parallelized [important thing] with
151 : [rando popular but naive thread library du jour]. It got slower.
152 : Guess it was already as efficient as it could be."
153 :
154 : Sigh ... fortunately, the math also gives the prescription of what we
155 : need to do (and maybe gives a hint that, contrary to beliefs floating
156 : around in blogs and whatnot, math is in fact critically important
157 : for coding).
158 :
159 : First, we need to parallelize the partition calculation by having
160 : each worker thread concurrently compute their slice of the job. We
161 : can do that by passing a reference to a description of the whole job
162 : along with just enough context to each thread that they can determine
163 : their slice (e.g. a thread index and number of threads). This
164 : eliminates tau_partition_dumb from the denominator above but it
165 : requires a better thread dispatcher function signature than used
166 : above.
167 :
168 : Second, we create all the threads we are going to use up front, pin
169 : them to dedicated isolated cores at high kernel schedule priority
170 : with optimized NUMA and TLB aware stacks and scratch spaces to
171 : massively reduce tau_dispatch_dumb overheads (and also fix some more
172 : subtle performance drains in practical usage at the same time).
173 : These threads will spin on memory in optimized ways to make the time
174 : to wake up a thread on the order of the time it takes to transfer a
175 : cache line to a core (tens to hundreds of ns).
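
   For illustration, a minimal sketch of the kind of spin-wait
   described above (wait_not_exec is a hypothetical helper, not part of
   this API; the actual mechanism used by this API is fd_tpool_wait
   below):

     static inline void
     wait_not_exec( int const * _state ) {
       for(;;) {
         FD_COMPILER_MFENCE();
         int state = *_state;              // observe the worker's state
         FD_COMPILER_MFENCE();
         if( FD_LIKELY( state!=FD_TPOOL_WORKER_STATE_EXEC ) ) break;
         FD_SPIN_PAUSE();                  // be gentle to the core while spinning
       }
     }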
176 :
177 : Together, these turn the above into something like:
178 :
179 : static void
180 : task_block( void * _args,
181 : ulong p,
182 : ulong P ) {
183 : job_args_t * args = (job_args_t *)_args;
184 : ... unpack job args here, including
185 : ulong N = args->N;
186 :
187 : // Compute the block of tasks to be done by this thread
188 : ulong n0;
189 : ulong n1;
190 : FD_TPOOL_PARTITION( 0,N,1, p,P, n0,n1 );
191 :
192 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
193 : }
194 :
195 : ... in main thread ...
196 :
197 : job_args_t args[1] = { ... job info ... };
198 : for( ulong p=1; p<P; p++ ) tpool_exec( tpool,p, task_block,args, p,P );
199 : task_block( args, 0,P );
200 : for( ulong p=1; p<P; p++ ) tpool_wait( tpool,p );
201 :
202 : This implementation is dramatically better, as now we have:
203 :
204 : P_max_useful_okay ~ sqrt( tau_task N / tau_dispatch_smart )
205 :
206 : where tau_dispatch_smart << tau_dispatch_dumb. Additionally, the
207 : various overheads in the main thread are much lower.
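
   Continuing the illustrative round numbers from before (tau_task ~
   1 ns, N ~ 10^6 tasks) with tau_dispatch_smart ~ 200 ns (roughly a
   cache line transfer or two):

     P_max_useful_okay ~ sqrt( (1 ns)(10^6) / (200 ns) ) ~ 70

   versus ~1 thread for the dumb strategy. Much better, but still far
   short of what a big machine can provide for a modest sized problem.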
208 :
209 : While this is able to use dramatically more threads profitably for a
210 : fixed job size, we still aren't able to weak scale. To do that, we
211 : need to parallelize the thread dispatch too.
212 :
213 : Third, we parallelize thread dispatch. The obvious but wrong
214 : solution is to use a single common memory location that all worker
215 : threads in the pool will wait on to start execution and a single
216 : common memory location that all workers will atomically increment at
217 : completion so the main thread only has to spin on one memory location
218 : to tell if the job is complete.
219 :
220 : This is wrong practically because, while it superficially looks like
221 : O(1) operations for the dispatch and wait, this is a cache coherence
222 : protocol nightmare beyond compare. There will be large constant O(N)
223 : messaging on the CPUs internal network-on-chip to distribute the
224 : information to/from all the worker threads. Such will often not be
225 : parallelized under the hood particularly well or particularly
226 : deterministic (especially the waiting part ... the main thread still
227 : needs to get N updates from the workers that will be serialized with
228 : lots of cache line ping-pong-ing on top of atomic operations under
229 : the hood as they funnel back toward the main thread).
230 :
231 : Instead, we can recursively divide the job to unambiguously
232 : parallelize task dispatch. This yields something like:
233 :
234 : static void
235 : task_node( tpool_t * tpool, // thread pool to use
236 : ulong p0, // Assumes p0>=0, caller is p0
237 : ulong p1, // Assumes p1>p0
238 : void * _args,
239 : ulong P ) { // Assumes P>=p1
240 :
241 : // At this point we are worker thread p0 and we are responsible
242 : // for dispatching work to workers [p0,p1)
243 :
244 : ulong p_cnt = p1-p0;
245 : if( p_cnt>1UL ) {
246 :
247 : // At this point, we have more than one thread available.
248 : // Split the workers approximately in half and have worker
249 : // thread ps dispatch work to threads [ps,p1) while we
250 : // concurrently dispatch work to threads [p0,ps).
251 :
252 : ulong ps = p0 + (p_cnt>>1);
253 : tpool_exec( tpool,ps, task_node, tpool,ps,p1, _args, P );
254 : task_node( tpool,p0,ps, _args, P );
255 : tpool_wait( tpool,ps );
256 : return;
257 : }
258 :
259 : // At this point, there is only one thread available
260 : // Do our slice of the job.
261 :
262 : job_args_t * args = (job_args_t *)_args;
263 : ... unpack job args here, including
264 : ulong N = args->N;
265 :
266 : // Compute block of tasks to be done by this thread
267 : ulong n0;
268 : ulong n1;
269 : FD_TPOOL_PARTITION( 0,N,1, p0,P, n0,n1 );
270 :
271 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
272 : }
273 :
274 : ... in the main thread ...
275 :
276 : job_args_t args[1] = { ... job info ... };
277 : task_node( tpool,0,P, args, P );
278 :
279 : This has an algorithmically different and cheaper cost model for
280 : thread parallelization:
281 :
282 : T_parallel_smart ~ tau_overhead_smart
283 : + tau_dispatch_smart ceil log2(P)
284 : + tau_task N / P
285 :
286 : With the magical power of calculus again, noting that
287 : log2(P)=ln(P)/ln(2) and ceil log2(P) ~ log2(P) asymptotically, we
288 : have:
289 :
290 : dT_parallel_smart / dP ~ tau_dispatch_smart / (P ln(2)) - tau_task N/P^2
291 :
292 : such that:
293 :
294 : P_max_useful_smart ~ tau_task ln(2) N / tau_dispatch_smart
295 :
296 : Now we can weak scale! If we double the amount of work, we can
297 : double the maximum useful threads we can apply.
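
   With the same illustrative round numbers as before (tau_task ~ 1 ns,
   N ~ 10^6 tasks, tau_dispatch_smart ~ 200 ns):

     P_max_useful_smart ~ ln(2) (1 ns)(10^6) / (200 ns) ~ 3500

   versus ~70 for the flat dispatch above and ~1 for the dumb strategy.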
298 :
299 : Since tau_overhead_smart ~ tau_overhead_serial << tau_overhead_dumb,
300 : tau_dispatch_smart << tau_dispatch_dumb and N isn't stuck inside a
301 : sqrt anymore, we also can profitably parallelize _orders_ _of_
302 : _magnitude_ smaller problems than before.
303 :
304 : As a further bonus, the above dispatch pattern naturally supports
305 : much more complex parallelizations (see the sgemm example below) and
306 : naturally has good thread-NUMA topology oblivious and cache oblivious
307 : algorithmic properties (ideally the tpool threads themselves have
308 : good spatial locality for best results).
309 :
310 : Last, we can wrap all this up so that, in simple cases, all the user
311 : needs to do is:
312 :
313 : static void
314 : task_block( void * _args,
315 : ulong p,
316 : ulong P ) {
317 : job_args_t * args = (job_args_t *)_args;
318 : ... unpack job args here, including
319 : ulong N = args->N;
320 :
321 : // Compute block of tasks to be done by this thread
322 : ulong n0;
323 : ulong n1;
324 : FD_TPOOL_PARTITION( 0,N,1, p,P, n0,n1 );
325 :
326 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
327 : }
328 :
329 : ... in the main thread ...
330 :
331 : job_args_t args[1] = { ... args ... };
332 : tpool_exec_all( tpool,0,P, task_block,args );
333 :
334 : and, in the main thread, it will act like:
335 :
336 : job_args_t args[1] = { ... args ... };
337 : for( ulong p=0; p<P; p++ ) task_block( args, p,P );
338 :
339 : but have a performance characteristic like T_parallel_smart.
340 :
341 : The actual function signature used for a fd_tpool_task_t below was
342 : picked to be sufficient to tightly and uniformly partition an
343 : arbitrarily shaped level 3 BLAS sgemm A^T B matrix calculation over
344 : lots of cores on arbitrarily shaped NUMA topologies without a lot
345 : of overhead in the dispatch logic. For example (and this also shows
346 : a powerful use case for scratch memory too):
347 :
348 : // This is called by worker thread t0 and uses tpool worker threads
349 : // [t0,t1) to compute:
350 : //
351 : // C([0,l1-l0),[0,n1-n0)) = A([m0,m1),[l0,l1))' B([m0,m1),[n0,n1))
352 :
353 : void
354 : atb_node( fd_tpool_t * tpool, // tpool to use
355 : ulong t0, ulong t1, // Assumes t1>t0
356 : atb_args_t * args, // Location of A and B matrices and their column strides
357 : float * C, // (l1-l0)*(n1-n0) col-major matrix with col stride sC
358 : ulong sC, // Assumes sC>=(l1-l0), C(i,j) at C[ i + sC*j ]
359 : ulong l0, ulong l1, // Assumes l1>=l0
360 : ulong m0, ulong m1, // Assumes m1>=m0
361 : ulong n0, ulong n1 ) { // Assumes n1>=n0
362 :
363 : // At this point, we are worker thread t0 and we are responsible
364 : // for dispatching work to worker threads [t0,t1)
365 :
366 : ulong t_cnt = t1-t0;
367 : if( t_cnt>1UL ) { // ... and optionally the task big enough to be worth parallelizing
368 :
369 : // We need to split this task over more than one thread.
370 : //
371 : // Split the worker threads into two approximately equal sized
372 : // sets (left and right) and then proportionally split the
373 : // longest matrix range to try to make the subtasks as square
374 : // as possible (and thus improve compute data ratio /
375 : // cacheability of subtasks). Note that we do not just split
376 : // matrix ranges in half because such a split would result in
377 : // an increasingly bad load imbalance between the left and
378 : // right workers when the matrix range to split is large but
379 : // t_cnt is small and odd.
380 : //
381 : // We then execute in parallel the left range on worker threads
382 : // [t0,th) and the right range on worker threads [th,t1). Yes,
383 : // this uses recursion in the thread dispatch to get radically
384 : // better scalability, lower overhead and ability to map
385 : // arbitrary shaped problems onto arbitrarily shaped massively
386 : // parallel hardware with minimal load imbalance (see notes
387 : // above for more details about this).
388 :
389 : ulong t_cnt_left = t_cnt >> 1;
390 : ulong ts = t0 + t_cnt_left;
391 :
392 : ulong l_cnt = l1-l0;
393 : ulong m_cnt = m1-m0;
394 : ulong n_cnt = n1-n0;
395 :
396 : if( FD_UNLIKELY( (m_cnt>l_cnt) & (m_cnt>n_cnt) ) ) { // Split m range
397 :
398 : // Splitting along m is more onerous than splitting along l
399 : // or n because the left and right subtask outputs would end
400 : // up clobbering each other or, if being atomically too
401 : // clever by half, having a non-deterministic result due to an
402 : indeterminate summation order and floating point
403 : // non-associativity. So we allocate a temporary matrix near
404 : // these cores to hold the right half partial reduction such
405 : // that the left and right can do their partial reductions
406 : // independently (and deterministically) and then do a
407 : // (deterministic) thread parallel reduction of the two
408 : // results on threads [t0,t1) afterward (a level of fine
409 : // grained parallelization few even think is possible on
410 : // commodity processors).
411 :
412 : ulong m_cnt_left = (m_cnt*t_cnt_left)/t_cnt; // handle overflow here
413 : ulong ms = m0 + m_cnt_left;
414 :
415 : fd_scratch_push();
416 :
417 : float * CR = (float *)fd_scratch_alloc( 0UL, l_cnt*n_cnt*sizeof(float) );
418 :
419 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,l_cnt, l0,l1, ms,m1, n0,n1 );
420 : atb_node( tpool,t0,ts, args, C,sC, l0,l1, m0,ms, n0,n1 );
421 : fd_tpool_wait( tpool,ts );
422 :
423 : // Do C([0,l_cnt),[0,n_cnt)) += CR([0,l_cnt),[0,n_cnt)) on threads [t0,t1) here
424 :
425 : fd_scratch_pop();
426 :
427 : } else if( FD_UNLIKELY( n_cnt>l_cnt ) ) { // Split n range
428 :
429 : ulong n_cnt_left = (n_cnt*t_cnt_left)/t_cnt; // handle overflow here
430 : ulong ns = n0 + n_cnt_left;
431 :
432 : float * CR = C + sC*n_cnt_left;
433 :
434 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,sC, l0,l1, m0,m1, ns,n1 );
435 : atb_node( tpool,t0,ts, args, C,sC, l0,l1, m0,m1, n0,ns );
436 : fd_tpool_wait( tpool,ts );
437 :
438 : } else { // Split l range
439 :
440 : ulong l_cnt_left = (l_cnt*t_cnt_left)/t_cnt; // handle overflow here
441 : ulong ls = l0 + l_cnt_left;
442 :
443 : float * CR = C + l_cnt_left;
444 :
445 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,sC, ls,l1, m0,m1, n0,n1 );
446 : atb_node( tpool,t0,ts, args, C,sC, l0,ls, m0,m1, n0,n1 );
447 : fd_tpool_wait( tpool,ts );
448 :
449 : }
450 :
451 : return;
452 : }
453 :
454 : // At this point, we are at a leaf node
455 : // Do C([0,l1-l0),[0,n1-n0)) = A([m0,m1),[l0,l1))' B([m0,m1),[n0,n1)) here
456 : }
457 :
458 : End of high performance thread parallelism crash course ************/
459 :
460 : #include "../tile/fd_tile.h"
461 : #include "../scratch/fd_scratch.h"
462 :
463 : /* FD_TPOOL_WORKER_STATE_* are possible states a tpool worker thread
464 : is in. Users practically should never see BOOT or HALT. */
465 :
466 5 : #define FD_TPOOL_WORKER_STATE_BOOT (0) /* Tile is booting */
467 29555311 : #define FD_TPOOL_WORKER_STATE_IDLE (1) /* Tile is idle */
468 30816864 : #define FD_TPOOL_WORKER_STATE_EXEC (2) /* Tile is executing a task */
469 6 : #define FD_TPOOL_WORKER_STATE_HALT (3) /* Tile is halting */
470 :
471 : /* FD_TPOOL_PARTITION partitions tasks indexed [task0,task1) over
472 : worker_cnt worker threads. On return, tasks indexed
473 : [worker_task0,worker_task1) gives the range of tasks to be done by
474 : worker worker_idx.
475 :
476 : The number of tasks assigned to a worker will be as uniform as possible, with
477 : the constraints that the tasks will be assigned to workers in
478 : monotonically increasing order and, for worker_idx<worker_cnt-1, the
479 : number of tasks assigned will be a multiple of lane_cnt in size.
480 : (That is, any final incomplete SIMD block will be assigned to worker
481 : worker_cnt-1.)
482 :
483 : Assumes task1>=task0, lane_cnt>0, worker_cnt>0 and
484 : worker_idx<worker_cnt. Performance will be fastest if lane_cnt
485 : and/or worker_cnt are an integer power-of-two (especially 1). This
486 : macro is robust. */
487 :
488 1544694166 : #define FD_TPOOL_PARTITION( task0, task1, lane_cnt, worker_idx, worker_cnt, worker_task0, worker_task1 ) do { \
489 1544694166 : ulong _ftp_task0 = (task0); \
490 1544694166 : ulong _ftp_task1 = (task1); \
491 1544694166 : ulong _ftp_lane_cnt = (lane_cnt); \
492 1544694166 : ulong _ftp_worker_idx = (worker_idx); \
493 1544694166 : ulong _ftp_worker_cnt = (worker_cnt); \
494 1544694166 : ulong _ftp_task_cnt = _ftp_task1 - _ftp_task0; \
495 1544694166 : ulong _ftp_block_cnt = _ftp_task_cnt / _ftp_lane_cnt; /* Num complete simd blocks, typically nop or fast shr */ \
496 1544694166 : ulong _ftp_block_rem = _ftp_task_cnt % _ftp_lane_cnt; /* Number of leftovers, typically nop or fast mask */ \
497 1544694166 : ulong _ftp_worker_block_min = _ftp_block_cnt / _ftp_worker_cnt; /* Min complete simd blocks for a worker */ \
498 1544694166 : ulong _ftp_worker_extra_cnt = _ftp_block_cnt % _ftp_worker_cnt; /* Num workers needing an extra complete simd block */ \
499 1544694166 : ulong _ftp_worker_task0 = _ftp_task0 \
500 1544694166 : + _ftp_lane_cnt*(_ftp_worker_block_min*_ftp_worker_idx + fd_ulong_min(_ftp_worker_idx,_ftp_worker_extra_cnt)); \
501 1544694166 : ulong _ftp_worker_task1 = _ftp_worker_task0 \
502 1544694166 : + _ftp_lane_cnt*(_ftp_worker_block_min + ((ulong)(_ftp_worker_idx<_ftp_worker_extra_cnt))) \
503 1544694166 : + fd_ulong_if( _ftp_worker_idx==(_ftp_worker_cnt-1UL), _ftp_block_rem, 0UL ); \
504 1544694166 : (worker_task0) = _ftp_worker_task0; \
505 1544694166 : (worker_task1) = _ftp_worker_task1; \
506 1544694166 : } while(0)
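
/* For example (worked by hand from the definition above, purely
   illustrative): partitioning tasks [0,13) with lane_cnt 4 over 3
   workers assigns [0,4) to worker 0, [4,8) to worker 1 and [8,13) to
   worker 2; the final incomplete SIMD block of 1 task lands on the
   last worker. A minimal usage sketch (my_task0 / my_task1 are
   hypothetical local names):

     ulong my_task0; ulong my_task1;
     FD_TPOOL_PARTITION( 0UL,13UL,4UL, worker_idx,3UL, my_task0,my_task1 );
     for( ulong n=my_task0; n<my_task1; n++ ) ... do task n ...;
*/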
507 :
508 : /* A fd_tpool_task_t is the function signature used for the entry
509 : point of a task. Users are free to repurpose these arguments however
510 : they see fit for individual executions but the number of
511 : arguments, types of argument and names used below reflect the intent
512 : described in the above crash course. Namely, this is adequate to map
513 : arbitrary level 3 BLAS dense matrix-matrix multiply with arbitrary
514 : shaped matrices to an arbitrary topology set of tiles ultra quickly
515 : and ultra tightly. Bulk dispatchers like tpool_exec_all might apply
516 : some additional conventions to some of these arguments. */
517 :
518 : typedef void
519 : (*fd_tpool_task_t)( void * tpool,
520 : ulong t0, ulong t1,
521 : void * args,
522 : void * reduce, ulong stride,
523 : ulong l0, ulong l1,
524 : ulong m0, ulong m1,
525 : ulong n0, ulong n1 );
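
/* A minimal sketch of a function with this signature (illustrative
   only; my_task and the particular argument conventions used here are
   hypothetical, not part of this API):

     static void
     my_task( void * tpool, ulong t0, ulong t1,
              void * args, void * reduce, ulong stride,
              ulong l0, ulong l1, ulong m0, ulong m1, ulong n0, ulong n1 ) {
       (void)tpool; (void)t0; (void)t1; (void)reduce; (void)stride;
       (void)m0; (void)m1; (void)n0; (void)n1;
       float * x = (float *)args;                  // caller passes the array via args
       for( ulong l=l0; l<l1; l++ ) x[l] += 1.0f;  // process elements [l0,l1)
     }
*/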
526 :
527 : /* A fd_tpool_t is an opaque handle of a thread pool */
528 :
529 : struct fd_tpool_private;
530 : typedef struct fd_tpool_private fd_tpool_t;
531 :
532 : /* Private APIs *******************************************************/
533 :
534 : /* These are exposed here to facilitate inlining various operations in
535 : high performance contexts. */
536 :
537 : struct __attribute__((aligned(128))) fd_tpool_private_worker {
538 : fd_tpool_task_t task;
539 : void * task_tpool;
540 : ulong task_t0; ulong task_t1;
541 : void * task_args;
542 : void * task_reduce; ulong task_stride;
543 : ulong task_l0; ulong task_l1;
544 : ulong task_m0; ulong task_m1;
545 : ulong task_n0; ulong task_n1;
546 : int state;
547 : uint tile_idx;
548 : void * scratch;
549 : ulong scratch_sz;
550 : };
551 :
552 : typedef struct fd_tpool_private_worker fd_tpool_private_worker_t;
553 :
554 : struct fd_tpool_private {
555 :
556 : /* This point is 128 aligned and preceded by the worker0 sentinel */
557 :
558 : ulong worker_max; /* Positive */
559 : ulong worker_cnt; /* in [1,worker_max] */
560 :
561 : /* worker_max element fd_tpool_private_worker_t * array here, indexed
562 : [0,worker_cnt). worker[0] points to worker0 above. Note that we
563 : cannot use a flex array here because strict C++17 doesn't support
564 : it and we use C++ in fd_tpool.cxx to handle people using C++
565 : libraries that throw exceptions that are uncaught ... sigh. */
566 :
567 : };
568 :
569 : FD_PROTOTYPES_BEGIN
570 :
571 : FD_STATIC_ASSERT( FD_TILE_MAX<65536UL, update_implementation );
572 :
573 : /* fd_tpool_private_worker0 returns a pointer in the local address space
574 : to the worker0 sentinel */
575 :
576 : FD_FN_CONST static inline fd_tpool_private_worker_t *
577 3087 : fd_tpool_private_worker0( fd_tpool_t const * tpool ) {
578 3087 : return ((fd_tpool_private_worker_t *)tpool)-1;
579 3087 : }
580 :
581 : /* fd_tpool_private_worker returns a pointer in the local address space
582 : to the first element of the tpool's worker array. */
583 :
584 : FD_FN_CONST static inline fd_tpool_private_worker_t **
585 56929319 : fd_tpool_private_worker( fd_tpool_t const * tpool ) {
586 56929319 : return (fd_tpool_private_worker_t **)(tpool+1);
587 56929319 : }
588 :
589 : FD_FN_CONST static inline ulong /* Returns number of elements to left side */
590 31081317 : fd_tpool_private_split( ulong n ) { /* Assumes n>1 */
591 : # if 0 /* Simple splitting */
592 : return n>>1;
593 : # else /* NUMA aware splitting */
594 : /* This split has the property that the left side >= the right and one
595 : of the splits is the largest power of two smaller than n. It
596 : results in building a balanced tree (the same as the simple split)
597 : but with all the leaf nodes concentrated toward the left when n
598 : isn't a power of two. This can yield a slight reduction of the
599 : number of messages that might have to cross a NUMA boundary in many
600 : common usage scenarios. */
601 31081317 : ulong tmp = 1UL << (fd_ulong_find_msb( n )-1);
602 31081317 : return fd_ulong_max( tmp, n-tmp );
603 31081317 : # endif
604 31081317 : }
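
/* For example (values worked from the definition above, illustrative
   only): fd_tpool_private_split( 6UL )==4UL,
   fd_tpool_private_split( 8UL )==4UL and
   fd_tpool_private_split( 12UL )==8UL. */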
605 :
606 : FD_PROTOTYPES_END
607 :
608 : /* End of private APIs ************************************************/
609 :
610 : /* FD_TPOOL_{ALIGN,FOOTPRINT} return the alignment and footprint
611 : required for a memory region to be used as a tpool. ALIGN will be an
612 : integer power of two of at most 4096 and FOOTPRINT will be a multiple
613 : of ALIGN. worker_max is assumed to be valid (e.g. in
614 : [1,FD_TILE_MAX]). These are provided to facilitate compile time
615 : construction and for consistency with other constructors. (FIXME:
616 : consider using FD_LAYOUT here.) */
617 :
618 474183 : #define FD_TPOOL_ALIGN (128UL)
619 : #define FD_TPOOL_FOOTPRINT( worker_max ) ( ( sizeof(fd_tpool_private_worker_t) /* worker0 sentinel */ \
620 : + sizeof(fd_tpool_t) /* tpool header */ \
621 : + ((ulong)(worker_max))*sizeof(fd_tpool_private_worker_t *) /* worker array */ \
622 : + FD_TPOOL_ALIGN-1UL ) & (~(FD_TPOOL_ALIGN-1UL)) )
623 :
624 : /* FD_TPOOL_WORKER_SCRATCH_DEPTH is the maximum number of scratch
625 : frames a worker thread scratch region can have. */
626 :
627 0 : #define FD_TPOOL_WORKER_SCRATCH_DEPTH (128UL)
628 :
629 : FD_PROTOTYPES_BEGIN
630 :
631 : /* fd_tpool_align returns FD_TPOOL_ALIGN. fd_tpool_footprint returns
632 : FD_TPOOL_FOOTPRINT( worker_max ) if worker_max is in [1,FD_TILE_MAX]
633 : or 0 otherwise. */
634 :
635 : FD_FN_CONST ulong fd_tpool_align( void );
636 : FD_FN_CONST ulong fd_tpool_footprint( ulong worker_max );
637 :
638 : /* fd_tpool_init formats a memory region mem with the appropriate
639 : alignment and footprint as a thread pool that can support up to
640 : worker_max worker threads. worker_max must be in [1,FD_TILE_MAX].
641 : Returns a handle for the tpool (this is not a simple cast of mem) on
642 : success and NULL on failure (logs details). On a success return,
643 : worker 0 will already exist. Many threads can temporarily assume the
644 : identity of worker 0 as the situation merits. Worker 0 is typically
645 : the thread that starts dispatches to other worker threads in an
646 : operation and can also flexibly participate in the tpool in bulk
647 : operations. This uses init/fini semantics instead of
648 : new/join/leave/delete semantics because thread pools aren't
649 : meaningfully sharable between processes / thread groups. This
650 : function acts as a compiler memory fence. */
651 :
652 : fd_tpool_t *
653 : fd_tpool_init( void * mem,
654 : ulong worker_max );
655 :
656 : /* fd_tpool_fini pops all worker threads pushed into the tpool and
657 : unformats the underlying memory region. This should be called at
658 : most once by a thread that was not pushed into the tpool (e.g. a
659 : "worker 0" thread) and no other operations on the tpool should be in
660 : progress when this is called or done after this is called. Returns
661 : the memory region used by the tpool on success (this is not a simple
662 : cast of mem) and NULL on failure (logs details). This function acts
663 : as a compiler memory fence. */
664 :
665 : void *
666 : fd_tpool_fini( fd_tpool_t * tpool );
667 :
668 : /* fd_tpool_worker_push pushes tile tile_idx into the tpool. tile_idx
669 : 0, the calling tile, and tiles that have already been pushed into the
670 : tpool cannot be pushed. Further, tile_idx should be idle and no
671 : other tile operations should be done on it while it is a part of the
672 : tpool.
673 :
674 : If scratch_sz is non-zero, this will assume that tile tile_idx
675 : currently has no scratch memory attached to it and configure tile
676 : tile_idx to use the memory with appropriate alignment whose first byte
677 : in the local address space is scratch and that has scratch_sz bytes
678 : total for its scratch memory. Otherwise, it will use whatever
679 : scratch memory has already been configured for tile_idx (or leave it
680 : unattached to a scratch memory). Scratch memory used by a tile
681 : should not be used for any other purposes while the tile is part of
682 : the tpool.
683 :
684 : IMPORTANT SAFETY TIP! Since worker 0 identity is flexible, it is the
685 : caller's responsibility to attach/detach the scratch memory as
686 : appropriate for a "worker 0 thread" that is not pushed into the
687 : tpool.
688 :
689 : Returns tpool on success (tile tile_idx is part of the tpool and will
690 : be considered as executing from tile's point of view while a member
691 : of the tpool ... no tile operations can or should be done on it while
692 : it is part of the tpool) and NULL on failure (logs details). Reasons
693 : for failure include NULL tpool, bad tile_idx, tile was not idle, bad
694 : scratch region specified, etc.
695 :
696 : No other operations on tpool should be in process when this is called
697 : or started while this is running. This function acts as a compiler
698 : memory fence. */
699 :
700 : fd_tpool_t *
701 : fd_tpool_worker_push( fd_tpool_t * tpool,
702 : ulong tile_idx,
703 : void * scratch,
704 : ulong scratch_sz );
705 :
706 : /* fd_tpool_worker_pop pops the most recently pushed tpool worker
707 : thread. If the tile is attached to some scratch memory as part of
708 : its push, it will be detached from it here.
709 :
710 : Returns tpool on success (the tile is no longer part of the tpool and
711 : considered idle from tile's POV and can be used for other purposes)
712 : and NULL on failure (logs details). Reasons for failure include NULL
713 : tpool, bad tile_idx, tile was not idle, bad scratch region, etc.
714 :
715 : No other operations on the tpool should be in process when this is
716 : called or started while this is running. This function acts as a
717 : compiler memory fence. */
718 :
719 : fd_tpool_t *
720 : fd_tpool_worker_pop( fd_tpool_t * tpool );
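
/* A typical lifecycle sketch (illustrative only; assumes a tile
   topology with at least 3 booted idle tiles, reuses whatever scratch
   is already configured for those tiles, and omits error handling):

     static uchar shmem[ FD_TPOOL_FOOTPRINT( 3UL ) ] __attribute__((aligned(FD_TPOOL_ALIGN)));

     fd_tpool_t * tpool = fd_tpool_init( shmem, 3UL ); // worker 0 already exists
     fd_tpool_worker_push( tpool, 1UL, NULL, 0UL );    // tile 1 becomes worker 1
     fd_tpool_worker_push( tpool, 2UL, NULL, 0UL );    // tile 2 becomes worker 2

     ... dispatch work via fd_tpool_exec / fd_tpool_exec_all_* ...

     fd_tpool_fini( tpool );                           // pops workers, unformats shmem
*/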
721 :
722 : /* Accessors. As these are used in high performance contexts, these do
723 : no input argument checking. Specifically, they assume tpool is valid
724 : and (if applicable) worker_idx in [0,worker_cnt). worker 0 is
725 : special. The tile_idx/scratch/scratch_sz for worker 0 are always
726 : returned as 0/NULL/0 here. */
727 :
728 6264 : FD_FN_PURE static inline ulong fd_tpool_worker_cnt( fd_tpool_t const * tpool ) { return tpool->worker_cnt; }
729 3114 : FD_FN_PURE static inline ulong fd_tpool_worker_max( fd_tpool_t const * tpool ) { return tpool->worker_max; }
730 :
731 : FD_FN_PURE static inline ulong
732 : fd_tpool_worker_tile_idx( fd_tpool_t const * tpool,
733 3093 : ulong worker_idx ) {
734 3093 : return (ulong)fd_tpool_private_worker( tpool )[ worker_idx ]->tile_idx;
735 3093 : }
736 :
737 : FD_FN_PURE static inline void *
738 : fd_tpool_worker_scratch( fd_tpool_t const * tpool,
739 3093 : ulong worker_idx ) {
740 3093 : return fd_tpool_private_worker( tpool )[ worker_idx ]->scratch;
741 3093 : }
742 :
743 : FD_FN_PURE static inline ulong
744 : fd_tpool_worker_scratch_sz( fd_tpool_t const * tpool,
745 3093 : ulong worker_idx ) {
746 3093 : return fd_tpool_private_worker( tpool )[ worker_idx ]->scratch_sz;
747 3093 : }
748 :
749 : /* fd_tpool_worker_state atomically observes the state of tpool thread
750 : worker_idx at some point in time between when the call was made and
751 : the call returns. As this is used in high performance contexts, does
752 : no input argument checking. Specifically, assumes tpool is valid and
753 : worker_idx is in [0,worker_cnt). Return value will be a
754 : FD_TPOOL_WORKER_STATE value (and, in correct usage, either IDLE or
755 : EXEC). worker 0 is special. The state here will always be EXEC.
756 : This function acts as a compiler memory fence. */
757 :
758 : static inline int
759 : fd_tpool_worker_state( fd_tpool_t const * tpool,
760 3099 : ulong worker_idx ) {
761 3099 : int * _state = &fd_tpool_private_worker( tpool )[ worker_idx ]->state;
762 3099 : FD_COMPILER_MFENCE();
763 3099 : int state = *_state;
764 3099 : FD_COMPILER_MFENCE();
765 3099 : return state;
766 3099 : }
767 :
768 : /* fd_tpool_exec calls
769 :
770 : task( task_tpool,
771 : task_t0, task_t1,
772 : task_args,
773 : task_reduce, task_stride,
774 : task_l0, task_l1,
775 : task_m0, task_m1,
776 : task_n0, task_n1 );
777 :
778 : on tpool thread worker_idx. This will run concurrently with the
779 : caller. Uncaught exceptions in task will be logged by the remote
780 : thread (in principle, assuming the uncaught exception did not leave
781 : the application in an unstable state, thread worker_idx will still be
782 : usable for additional fd_tpool_exec calls). As this is used in high
783 : performance contexts, does no input argument checking. Specifically,
784 : assumes tpool is valid, worker_idx in (0,worker_cnt) (yes, open on
785 : both ends), caller is not tpool thread worker_idx, task is valid.
786 : worker_idx 0 is special and is considered to always be in the EXEC
787 : state so we cannot call exec on it. This function acts as a compiler
788 : memory fence. */
789 :
790 : static inline void
791 : fd_tpool_exec( fd_tpool_t * tpool, ulong worker_idx,
792 : fd_tpool_task_t task,
793 : void * task_tpool,
794 : ulong task_t0, ulong task_t1,
795 : void * task_args,
796 : void * task_reduce, ulong task_stride,
797 : ulong task_l0, ulong task_l1,
798 : ulong task_m0, ulong task_m1,
799 30813768 : ulong task_n0, ulong task_n1 ) {
800 30813768 : fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_idx ];
801 30813768 : worker->task = task;
802 30813768 : worker->task_tpool = task_tpool;
803 30813768 : worker->task_t0 = task_t0; worker->task_t1 = task_t1;
804 30813768 : worker->task_args = task_args;
805 30813768 : worker->task_reduce = task_reduce; worker->task_stride = task_stride;
806 30813768 : worker->task_l0 = task_l0; worker->task_l1 = task_l1;
807 30813768 : worker->task_m0 = task_m0; worker->task_m1 = task_m1;
808 30813768 : worker->task_n0 = task_n0; worker->task_n1 = task_n1;
809 30813768 : FD_COMPILER_MFENCE();
810 30813768 : worker->state = FD_TPOOL_WORKER_STATE_EXEC;
811 30813768 : FD_COMPILER_MFENCE();
812 30813768 : }
813 :
814 : /* fd_tpool_wait waits for the tpool thread worker_idx to leave the
815 : EXEC state. As this is used in high performance contexts, does no
816 : input argument checking. Specifically, assumes tpool is valid,
817 : worker_idx in (0,worker_cnt) (yes, open on both ends) and caller is
818 : not tpool thread worker_idx. worker_idx 0 is considered to always be
819 : in an exec state so we cannot call wait on it. On return, the caller
820 : atomically observed worker_idx was not in the EXEC state at some
821 : point between when this was called and when this returned. This
822 : function acts as a compiler memory fence. */
823 :
824 : static inline void
825 : fd_tpool_wait( fd_tpool_t const * tpool,
826 28415748 : ulong worker_idx ) {
827 28415748 : int * _state = &fd_tpool_private_worker( tpool )[ worker_idx ]->state;
828 692217827 : for(;;) {
829 692217827 : FD_COMPILER_MFENCE();
830 692217827 : int state = *_state;
831 692217827 : FD_COMPILER_MFENCE();
832 692217827 : if( FD_LIKELY( state!=FD_TPOOL_WORKER_STATE_EXEC ) ) break;
833 660141098 : FD_SPIN_PAUSE();
834 660141098 : }
835 28415748 : }
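
/* A minimal dispatch / wait sketch (illustrative only; my_task is a
   hypothetical function matching fd_tpool_task_t, my_args is a
   hypothetical argument pointer and worker 1 is assumed to have been
   pushed and to be idle):

     fd_tpool_exec( tpool, 1UL, my_task,
                    tpool, 1UL, 2UL,          // task_tpool, task_t0, task_t1
                    my_args,                  // task_args
                    NULL, 0UL,                // task_reduce, task_stride
                    0UL, 0UL,                 // task_l0, task_l1
                    0UL, 0UL,                 // task_m0, task_m1
                    0UL, 0UL );               // task_n0, task_n1
     ... do the caller's share of the work here ...
     fd_tpool_wait( tpool, 1UL );
*/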
836 :
837 : /* Assuming the tasks can be executed safely in any order and/or
838 : concurrently, fd_tpool_exec_all_rrobin, fd_tpool_exec_all_block and
839 : fd_tpool_exec_all_taskq are functionally equivalent to:
840 :
841 : for( ulong l=l0; l<l1; l++ )
842 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, l,l+1, t,t+1 );
843 :
844 : where t indicates to which tpool worker thread idx the particular
845 : task was assigned (and thus t is in [t0,t1), where thread t0 is the
846 : thread that did the dispatch). The rrobin variant stripes individual
847 : tasks deterministically over worker threads (e.g. worker thread t
848 : does tasks (t-t0)+0*(t1-t0),(t-t0)+1*(t1-t0),(t-t0)+2*(t1-t0), ...).
849 : The block variant blocks individual tasks over worker threads.
850 :
851 : The taskq variant requires FD_HAS_ATOMIC support and assigns tasks to
852 : threads dynamically. Practically, this is only useful if there are a
853 : huge number of tasks to execute relative to the number of threads,
854 : the tasks to execute have highly non-uniform sizes, the cost to
855 : execute a task is much much greater than the cost of a single atomic
856 : memory operation, and thread-core affinity doesn't impact the time to
857 : execute a task ... conditions that, in total, are met far less
858 : frequently than most developers expect.
859 :
860 : fd_tpool_exec_all_batch is functionally equivalent to:
861 :
862 : for( ulong t=t0; t<t1; t++ ) {
863 : ulong batch_task_l0;
864 : ulong batch_task_l1;
865 : FD_TPOOL_PARTITION( task_l0,task_l1,1, t-t0,t1-t0, batch_task_l0,batch_task_l1 );
866 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, batch_task_l0,batch_task_l1, t,t+1 );
867 : }
868 :
869 : The batch assigned to a thread will be the same tasks as those
870 : assigned to a thread by fd_tpool_exec_all_block. The difference is
871 : that fd_tpool_exec_all_batch effectively merges all the calls to task
872 : that fd_tpool_exec_all_block would make in a block into a single call
873 : per thread.
874 :
875 : fd_tpool_exec_all_raw is functionally equivalent to:
876 :
877 : for( ulong t=t0; t<t1; t++ )
878 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, task_l0,task_l1, t,t+1 );
879 :
880 : This allows the caller to use their own partitioning strategies with
881 : minimal overhead.
882 :
883 : All these are executed thread parallel using the calling thread and
884 : tpool worker threads (t0,t1) (yes, open on both ends ... the caller
885 : masquerades as worker thread t0 if it isn't actually worker thread t0 as
886 : far as exec_all is concerned ... see safety tip above about scratch).
887 : The caller should not be any of worker threads (t0,t1) and worker
888 : threads (t0,t1) should be idle on entry and should not be dispatched
889 : to while an exec_all is running.
890 :
891 : As such, in all of these, a task knows automatically which worker
892 : thread is processing this (t), the range of tasks assigned to it
893 : (e.g. [batch_task_l0,batch_task_l1)), the entire range of workers
894 : [t0,t1) in use, the entire range of tasks [task_l0,task_l1) as well
895 : as the original values for task_tpool, task_args, task_reduce,
896 : task_stride.
897 :
898 : As this is used in high performance contexts, this does no input
899 : argument checking. Specifically, it assumes tpool is valid, task is
900 : valid, 0<=t0<t1<=worker_cnt, l0<=l1. These functions act as a
901 : compiler memory fence. */
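
/* A minimal usage sketch (illustrative only; my_task, my_args and
   task_cnt are hypothetical, with my_task matching fd_tpool_task_t and
   interpreting its arguments per the conventions documented above):

     fd_tpool_exec_all_block( tpool, 0UL, fd_tpool_worker_cnt( tpool ), // caller masquerades as worker 0
                              my_task, tpool,                           // task, task_tpool
                              my_args, NULL, 0UL,                       // task_args, task_reduce, task_stride
                              0UL, task_cnt );                          // tasks [0,task_cnt)
*/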
902 :
903 : #define FD_TPOOL_EXEC_ALL_DECL(style) \
904 : void \
905 : fd_tpool_private_exec_all_##style##_node( void * _node_tpool, \
906 : ulong node_t0, ulong node_t1, \
907 : void * args, \
908 : void * reduce, ulong stride, \
909 : ulong l0, ulong l1, \
910 : ulong _task, ulong _tpool, \
911 : ulong t0, ulong t1 ); \
912 : \
913 : static inline void \
914 : fd_tpool_exec_all_##style( fd_tpool_t * tpool, \
915 : ulong t0, ulong t1, \
916 : fd_tpool_task_t task, \
917 : void * task_tpool, \
918 : void * task_args, \
919 : void * task_reduce, ulong task_stride, \
920 2797440 : ulong task_l0, ulong task_l1 ) { \
921 2797440 : FD_COMPILER_MFENCE(); \
922 2797440 : fd_tpool_private_exec_all_##style##_node( tpool, t0,t1, task_args, task_reduce,task_stride, task_l0,task_l1, \
923 2797440 : (ulong)task,(ulong)task_tpool, t0,t1 ); \
924 2797440 : }
925 :
926 : FD_TPOOL_EXEC_ALL_DECL(rrobin)
927 : FD_TPOOL_EXEC_ALL_DECL(block)
928 : FD_TPOOL_EXEC_ALL_DECL(batch)
929 : FD_TPOOL_EXEC_ALL_DECL(raw)
930 :
931 : #if FD_HAS_ATOMIC
932 : void
933 : fd_tpool_private_exec_all_taskq_node( void * _node_tpool,
934 : ulong node_t0, ulong node_t1,
935 : void * args,
936 : void * reduce, ulong stride,
937 : ulong l0, ulong l1,
938 : ulong _task, ulong _l_next,
939 : ulong t0, ulong t1 );
940 :
941 : static inline void
942 : fd_tpool_exec_all_taskq( fd_tpool_t * tpool,
943 : ulong t0, ulong t1,
944 : fd_tpool_task_t task,
945 : void * task_tpool,
946 : void * task_args,
947 : void * task_reduce, ulong task_stride,
948 300000 : ulong task_l0, ulong task_l1 ) {
949 300000 : ulong l_next[16] __attribute((aligned(128)));
950 300000 : l_next[0] = task_l0;
951 300000 : l_next[1] = (ulong)task_tpool;
952 300000 : FD_COMPILER_MFENCE();
953 300000 : fd_tpool_private_exec_all_taskq_node( tpool, t0,t1, task_args, task_reduce,task_stride, task_l0,task_l1,
954 300000 : (ulong)task,(ulong)l_next, t0,t1 );
955 300000 : }
956 : #endif
957 :
958 : #undef FD_TPOOL_EXEC_ALL_DECL
959 :
960 : /* FD_FOR_ALL provides some macros for writing CUDA-ish parallel-for
961 : kernels executed via tpool threads. Example usage:
962 :
963 : In a header file:
964 :
965 : // my_vec_op uses tpool threads [tpool_t0,tpool_t1) to do:
966 : //
967 : // for( long i=i0; i<i1; i++ ) z[i] = my_scalar_op( x[i], y[i] );
968 : //
969 : // where x, y and z are pointers to non-overlapping my_ele_t
970 : // arrays. The caller is assumed to be thread tpool_t0, threads
971 : // (tpool_t0,tpool_t1) are assumed to be idle and
972 : // (block_i1-block_i0) in [0,LONG_MAX / FD_TILE_MAX].
973 :
974 : FD_FOR_ALL_PROTO( my_vec_op );
975 :
976 : In a source file that uses my_vec_op:
977 :
978 : FD_FOR_ALL( my_vec_op, tpool, t0,t1, i0,i1, x,y,z );
979 :
980 : In the source file that implements my_vec_op:
981 :
982 : FD_FOR_ALL_BEGIN( my_vec_op, 1024L ) {
983 :
984 : ... At this point:
985 :
986 : - longs block_i0 and block_i1 give the range of elements
987 : [block_i0,block_i1) for this thread to process. Other
988 : threads will process other approximately equally sized
989 : disjoint ranges in parallel.
990 :
991 : - long block_cnt = block_i1 - block_i0.
992 :
993 : - long block_thresh is the parameter used to optimize
994 : thread dispatch and was specified by the 1024L above.
995 : Element ranges with more than this number of elements are
996 : considered worth splitting over more than one thread if
997 : possible. An ultra high performance deterministic
998 : parallelized tree dispatch is used for good scaling,
999 : cache temporal locality and cache spatial locality.
1000 : Should be at least 1 and can be a run time evaluated
1001 : expression. This can also be used to make sure different
1002 : kernel dispatches consistently partition elements such
1003 : that there is good cache reuse between different thread
1004 : parallel kernels.
1005 :
1006 : - tpool is a handle of this thread's tpool and ulongs
1007 : tpool_t0 and tpool_t1 give the range of threads
1008 : [tpool_t0,tpool_t1) available to process this block.
1009 : This thread is tpool_t0 and threads (tpool_t0,tpool_t1)
1010 : are idle.
1011 :
1012 : - Even if the range (tpool_t0,tpool_t1) is not empty, it is
1013 : almost certainly optimal to just have this thread process
1014 : all elements in the block single threaded. That is, when
1015 : processing many elements (>>block_thresh*(t1-t0)),
1016 : (tpool_t0,tpool_t1) will be empty. When processing a
1017 : small number of elements, FD_FOR_ALL already concluded
1018 : there were too few elements to dispatch to
1019 : (tpool_t0,tpool_t1).
1020 :
1021 : - In this example, ulongs _a0 = (ulong)x, _a1 = (ulong)y,
1022 : _a2 = (ulong)z. ulongs _a3, _a4, _a5, and _a6 are zero.
1023 :
1024 : - _tpool, _block_i0, _block_i1, _reduce_cnt, _reduce_stack
1025 : are reserved.
1026 :
1027 : IMPORTANT SAFETY TIP! DO NOT RETURN FROM THIS BLOCK. (IF
1028 : ENDING A BLOCK EARLY, USE BREAK.)
1029 :
1030 : my_ele_t const * restrict x = (my_ele_t const *)_a0;
1031 : my_ele_t const * restrict y = (my_ele_t const *)_a1;
1032 : my_ele_t * restrict z = (my_ele_t *)_a2;
1033 :
1034 : for( long i=block_i0; i<block_i1; i++ ) z[i] = my_scalar_op( x[i], y[i] );
1035 :
1036 : } FD_FOR_ALL_END
1037 :
1038 : FD_FOR_ALL operations are a compiler memory fence. */
1039 :
1040 : #define FD_FOR_ALL_PROTO(op) \
1041 : void \
1042 : op( void * _tpool, /* Assumes valid */ \
1043 : ulong tpool_t0, ulong tpool_t1, /* Assumes t0<=t1, caller is t0, (t0,t1) idle */ \
1044 : void * _block_i0, void * _block_i1, /* Assumes block_cnt in [0,LONG_MAX/tpool_cnt] */ \
1045 : ulong _a0, ulong _a1, \
1046 : ulong _a2, ulong _a3, \
1047 : ulong _a4, ulong _a5, \
1048 : ulong _a6 )
1049 :
1050 : #define FD_FOR_ALL_BEGIN(op,BLOCK_THRESH) \
1051 : void \
1052 : op( void * _tpool, \
1053 : ulong tpool_t0, ulong tpool_t1, \
1054 : void * _block_i0, void * _block_i1, \
1055 : ulong _a0, ulong _a1, \
1056 : ulong _a2, ulong _a3, \
1057 : ulong _a4, ulong _a5, \
1058 14316955 : ulong _a6 ) { \
1059 14316955 : FD_COMPILER_MFENCE(); /* guarantees memory fence even if tpool_cnt==1 */ \
1060 14316955 : long block_thresh = (BLOCK_THRESH); \
1061 14316955 : fd_tpool_t * tpool = (fd_tpool_t *)_tpool; \
1062 14316955 : long block_i0 = (long)_block_i0; \
1063 14316955 : long block_i1 = (long)_block_i1; \
1064 14316955 : long block_cnt; \
1065 14316955 : ulong _reduce_cnt = 0UL; \
1066 14316955 : ushort _reduce_stack[ 16 ]; /* Assumes TILE_MAX<=65536 */ \
1067 23785791 : for(;;) { \
1068 23785791 : ulong tpool_cnt = tpool_t1 - tpool_t0; \
1069 23785791 : /**/ block_cnt = block_i1 - block_i0; \
1070 23785791 : if( FD_LIKELY( (tpool_cnt<=1UL) | (block_cnt<=block_thresh) ) ) break; \
1071 23785791 : ulong tpool_cs = fd_tpool_private_split( tpool_cnt ); \
1072 10622800 : ulong tpool_ts = tpool_t0 + tpool_cs; \
1073 10622800 : long block_is = block_i0 + (long)((tpool_cs*(ulong)block_cnt) / tpool_cnt); /* No overflow */ \
1074 10622800 : fd_tpool_exec( tpool,tpool_ts, op, tpool,tpool_ts,tpool_t1, (void *)block_is,(void *)block_i1, \
1075 10622800 : _a0,_a1,_a2,_a3,_a4,_a5,_a6 ); \
1076 10622800 : _reduce_stack[ _reduce_cnt++ ] = (ushort)tpool_ts; \
1077 10622800 : tpool_t1 = tpool_ts; \
1078 10622800 : block_i1 = block_is; \
1079 10622800 : } \
1080 14316955 : do
1081 :
1082 : #define FD_FOR_ALL_END \
1083 14316955 : while(0); \
1084 25006443 : while( _reduce_cnt ) fd_tpool_wait( tpool, (ulong)_reduce_stack[ --_reduce_cnt ] ); \
1085 8007339 : }
1086 :
1087 : #define FD_FOR_ALL_PRIVATE_F(...) too_few_arguments_passed_to_FD_FOR_ALL
1088 1897452 : #define FD_FOR_ALL_PRIVATE_0(op,tpool,t0,t1,i0,i1 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), 0UL, 0UL, 0UL, 0UL, 0UL, 0UL, 0UL )
1089 300000 : #define FD_FOR_ALL_PRIVATE_1(op,tpool,t0,t1,i0,i1,a0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), 0UL, 0UL, 0UL, 0UL, 0UL, 0UL )
1090 300000 : #define FD_FOR_ALL_PRIVATE_2(op,tpool,t0,t1,i0,i1,a0,a1 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), 0UL, 0UL, 0UL, 0UL, 0UL )
1091 300000 : #define FD_FOR_ALL_PRIVATE_3(op,tpool,t0,t1,i0,i1,a0,a1,a2 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), 0UL, 0UL, 0UL, 0UL )
1092 300000 : #define FD_FOR_ALL_PRIVATE_4(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), 0UL, 0UL, 0UL )
1093 300000 : #define FD_FOR_ALL_PRIVATE_5(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), 0UL, 0UL )
1094 300000 : #define FD_FOR_ALL_PRIVATE_6(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4,a5 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), (ulong)(a5), 0UL )
1095 300000 : #define FD_FOR_ALL_PRIVATE_7(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4,a5,a6) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), (ulong)(a5), (ulong)(a6) )
1096 : #define FD_FOR_ALL_PRIVATE_M(...) too_many_arguments_passed_to_FD_FOR_ALL
1097 2400060 : #define FD_FOR_ALL(...) FD_EXPAND_THEN_CONCAT2(FD_FOR_ALL_PRIVATE_,FD_VA_ARGS_SELECT(__VA_ARGS__,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,7,6,5,4,3,2,1,0,F,F,F,F,F))(__VA_ARGS__)
1098 :
1099 : /* FD_MAP_REDUCE extends FD_FOR_ALL to reduce results back to thread
1100 : tpool_t0. IMPORTANT SAFETY TIP! Adequate scratch memory must be
1101 : configured for threads used by a MAP_REDUCE.
1102 :
1103 : Example usage:
1104 :
1105 : In a header file:
1106 :
1107 : // my_hist_op uses tpool threads [tpool_t0,tpool_t1) to compute:
1108 : //
1109 : // long h[4] __attribute__((aligned(128)));
1110 : // for( long j=0L; j<4L; j++ ) h[j] = 0L;
1111 : // for( long i=i0; i<i1; i++ ) {
1112 : // long j = my_map_op( x[i] );
1113 : // h[j]++;
1114 : // }
1115 : //
1116 : // where x is a pointer to a my_ele_t array. The caller is
1117 : // assumed to be thread tpool_t0 and threads (tpool_t0,tpool_t1)
1118 : // are assumed to be idle. Requires thread local scratch space
1119 : // per tpool thread of ~4 ceil lg (tpool_t1-tpool_t0) ulong.
1120 :
1121 : FD_MAP_REDUCE_PROTO( my_hist_op );
1122 :
1123 : In the source file that uses my_hist_op:
1124 :
1125 : long h[4] __attribute__((aligned(128)));
1126 : FD_MAP_REDUCE( my_hist_op, tpool, t0,t1, i0,i1, x, h );
1127 :
1128 : In the source file that implements my_hist_op:
1129 :
1130 : FD_MAP_REDUCE_BEGIN( my_hist_op, 1L, 128UL, 4UL*sizeof(long) ) {
1131 :
1132 : ... At this point:
1133 :
1134 : - block_i0, block_i1, block_cnt, block_thresh, tpool,
1135 : tpool_t0, tpool_t1, _a0 through _a5 have the exact same
1136 : behaviors as FD_FOR_ALL. _tpool, _block_i0, _block_i1,
1137 : _reduce_cnt, _reduce_stack are similarly reserved.
1138 :
1139 : - Unlike FD_FOR_ALL, there is no _a6. Instead ulong _r0
1140 : holds the location where this thread should store its
1141 : local reduction.
1142 :
1143 : - ulong reduce_align and reduce_footprint give the alignment
1144 : and footprint of this region. In this example, these are
1145 : given by the 128UL and 4UL*sizeof(long) above. Like
1146 : block_thresh, these can be run-time evaluated
1147 : expressions. This assumes that the region passed by the
1148 : caller for the final results has a compatible alignment
1149 : and footprint.
1150 :
1151 : IMPORTANT SAFETY TIP! DO NOT RETURN FROM THIS BLOCK. (IF
1152 : ENDING A BLOCK EARLY, USE BREAK.)
1153 :
1154 : my_ele_t const * restrict x = (my_ele_t const *)_a0;
1155 : long * restrict h = (long *)_r0;
1156 :
1157 : for( long j=0L; j<4L; j++ ) h[j] = 0UL;
1158 : for( long i=block_i0; i<block_i1; i++ ) {
1159 : long j = my_map_op( x[i] );
1160 : h[j]++;
1161 : }
1162 :
1163 : } FD_MAP_END {
1164 :
1165 : ... At this point, the environment is as described above with the
1166 : following differences:
1167 :
1168 : - There are at least two threads in [tpool_t0,tpool_t1).
1169 :
1170 : - On entry, ulongs _r0 and _r1 point to the partials to reduce
1171 : (cast to ulongs). These will have an alignment and
1172 : footprint compatible with the above.
1173 :
1174 : - On exit, _r1 has been reduced into _r0. It is okay if
1175 : _r1 is clobbered in this process.
1176 :
1177 : - [block_i0,block_i1) give the range of elements covered by
1178 : the output of this reduction (though this is not typically
1179 : used).
1180 :
1181 : IMPORTANT SAFETY TIP! While this reduction is often
1182 : theoretically parallelizable and threads
1183 : [tpool_t0,tpool_t1) are available here for such,
1184 : parallelization of this can often be counterproductive
1185 : (especially if the amount to reduce is small, the reduction
1186 : operation is cheap and the arrays to reduce have poor
1187 : spatial locality for reduction here due to the mapping
1188 : phase above).
1189 :
1190 : IMPORTANT SAFETY TIP! DO NOT RETURN FROM THIS BLOCK. (IF
1191 : ENDING A BLOCK EARLY, USE BREAK.)
1192 :
1193 : long * restrict h0 = (long *)_r0;
1194 : long const * restrict h1 = (long const *)_r1;
1195 :
1196 : for( long j=0L; j<4L; j++ ) h0[j] += h1[j];
1197 :
1198 : } FD_REDUCE_END
1199 :
1200 : FD_MAP_REDUCE operations are a compiler memory fence. */
1201 :
1202 : #define FD_MAP_REDUCE_PROTO(op) \
1203 : void \
1204 : op( void * _tpool, /* Assumes valid */ \
1205 : ulong tpool_t0, ulong tpool_t1, /* Assumes t0<=t1, caller is t0, (t0,t1) idle */ \
1206 : void * _block_i0, void * _block_i1, /* Assumes block_cnt in [0,LONG_MAX/tpool_cnt] */ \
1207 : ulong _a0, ulong _a1, \
1208 : ulong _a2, ulong _a3, \
1209 : ulong _a4, ulong _a5, \
1210 : ulong _r0 )
1211 :
1212 : #define FD_MAP_REDUCE_BEGIN(op,BLOCK_THRESH,REDUCE_ALIGN,REDUCE_FOOTPRINT) \
1213 : void \
1214 : op( void * _tpool, \
1215 : ulong tpool_t0, ulong tpool_t1, \
1216 : void * _block_i0, void * _block_i1, \
1217 : ulong _a0, ulong _a1, \
1218 : ulong _a2, ulong _a3, \
1219 : ulong _a4, ulong _a5, \
1220 13602089 : ulong _r0 ) { \
1221 13602089 : FD_COMPILER_MFENCE(); /* guarantees memory fence even if tpool_cnt==1 */ \
1222 13602089 : long block_thresh = (BLOCK_THRESH); \
1223 13602089 : ulong reduce_align = (REDUCE_ALIGN); \
1224 13602089 : ulong reduce_footprint = (REDUCE_FOOTPRINT); \
1225 13602089 : fd_tpool_t * tpool = (fd_tpool_t *)_tpool; \
1226 13602089 : long block_i0 = (long)_block_i0; \
1227 13602089 : long block_i1 = (long)_block_i1; \
1228 13602089 : long block_cnt; \
1229 13602089 : ulong _reduce_cnt = 0UL; \
1230 13602089 : struct { \
1231 13602089 : uint ts; /* ts is thread that needs to complete before reduction can proceed */ \
1232 13602089 : uint t1; /* [t0,t1) is range of threads available for reduction */ \
1233 13602089 : ulong r1; /* r1 is the scratch memory used for the reduction */ \
1234 13602089 : long i1; /* [i0,i1) is range the reduction output covers */ \
1235 13602089 : } _reduce_stack[ 16 ]; /* Assumes TILE_MAX<65536 (yes strictly less) */ \
1236 13602089 : fd_scratch_push(); \
1237 21754916 : for(;;) { \
1238 21754916 : ulong tpool_cnt = tpool_t1 - tpool_t0; \
1239 21754916 : /**/ block_cnt = block_i1 - block_i0; \
1240 21754916 : if( FD_LIKELY( (tpool_cnt<=1UL) | (block_cnt<=block_thresh) ) ) break; \
1241 21754916 : ulong tpool_cs = fd_tpool_private_split( tpool_cnt ); \
1242 9780905 : ulong tpool_ts = tpool_t0 + tpool_cs; \
1243 9780905 : long block_is = block_i0 + (long)((tpool_cs*(ulong)block_cnt) / tpool_cnt); /* No overflow */ \
1244 9780905 : ulong _r1 = (ulong)fd_scratch_alloc( reduce_align, reduce_footprint ); \
1245 9780905 : fd_tpool_exec( tpool, tpool_ts, op, tpool,tpool_ts,tpool_t1, (void *)block_is,(void *)block_i1, \
1246 9780905 : _a0,_a1,_a2,_a3,_a4,_a5,_r1 ); \
1247 9780905 : _reduce_stack[ _reduce_cnt ].ts = (uint)tpool_ts; \
1248 9780905 : _reduce_stack[ _reduce_cnt ].t1 = (uint)tpool_t1; \
1249 9780905 : _reduce_stack[ _reduce_cnt ].i1 = block_i1; \
1250 9780905 : _reduce_stack[ _reduce_cnt ].r1 = _r1; \
1251 9780905 : _reduce_cnt++; \
1252 9780905 : tpool_t1 = tpool_ts; \
1253 9780905 : block_i1 = block_is; \
1254 9780905 : } \
1255 13602089 : do
1256 :
1257 : #define FD_MAP_END \
1258 13602089 : while(0); \
1259 23588296 : while( _reduce_cnt ) { \
1260 9986207 : --_reduce_cnt; \
1261 9986207 : ulong _r1 = _reduce_stack[ _reduce_cnt ].r1; \
1262 9986207 : block_i1 = _reduce_stack[ _reduce_cnt ].i1; \
1263 9986207 : tpool_t1 = (ulong)_reduce_stack[ _reduce_cnt ].t1; \
1264 9986207 : block_cnt = block_i1 - block_i0; \
1265 9986207 : fd_tpool_wait( tpool, (ulong)_reduce_stack[ _reduce_cnt ].ts ); \
1266 9986207 : (void)_r0; (void)_r1; \
1267 9986207 : do
1268 :
1269 : #define FD_REDUCE_END \
1270 9986207 : while(0); \
1271 9986207 : } \
1272 13602089 : fd_scratch_pop(); \
1273 7114486 : }
1274 :
1275 : #define FD_MAP_REDUCE_PRIVATE_F(...) too_few_arguments_passed_to_FD_MAP_REDUCE
1276 1897440 : #define FD_MAP_REDUCE_PRIVATE_0(op,tpool,t0,t1,i0,i1, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), 0UL, 0UL, 0UL, 0UL, 0UL, 0UL, (ulong)(r0) )
1277 300000 : #define FD_MAP_REDUCE_PRIVATE_1(op,tpool,t0,t1,i0,i1,a0, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), 0UL, 0UL, 0UL, 0UL, 0UL, (ulong)(r0) )
1278 300000 : #define FD_MAP_REDUCE_PRIVATE_2(op,tpool,t0,t1,i0,i1,a0,a1, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), 0UL, 0UL, 0UL, 0UL, (ulong)(r0) )
1279 300000 : #define FD_MAP_REDUCE_PRIVATE_3(op,tpool,t0,t1,i0,i1,a0,a1,a2, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), 0UL, 0UL, 0UL, (ulong)(r0) )
1280 300000 : #define FD_MAP_REDUCE_PRIVATE_4(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), 0UL, 0UL, (ulong)(r0) )
1281 300000 : #define FD_MAP_REDUCE_PRIVATE_5(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4, r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), 0UL, (ulong)(r0) )
1282 300000 : #define FD_MAP_REDUCE_PRIVATE_6(op,tpool,t0,t1,i0,i1,a0,a1,a2,a3,a4,a5,r0 ) (op)( (tpool),(t0),(t1), (void *)(i0),(void *)(i1), (ulong)(a0), (ulong)(a1), (ulong)(a2), (ulong)(a3), (ulong)(a4), (ulong)(a5), (ulong)(r0) )
1283 : #define FD_MAP_REDUCE_PRIVATE_M(...) too_many_arguments_passed_to_FD_MAP_REDUCE
1284 2100048 : #define FD_MAP_REDUCE(...) FD_EXPAND_THEN_CONCAT2(FD_MAP_REDUCE_PRIVATE_,FD_VA_ARGS_SELECT(__VA_ARGS__,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,M,6,5,4,3,2,1,0,F,F,F,F,F,F))(__VA_ARGS__)
1285 :
1286 : /* fd_tpool_worker_state_cstr converts an FD_TPOOL_WORKER_STATE_* code
1287 : into a human readable cstr. The lifetime of the returned pointer is
1288 : infinite. The returned pointer is always to a non-NULL cstr. */
1289 :
1290 : FD_FN_CONST char const *
1291 : fd_tpool_worker_state_cstr( int state );
1292 :
1293 : FD_PROTOTYPES_END
1294 :
1295 : #endif /* HEADER_fd_src_util_tpool_fd_tpool_h */
|