Line data Source code
1 : #ifndef HEADER_fd_src_util_tpool_fd_tpool_h
2 : #define HEADER_fd_src_util_tpool_fd_tpool_h
3 :
4 : /* tpool provides APIs to group sets of tiles together for ultra low
5 : overhead and high scalability launching of thread parallel jobs.
6 : There is a lot of nuance that most thread pool APIs and
7 : implementations get wrong. And this nuance is very useful to
8 : understand why the APIs and implementations are the way they are.
9 : So, a crash course:
10 :
11 : High performance thread parallelism crash course ********************
12 :
13 : Consider the simple and highly idealized case of a job with N
14 : independent tasks that each take an approximately uniform time
15 : tau_task to execute.
16 :
17 : A serial implementation of this job might look like:
18 :
19 : for( ulong n=0; n<N; n++ ) ... do task n ...;
20 :
21 : And the overall time to do such a job is:
22 :
23 : T_serial ~ tau_overhead + tau_task N
24 :
25 : where tau_overhead represents O(1) costs like the overhead to setup
26 : the loop that will do the N tasks.
27 :
28 : In a parallel implementation, we want to uniformly divide these tasks
29 : over P threads to speed this up. If we use the naive strategy that
30 : many codes and libraries use, we have something like (either
31 : explicitly or under the hood and ignoring error trapping):
32 :
33 : static void *
34 : task_block( void * _args ) {
35 : task_args_t * args = (task_args_t *)_args;
36 : ... unpack task args here, including
37 : ulong n0 = args->n0;
38 : ulong n1 = args->n1;
39 :
40 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
41 :
42 : return NULL;
43 : }
44 :
45 : ... in the main thread ...
46 :
47 : task_args_t * args = (task_args_t *)malloc( P*sizeof(task_args_t) );
48 : pthread_t * thread = (pthread_t *) malloc( P*sizeof(pthread_t) );
49 : for( ulong p=0; p<P; p++ ) args[p] = ... block p info ...;
50 : for( ulong p=1; p<P; p++ ) pthread_create( &thread[p], NULL, task_block, &args[p] );
51 : task_block( &args[0] );
52 : for( ulong p=1; p<P; p++ ) pthread_join( thread[p], NULL );
53 : free( thread );
54 : free( args );
55 :
56 : (Above, we used the main thread as one of the P threads in job
57 : execution such that it both dispatches and does a share of the work.
58 : This discussion still applies if we just have main do dispatch only.)
59 :
60 : Ugh ... this does parallelize the task over P threads but it is
61 : practically worthless. The overall time to execute is:
62 :
63 : T_parallel_dumb ~ tau_overhead_dumb
64 : + tau_partition_dumb P
65 : + tau_dispatch_dumb (P-1)
66 : + tau_task N / P
67 :
68 : tau_overhead_dumb represents the overhead from before, but
69 : with more dumb. That is, it is likely a lot larger than the serial
70 : version because of the allocations and need to wrap up the tasks in a
71 : thread dispatch function signature compatible way (pthread_create
72 : "start_routine" in this case). (There are usually additional
73 : overheads omitted from this simplified discussion.)
74 :
75 : tau_partition_dumb is the time spent by the main thread partitioning
76 : the work for the worker threads per worker thread. This is dumb
77 : because this work could be parallelized (i.e. done within the task
78 : itself). A second, less obvious, dumbness is that, in complex work
79 : distribution scenarios, it is often very difficult for the main
80 : thread to determine the optimal partition a worker thread should do
81 : (see sgemm dispatch example below of something very hard to do
82 : centrally but very easy to do distributed). This ends up a triple
83 : whammy of awful: the parallelization strategy is inefficient,
84 : computed inefficiently and computed by one thread. Amdahl
85 : bottlenecks for everyone!
86 :
87 : tau_dispatch_dumb is the time needed to start and stop a worker
88 : thread to run a task. This is dumb because things like
89 : pthread_create have a huge overhead and because the total overhead is
90 : linear in P. (In practice, this is even dumber than it appears
91 : because we didn't do anything fancy with the created threads to
92 : mitigate thread-core affinity effects, IPC effects, kernel scheduling
93 : effects, NUMA effects, TLB effects, ... neglected in this idealized
94 : example.)
95 :
96 : Thus, we better have an infinite N if we are going to be able to
97 : usefully parallelize over many threads under the dumb parallelization
98 : strategies.
99 :
100 : But, in the real world, N is not infinite and often our goal is to
101 : reduce the time needed to do a finite amount of work. Throwing all
102 : the threads in the world at the job will not help because, while
103 : increasing the number of threads decreases the amount of time
104 : required to do the tasks hyperbolically, it _increases_ _linearly_
105 : the amount of time needed to setup the threads to do those tasks.
106 :
107 : This leads to the question: what is the maximum number of threads we can
108 : use profitably?
109 :
110 : This will be the P that minimizes T_parallel_dumb. Call this value
111 : P_max_useful_dumb. Given this point is a minimum of T_parallel_dumb,
112 : for P<P_max_useful_dumb / P>P_max_useful_dumb, using _more_ / _less_
113 : threads for a fixed size job is useful.
114 :
115 : From basic calculus, P_max_useful_dumb is where the derivative of
116 : T_parallel_dumb with respect to P is zero. With:
117 :
118 : dT_parallel_dumb / dP ~ tau_partition_dumb
119 : + tau_dispatch_dumb
120 : - tau_task N / P^2
121 :
122 : we can solve for dT_parallel_dumb / dP = 0. This yields:
123 :
124 : P_max_useful_dumb ~ sqrt( tau_task N / ( tau_partition_dumb + tau_dispatch_dumb ) )
125 :
126 : This does not even weak scale! Doubling N does not double
127 : P_max_useful_dumb; it only increases it by a factor of ~sqrt(2). Sad.
128 :
129 : And it is worse than that because tau_dispatch_dumb is typically
130 : massive and tau_task is often infinitesimal.
131 :
132 : For example, in parallelizing level 1 matrix operations (e.g. a
133 : vector add), N is often related to the number of scalar flops needed
134 : and thus tau_task is small O(1) times the marginal cost of doing a
135 : single flop, assuming the problem itself fits within cache. That is,
136 : tau_task is measured in picoseconds (e.g. wide vector SIMD double
137 : pumped FMA units on a ~3 GHz core).
138 :
139 : But tau_dispatch_dumb is measured in milliseconds (e.g.
140 : pthread_create + pthread_join with all their glorious O/S and kernel
141 : scheduler overheads).
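
   To make this concrete (illustrative round numbers, not measurements):
   with tau_task ~ 1 ns and tau_partition_dumb + tau_dispatch_dumb ~ 1 ms,

     P_max_useful_dumb ~ sqrt( (1e-9) N / (1e-3) ) = sqrt( N / 1e6 )

   so even a job with N ~ 1e8 tasks only justifies about 10 threads under
   the dumb strategy.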
142 :
143 : This means we need huge N before it is even worthwhile to consider
144 : parallelization under the dumb strategy. And once N is large enough,
145 : the problem transitions from being cacheable to uncacheable. And in
146 : that regime, memory bandwidth limits not considered here dominate
147 : (all those cores starved of useful work because of badly designed
148 : software and limited memory bandwidth). Embarrassing.
149 :
150 : All too common a story: "We parallelized [important thing] with
151 : [rando popular but naive thread library du jour]. It got slower.
152 : Guess it was already as efficient as it could be."
153 :
154 : Sigh ... fortunately, the math also gives the prescription of what we
155 : need to do (and maybe gives a hint that, contrary to beliefs floating
156 : around in blogs and whatnot, math is in fact critically important
157 : for coding).
158 :
159 : First, we need to parallelize the partition calculation by having
160 : each worker thread concurrently compute their slice of the job. We
161 : can do that by passing a reference to a description of the whole job
162 : along with just enough context to each thread that it can determine
163 : its slice (e.g. a thread index and number of threads). This
164 : eliminates tau_partition_dumb from the denominator above but it
165 : requires a better thread dispatcher function signature than used
166 : above.
167 :
168 : Second, we create all the threads we are going to use up front, pin
169 : them to dedicated isolated cores at high kernel scheduling priority
170 : with optimized NUMA and TLB aware stacks and scratch spaces to
171 : massively reduce tau_dispatch_dumb overheads (and also fix some more
172 : subtle performance drains in practical usage at the same time).
173 : These threads will spin on memory in optimized ways to make the time
174 : to wake up a thread on the order of the time it takes to transfer a
175 : cache line to a core (tens to hundreds of ns).
176 :
177 : Together, these turn the above into something like:
178 :
179 : static void
180 : task_block( void * _args,
181 : ulong p,
182 : ulong P ) {
183 : job_args_t * args = (job_args_t *)_args;
184 : ... unpack job args here, including
185 : ulong N = args->N;
186 :
187 : // Compute the block of tasks to be done by this thread
188 : ulong n0;
189 : ulong n1;
190 : FD_TPOOL_PARTITION( 0,N,1, p,P, n0,n1 );
191 :
192 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
193 : }
194 :
195 : ... in main thread ...
196 :
197 : job_args_t args[1] = { ... job info ... };
198 : for( ulong p=1; p<P; p++ ) tpool_exec( tpool,p, task_block,args, p,P );
199 : task_block( args, 0,P );
200 : for( ulong p=1; p<P; p++ ) tpool_wait( tpool,p );
201 :
202 : This is dramatically better in implementation as now we have:
203 :
204 : P_max_useful_okay ~ sqrt( tau_task N / tau_dispatch_smart )
205 :
206 : where tau_dispatch_smart << tau_dispatch_dumb. Additionally, the
207 : various overheads in the main thread are much lower.
208 :
209 : While this is able to use dramatically more threads profitably for a
210 : fixed job size, we still aren't able to weak scale. To do that, we
211 : need to parallelize the thread dispatch too.
212 :
213 : Third, we parallelize thread dispatch. The obvious but wrong
214 : solution is to use a single common memory location that all worker
215 : threads in the pool will wait on to start execution and a single
216 : common memory location that all workers will atomically increment at
217 : completion so the main thread only has to spin on one memory location
218 : to tell if the job is complete.
219 :
220 : This is wrong practically because, while it superficially looks like
221 : O(1) operations for the dispatch and wait, this is a cache coherence
222 : protocol nightmare beyond compare. There will be large constant O(P)
223 : messaging on the CPU's internal network-on-chip to distribute the
224 : information to/from all the worker threads. Such will often not be
225 : parallelized under the hood particularly well or particularly
226 : deterministic (especially the waiting part ... the main thread still
227 : needs to get P updates from the workers that will be serialized with
228 : lots of cache line ping-pong-ing on top of atomic operations under
229 : the hood as they funnel back toward the main thread).
230 :
231 : Instead, we can recursively divide the job to unambiguously
232 : parallelize task dispatch. This yields something like:
233 :
234 : static void
235 : task_node( tpool_t * tpool, // thread pool to use
236 : ulong p0, // Assumes p0>=0, caller is p0
237 : ulong p1, // Assumes p1>p0
238 :            void * _args,
239 : ulong P ) { // Assumes P>=p1
240 :
241 : // At this point we are worker thread p0 and we are responsible
242 : // for dispatching work to workers [p0,p1)
243 :
244 : ulong p_cnt = p1-p0;
245 : if( p_cnt>1UL ) {
246 :
247 : // At this point, we have more than one thread available.
248 : // Split the workers approximately in half and have worker
249 : // thread ps dispatch work to threads [ps,p1) while we
250 : // concurrently dispatch work to threads [p0,ps).
251 :
252 : ulong ps = p0 + (p_cnt>>1);
253 :       tpool_exec( tpool,ps, task_node, tpool,ps,p1, _args, P );
254 :       task_node( tpool,p0,ps, _args, P );
255 : tpool_wait( tpool,ps );
256 : return;
257 : }
258 :
259 : // At this point, there is only one thread available
260 : // Do our slice of the job.
261 :
262 : job_args_t * args = (job_args_t *)_args;
263 : ... unpack job args here, including
264 : ulong N = args->N;
265 :
266 : // Compute block of tasks to be done by this thread
267 : ulong n0;
268 : ulong n1;
269 :     FD_TPOOL_PARTITION( 0,N,1, p0,P, n0,n1 );
270 :
271 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
272 : }
273 :
274 : ... in the main thread ...
275 :
276 : job_args_t args[1] = { ... job info ... };
277 : task_node( tpool,0,P, args, P );
278 :
279 : This has an algorithmically different and cheaper cost model for
280 : thread parallelization:
281 :
282 : T_parallel_smart ~ tau_overhead_smart
283 : + tau_dispatch_smart ceil log2(P)
284 : + tau_task N / P
285 :
286 : With the magical power of calculus again, noting that
287 : log2(P)=ln(P)/ln(2) and ceil log2(P) ~ log2(P) asymptotically, we
288 : have:
289 :
290 : dT_parallel_smart / dP ~ tau_dispatch_smart / (P ln(2)) - tau_task N/P^2
291 :
292 : such that:
293 :
294 : P_max_useful_smart ~ tau_task ln(2) N / tau_dispatch_smart
295 :
296 : Now we can weak scale! If we double the amount of work, we can
297 : double the maximum useful threads we can apply.
298 :
299 : Since tau_overhead_smart ~ tau_overhead_serial << tau_overhead_dumb,
300 : tau_dispatch_smart << tau_dispatch_dumb and N isn't stuck inside a
301 : sqrt anymore, we also can profitably parallelize _orders_ _of_
302 : _magnitude_ smaller problems than before.
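
   Continuing the illustrative round numbers from above (tau_task ~ 1 ns)
   and taking tau_dispatch_smart ~ 100 ns (on the order of a cache line
   transfer, per the second prescription above):

     P_max_useful_smart ~ (1e-9) ln(2) N / (1e-7) ~ N / 144

   so a job with only N ~ 1e4 tasks can already profitably use ~69
   threads, versus the dumb strategy needing N ~ 1e8 tasks to profitably
   use ~10 threads.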
303 :
304 : As a further bonus, the above dispatch pattern naturally supports
305 : much more complex parallelizations (see the sgemm example below) and
306 : naturally has good thread-NUMA topology oblivious and cache oblivious
307 : algorithmic properties (ideally the tpool threads themselves have
308 : good spatial locality for best results).
309 :
310 : Last, we can wrap all this up so that, in simple cases, all the user
311 : needs to do is:
312 :
313 : static void
314 : task_block( void * _args,
315 : ulong p,
316 : ulong P ) {
317 : job_args_t * args = (job_args_t *)_args;
318 : ... unpack job args here, including
319 : ulong N = args->N;
320 :
321 : // Compute block of tasks to be done by this thread
322 : ulong n0;
323 : ulong n1;
324 : FD_TPOOL_PARTITION( 0,N,1, p,P, n0,n1 );
325 :
326 : for( ulong n=n0; n<n1; n++ ) ... do task n ...;
327 : }
328 :
329 : ... in the main thread ...
330 :
331 : job_args_t args[1] = { ... args ... };
332 : tpool_exec_all( tpool,0,P, task_block,args );
333 :
334 : and, in the main thread, it will act like:
335 :
336 : job_args_t args[1] = { ... args ... };
337 : for( ulong p=0; p<P; p++ ) task_block( args, p,P );
338 :
339 : but have a performance characteristic like T_parallel_smart.
340 :
341 : The actual function signature used for a fd_tpool_task_t below was
342 : picked to be sufficient to tightly and uniformly partition an
343 : arbitrarily shaped level 3 BLAS sgemm A^T B matrix calculation over
344 : lots of cores in arbitrarily shaped NUMA topologies without a lot
345 : of overhead in the dispatch logic. For example (and this also shows
346 : a powerful use case for scratch memory too):
347 :
348 : // This is called by worker thread t0 and uses tpool worker threads
349 : // [t0,t1) to compute:
350 : //
351 :   // C([0,l1-l0),[0,n1-n0)) = A([m0,m1),[l0,l1))' B([m0,m1),[n0,n1))
352 :
353 : void
354 : atb_node( fd_tpool_t * tpool, // tpool to use
355 : ulong t0, ulong t1, // Assumes t1>t0
356 : atb_args_t * args, // Location of A and B matrices and their column strides
357 : float * C, // (l1-l0)*(n1-n0) col-major matrix with col stride sC
358 : ulong sC, // Assumes sC>=(l1-l0), C(i,j) at C[ i + sC*j ]
359 : ulong l0, ulong l1, // Assumes l1>=l0
360 : ulong m0, ulong m1, // Assumes m1>=m0
361 : ulong n0, ulong n1 ) { // Assumes n1>=n0
362 :
363 : // At this point, we are worker thread t0 and we are responsible
364 : // for dispatching work to worker threads [t0,t1)
365 :
366 : ulong t_cnt = t1-t0;
367 : if( t_cnt>1UL ) { // ... and optionally the task big enough to be worth parallelizing
368 :
369 : // We need to split this task over more than one thread.
370 : //
371 : // Split the worker threads into two approximately equal sized
372 : // sets (left and right) and then proportionally split the
373 : // longest matrix range to try to make the subtasks as square
374 : // as possible (and thus improve compute data ratio /
375 : // cacheability of subtasks). Note that we do not just split
376 : // matrix ranges in half because such a split would result in
377 : // an increasingly bad load imbalance between the left and
378 : // right workers when the matrix range to split is large but
379 : // t_cnt is small and odd.
380 : //
381 : // We then execute in parallel the left range on worker threads
382 : // [t0,th) and the right range on worker threads [th,t1). Yes,
383 : // this uses recursion in the thread dispatch to get radically
384 : // better scalability, lower overhead and ability to map
385 : // arbitrary shaped problems onto arbitrarily shaped massively
386 : // parallel hardware with minimal load imbalance (see notes
387 : // above for more details about this).
388 :
389 : ulong t_cnt_left = t_cnt >> 1;
390 : ulong ts = t0 + t_cnt_left;
391 :
392 : ulong l_cnt = l1-l0;
393 : ulong m_cnt = m1-m0;
394 : ulong n_cnt = n1-n0;
395 :
396 : if( FD_UNLIKELY( (m_cnt>l_cnt) & (m_cnt>n_cnt) ) ) { // Split m range
397 :
398 : // Splitting along m is more onerous than splitting along l
399 : // or n because the left and right subtask outputs would end
400 : // up clobbering each other or, if being atomically too
401 : // clever by half, having a non-deterministic result due to an
402 :       // indeterminate summation order and floating point
403 : // non-associativity. So we allocate a temporary matrix near
404 : // these cores to hold the right half partial reduction such
405 : // that the left and right can do their partial reductions
406 : // independently (and deterministically) and then do a
407 : // (deterministic) thread parallel reduction of the two
408 : // results on threads [t0,t1) afterward (a level of fine
409 : // grained parallelization few even think is possible on
410 : // commodity processors).
411 :
412 : ulong m_cnt_left = (m_cnt*t_cnt_left)/t_cnt; // handle overflow here
413 : ulong ms = m0 + m_cnt_left;
414 :
415 : fd_scratch_push();
416 :
417 : float * CR = (float *)fd_scratch_alloc( 0UL, l_cnt*n_cnt*sizeof(float) );
418 :
419 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,l_cnt, l0,l1, ms,m1, n0,n1 );
420 : atb_node( tpool,t0,ts, args, C,sC, l0,l1, m0,ms, n0,n1 );
421 : fd_tpool_wait( tpool,ts );
422 :
423 : // Do C([0,l_cnt),[0,n_cnt)) += CR([0,l_cnt),[0,n_cnt)) on threads [t0,t1) here
424 :
425 : fd_scratch_pop();
426 :
427 : } else if( FD_UNLIKELY( n_cnt>l_cnt ) ) { // Split n range
428 :
429 : ulong n_cnt_left = (n_cnt*t_cnt_left)/t_cnt; // handle overflow here
430 : ulong ns = n0 + n_cnt_left;
431 :
432 : float * CR = C + sC*n_cnt_left;
433 :
434 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,sC, l0,l1, m0,m1, ns,n1 );
435 : atb_node( tpool,t0,ts, args, C,sC, l0,l1, m0,m1, n0,ns );
436 : fd_tpool_wait( tpool,ts );
437 :
438 : } else { // Split l range
439 :
440 : ulong l_cnt_left = (l_cnt*t_cnt_left)/t_cnt; // handle overflow here
441 : ulong ls = l0 + l_cnt_left;
442 :
443 : float * CR = C + l_cnt_left;
444 :
445 : fd_tpool_exec( tpool,ts, atb_node,tpool,ts,t1, args, CR,sC, ls,l1, m0,m1, n0,n1 );
446 : atb_node( tpool,t0,ts, args, C,sC, l0,ls, m0,m1, n0,n1 );
447 : fd_tpool_wait( tpool,ts );
448 :
449 : }
450 :
451 : return;
452 : }
453 :
454 : // At this point, we are at a leaf node
455 : // Do C([0,l1-l0),[0,n1-n0)) = A([m0,m1),[l0,l1))' B([m0,m1),[n0,n1)) here
456 : }
457 :
458 : End of high performance thread parallelism crash course ************/
459 :
460 : #include "../scratch/fd_scratch.h"
461 :
462 : /* FD_TPOOL_OPT_SLEEP specifies that tpool threads should sleep from the
463 : operating system's point of view when idle. Starting and stopping
464 : jobs then has ~5x-10x more overhead than the default of spinning the
465 : core from the operating system's point of view while yielding it from
466 : the physical core's point of view. */
467 :
468 84 : #define FD_TPOOL_OPT_SLEEP (1UL)
469 :
470 : /* FD_TPOOL_TASK_ARG_MAX specifies the raw maximum number of arguments
471 : that can be passed to a task. Different styles of task scheduling
472 : may impose additional constraints on this. */
473 :
474 : #define FD_TPOOL_TASK_ARG_MAX (43UL)
475 :
476 : /* FD_TPOOL_PARTITION partitions tasks indexed [task0,task1) over
477 : worker_cnt worker threads. On return, tasks indexed
478 : [worker_task0,worker_task1) are the tasks to be done by
479 : worker worker_idx.
480 :
481 : The number of tasks to a worker will be as uniform as possible, with
482 : the constraints that the tasks will be assigned to workers in
483 : monotonically increasing order and, for worker_idx<worker_cnt-1, the
484 : number of tasks assigned will be a multiple of lane_cnt in size.
485 : (That is, any final incomplete SIMD block will be assigned to worker
486 : worker_cnt-1.)
487 :
488 : Assumes task1>=task0, lane_cnt>0, worker_cnt>0 and
489 : worker_idx<worker_cnt. Performance will be fastest if lane_cnt
490 : and/or worker_cnt are an integer power-of-two (especially 1). This
491 : macro is robust. */
492 :
493 1544196955 : #define FD_TPOOL_PARTITION( task0, task1, lane_cnt, worker_idx, worker_cnt, worker_task0, worker_task1 ) do { \
494 1544196955 : ulong _ftp_task0 = (task0); \
495 1544196955 : ulong _ftp_task1 = (task1); \
496 1544196955 : ulong _ftp_lane_cnt = (lane_cnt); \
497 1544196955 : ulong _ftp_worker_idx = (worker_idx); \
498 1544196955 : ulong _ftp_worker_cnt = (worker_cnt); \
499 1544196955 : ulong _ftp_task_cnt = _ftp_task1 - _ftp_task0; \
500 1544196955 : ulong _ftp_block_cnt = _ftp_task_cnt / _ftp_lane_cnt; /* Num complete simd blocks, typically nop or fast shr */ \
501 1544196955 : ulong _ftp_block_rem = _ftp_task_cnt % _ftp_lane_cnt; /* Number of leftovers, typically nop or fast mask */ \
502 1544196955 : ulong _ftp_worker_block_min = _ftp_block_cnt / _ftp_worker_cnt; /* Min complete simd blocks for a worker */ \
503 1544196955 : ulong _ftp_worker_extra_cnt = _ftp_block_cnt % _ftp_worker_cnt; /* Num workers needing an extra complete simd block */ \
504 1544196955 : ulong _ftp_worker_task0 = _ftp_task0 \
505 1544196955 : + _ftp_lane_cnt*(_ftp_worker_block_min*_ftp_worker_idx + fd_ulong_min(_ftp_worker_idx,_ftp_worker_extra_cnt)); \
506 1544196955 : ulong _ftp_worker_task1 = _ftp_worker_task0 \
507 1544196955 : + _ftp_lane_cnt*(_ftp_worker_block_min + ((ulong)(_ftp_worker_idx<_ftp_worker_extra_cnt))) \
508 1544196955 : + fd_ulong_if( _ftp_worker_idx==(_ftp_worker_cnt-1UL), _ftp_block_rem, 0UL ); \
509 1544196955 : (worker_task0) = _ftp_worker_task0; \
510 1544196955 : (worker_task1) = _ftp_worker_task1; \
511 1544196955 : } while(0)
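
/* For example (illustrative): FD_TPOOL_PARTITION( 0,10,4, worker_idx,3, n0,n1 )
   assigns [0,4) to worker 0, [4,8) to worker 1 and [8,10) to worker 2 (the
   final incomplete SIMD block goes to the last worker, per the above).  A
   minimal sketch of typical use inside a task, assuming task_cnt, worker_idx
   and worker_cnt are provided by the caller:

     ulong n0; ulong n1;
     FD_TPOOL_PARTITION( 0UL,task_cnt,1UL, worker_idx,worker_cnt, n0,n1 );
     for( ulong n=n0; n<n1; n++ ) { ... process task n ... }
*/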
512 :
513 : /* A fd_tpool_t is an opaque handle of a thread pool */
514 :
515 : struct fd_tpool_private;
516 : typedef struct fd_tpool_private fd_tpool_t;
517 :
518 : /* A fd_tpool_task_t is the function signature used for the entry
519 : point of a task. Users are free to repurpose these arguments however
520 : they see fit for individual executions but the number of
521 : arguments, types of argument and names used below reflect the intent
522 : described in the above crash course. Namely, this is adequate to map
523 : arbitrary level 3 BLAS dense matrix-matrix multiply with arbitrarily
524 : shaped matrices to an arbitrary topology set of tiles ultra quickly
525 : and ultra tightly. Bulk dispatchers like tpool_exec_all might apply
526 : some additional conventions to some of these arguments. */
527 :
528 : typedef void
529 : (*fd_tpool_task_t)( void * tpool,
530 : ulong t0, ulong t1,
531 : void * args,
532 : void * reduce, ulong stride,
533 : ulong l0, ulong l1,
534 : ulong m0, ulong m1,
535 : ulong n0, ulong n1 );
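
/* For example, a minimal sketch of a task with this signature (a
   hypothetical vec_incr_task, not part of this API) that treats args as
   the vector to process and [m0,m1) as the element range assigned to this
   call, ignoring the arguments it does not need:

     static void
     vec_incr_task( void * tpool,
                    ulong  t0,     ulong t1,
                    void * args,
                    void * reduce, ulong stride,
                    ulong  l0,     ulong l1,
                    ulong  m0,     ulong m1,
                    ulong  n0,     ulong n1 ) {
       (void)tpool; (void)t0; (void)t1; (void)reduce; (void)stride;
       (void)l0; (void)l1; (void)n0; (void)n1;
       double * x = (double *)args;
       for( ulong m=m0; m<m1; m++ ) x[m] += 1.0; // increment our slice
     }
*/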
536 :
537 : /* A fd_tpool_task_v2_t is the function signature used for the entry
538 : point of a v2 task. This allows passing a lot more args than the
539 : original fd_tpool_task_t. tpool on entry will be the calling tpool
540 : and worker_idx will be the tpool worker running the task. arg_cnt is
541 : the number of user provided args (in [0,FD_TPOOL_TASK_ARG_MAX]) and
542 : arg[i] for i in [0,arg_cnt) is the i-th user argument. */
543 :
544 : typedef void
545 : (*fd_tpool_task_v2_t)( fd_tpool_t * tpool,
546 : ulong worker_idx,
547 : ulong arg_cnt,
548 : ulong const * arg );
549 :
550 : /* Private APIs *******************************************************/
551 :
552 : /* These are exposed here to facilitate inlining various operations in
553 : high performance contexts. */
554 :
555 : struct __attribute__((aligned(128))) fd_tpool_private_worker {
556 : uint seq0; /* Dispatch rd/wr (ideally wr-only), worker rd-only */
557 :   uint  arg_cnt; /* UINT_MAX for a v1 task, or arg count for a v2 task (in [0,FD_TPOOL_TASK_ARG_MAX]) */
558 : ulong task; /* 0 to halt worker, fd_tpool_task_t if arg_cnt==UINT_MAX, fd_tpool_task_v2_t otherwise */
559 : ulong arg[ FD_TPOOL_TASK_ARG_MAX ]; /* task arguments, indexed [0,arg_cnt) */
560 : uint seq1; /* Dispatch rd-only, worker wr-only (different cache line pair than seq0, arg_cnt,
561 : task and the most commonly used leading parts of arg) */
562 : uint tile_idx; /* rd-only (after init) */
563 : ulong lock; /* For use by sleep workers */
564 : ulong wake; /* For use by sleep workers */
565 : };
566 :
567 : typedef struct fd_tpool_private_worker fd_tpool_private_worker_t;
568 :
569 : struct fd_tpool_private {
570 :
571 : /* This point is 128 aligned and preceded by the worker0 sentinel */
572 :
573 : ulong opt; /* Bit-or of FD_TPOOL_OPTs */
574 : uint worker_max; /* Positive */
575 : uint worker_cnt; /* in [1,worker_max] */
576 :
577 : /* worker_max element fd_tpool_private_worker_t * array here, indexed
578 : [0,worker_cnt). worker[0] points to worker0 above. Note that we
579 : cannot use a flex array here because strict C++17 doesn't support
580 : it and we use C++ in fd_tpool.cxx to handle people using C++
581 : libraries that throw exceptions that are uncaught ... sigh. */
582 :
583 : };
584 :
585 : FD_PROTOTYPES_BEGIN
586 :
587 : FD_STATIC_ASSERT( FD_TILE_MAX<2048UL, update_implementation );
588 :
589 : /* fd_tpool_private_worker0 returns a pointer in the local address space
590 : to the worker0 sentinel */
591 :
592 : FD_FN_CONST static inline fd_tpool_private_worker_t *
593 3087 : fd_tpool_private_worker0( fd_tpool_t const * tpool ) {
594 3087 : return ((fd_tpool_private_worker_t *)tpool)-1;
595 3087 : }
596 :
597 : /* fd_tpool_private_worker returns a pointer in the local address space
598 : of the first element of the tpool's worker array. */
599 :
600 : FD_FN_CONST static inline fd_tpool_private_worker_t **
601 56906384 : fd_tpool_private_worker( fd_tpool_t const * tpool ) {
602 56906384 : return (fd_tpool_private_worker_t **)(tpool+1);
603 56906384 : }
604 :
605 : FD_FN_CONST static inline ulong /* Returns number of elements to left side */
606 30244699 : fd_tpool_private_split( ulong n ) { /* Assumes n>1 */
607 : # if 0 /* Simple splitting */
608 : return n>>1;
609 : # else /* NUMA aware splitting */
610 : /* This split has the property that the left side >= the right side
611 : and one of the splits is the largest power of two smaller than n.
612 : It results in building a balanced tree (the same as the simple
613 : split) but with all the leaf nodes concentrated to toward the left
614 : when n isn't a power of two (and that all the nodes are easily
615 : contiguously indexable). This can yield a slight reduction of the
616 : number of messages that might have to cross a NUMA boundary in many
617 : common usage scenarios. */
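  /* For example (values computed from the split below), n=2,...,8 splits
     into 1+1, 2+1, 2+2, 3+2, 4+2, 4+3 and 4+4 (left+right). */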
618 30244699 : int b = fd_ulong_find_msb( n );
619 30244699 : ulong m = 1UL << (b-1);
620 30244699 : return fd_ulong_if( !(n & m), n-m, m<<1 );
621 30244699 : # endif
622 30244699 : }
623 :
624 : #if FD_HAS_THREADS
625 :
626 : /* fd_tpool_private_wake will wake worker. Assumes worker is an idle
627 : sleeper. This implies among other things worker's tpool was created
628 : with FD_TPOOL_OPT_SLEEP, the caller is not the worker and the worker
629 : is not tpool worker 0. */
630 :
631 : void
632 : fd_tpool_private_wake( fd_tpool_private_worker_t * worker );
633 :
634 : #else
635 :
636 : static inline void
637 : fd_tpool_private_wake( fd_tpool_private_worker_t * worker ) {
638 : (void)worker;
639 : }
640 :
641 : #endif
642 :
643 : FD_PROTOTYPES_END
644 :
645 : /* End of private APIs ************************************************/
646 :
647 : /* FD_TPOOL_{ALIGN,FOOTPRINT} return the alignment and footprint
648 : required for a memory region to be used as a tpool. ALIGN will be an
649 : integer power of two of at most 4096 and FOOTPRINT will be a multiple
650 : of ALIGN. worker_max is assumed to be valid (e.g. in
651 : [1,FD_TILE_MAX]). These are provided to facilitate compile time
652 : construction and for consistency with other constructors. (FIXME:
653 : consider using FD_LAYOUT here.) */
654 :
655 477279 : #define FD_TPOOL_ALIGN (128UL)
656 0 : #define FD_TPOOL_FOOTPRINT( worker_max ) ( ( sizeof(fd_tpool_private_worker_t) /* worker0 sentinel */ \
657 0 :                                               + sizeof(fd_tpool_t)                                /* tpool header     */ \
658 0 : + ((ulong)(worker_max))*sizeof(fd_tpool_private_worker_t *) /* worker array */ \
659 0 : + FD_TPOOL_ALIGN-1UL ) & (~(FD_TPOOL_ALIGN-1UL)) )
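
/* For example, a minimal sketch of compile time construction of the
   memory region for a tpool supporting up to 16 workers (16 is an
   arbitrary illustrative choice):

     static uchar __attribute__((aligned(FD_TPOOL_ALIGN)))
       my_tpool_mem[ FD_TPOOL_FOOTPRINT( 16UL ) ];
*/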
660 :
661 : FD_PROTOTYPES_BEGIN
662 :
663 : /* fd_tpool_align returns FD_TPOOL_ALIGN. fd_tpool_footprint returns
664 : FD_TPOOL_FOOTPRINT( worker_max ) if worker_max is in [1,FD_TILE_MAX]
665 : or 0 otherwise. */
666 :
667 : FD_FN_CONST ulong fd_tpool_align( void );
668 : FD_FN_CONST ulong fd_tpool_footprint( ulong worker_max );
669 :
670 : /* fd_tpool_init formats a memory region mem with the appropriate
671 : alignment and footprint as a thread pool that can support up to
672 : worker_max worker threads. worker_max must be in [1,FD_TILE_MAX].
673 : opt is a bit-or of FD_TPOOL_OPTs specifying additional behaviors of
674 : this tpool. Returns a handle for the tpool (this is not a simple
675 : cast of mem) on success and NULL on failure (logs details). On a
676 : success return, worker 0 will already exist. Many threads can
677 : temporarily assume the identity of worker 0 as the situation merits.
678 : Worker 0 is typically the thread that starts dispatches to other
679 : worker threads in an operation and can also flexibly participate in
680 : the tpool in bulk operations. This uses init/fini semantics instead
681 : of new/join/leave/delete semantics because thread pools aren't
682 : meaningfully sharable between processes / thread groups. This
683 : function acts as a compiler memory fence. */
684 :
685 : fd_tpool_t *
686 : fd_tpool_init( void * mem,
687 : ulong worker_max,
688 : ulong opt );
689 :
690 : /* fd_tpool_fini pops all worker threads pushed into the tpool and
691 : unformats the underlying memory region. This should be called at
692 : most once by a thread that was not pushed into the tpool (e.g. a
693 : "worker 0" thread) and no other operations on the tpool should be in
694 : progress when this is called or done after this is called. Returns
695 : the memory region used by the tpool on success (this is not a simple
696 : cast of mem) and NULL on failure (logs details). This function acts
697 : as a compiler memory fence. */
698 :
699 : void *
700 : fd_tpool_fini( fd_tpool_t * tpool );
701 :
702 : /* fd_tpool_worker_push pushes tile tile_idx into the tpool. tile_idx
703 : 0, the calling tile, and tiles that have already been pushed into the
704 : tpool cannot be pushed. Further, tile_idx should be idle and no
705 : other tile operations should be done on it while it is a part of the
706 : tpool.
707 :
708 : Returns tpool on success (tile tile_idx is part of the tpool and will
709 : be considered as executing from tile's point of view while a member
710 : of the tpool ... no tile operations can or should be done on it while
711 : it is part of the tpool) and NULL on failure (logs details). Reasons
712 : for failure include NULL tpool, bad tile_idx, tile was not idle,
713 : etc.
714 :
715 : No other operations on tpool should be in process when this is called
716 : or started while this is running. This function acts as a compiler
717 : memory fence. */
718 :
719 : fd_tpool_t *
720 : fd_tpool_worker_push( fd_tpool_t * tpool,
721 : ulong tile_idx );
722 :
723 : /* fd_tpool_worker_pop pops the most recently pushed tpool worker
724 : thread.
725 :
726 : Returns tpool on success (the tile is no longer part of the tpool and
727 : considered idle from tile's POV and can be used for other purposes)
728 : and NULL on failure (logs details). Reasons for failure include NULL
729 : tpool, bad tile_idx, tile was not idle, etc.
730 :
731 : No other operations on the tpool should be in process when this is
732 : called or started while this is running. This function acts as a
733 : compiler memory fence. */
734 :
735 : fd_tpool_t *
736 : fd_tpool_worker_pop( fd_tpool_t * tpool );
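
/* A minimal usage sketch of the lifecycle APIs above.  Here tile_cnt and
   my_tpool_mem are assumed to be provided by the caller (e.g. via the
   fd_tile APIs and a suitably aligned and sized memory region like the
   compile time construction sketch above):

     fd_tpool_t * tpool = fd_tpool_init( my_tpool_mem, tile_cnt, 0UL ); // spinning (non-sleeping) workers
     if( FD_UNLIKELY( !tpool ) ) { ... handle error ... }

     for( ulong tile_idx=1UL; tile_idx<tile_cnt; tile_idx++ )
       if( FD_UNLIKELY( !fd_tpool_worker_push( tpool, tile_idx ) ) ) { ... handle error ... }

     ... dispatch work via fd_tpool_exec / fd_tpool_exec_all_* below ...

     fd_tpool_fini( tpool ); // pops remaining workers and unformats the region
*/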
737 :
738 : /* Accessors. As these are used in high performance contexts, these do
739 : no input argument checking. Specifically, they assume tpool is valid
740 : and (if applicable) worker_idx in [0,worker_cnt). worker 0 is
741 : special. The tile_idx for worker 0 is always returned as 0. */
742 :
743 3072 : FD_FN_PURE static inline ulong fd_tpool_opt ( fd_tpool_t const * tpool ) { return tpool->opt; }
744 6264 : FD_FN_PURE static inline ulong fd_tpool_worker_cnt( fd_tpool_t const * tpool ) { return (ulong)tpool->worker_cnt; }
745 3114 : FD_FN_PURE static inline ulong fd_tpool_worker_max( fd_tpool_t const * tpool ) { return (ulong)tpool->worker_max; }
746 :
747 : FD_FN_PURE static inline ulong
748 : fd_tpool_worker_tile_idx( fd_tpool_t const * tpool,
749 3093 : ulong worker_idx ) {
750 3093 : return (ulong)fd_tpool_private_worker( tpool )[ worker_idx ]->tile_idx;
751 3093 : }
752 :
753 : /* fd_tpool_worker_idle atomically observes the state of tpool thread
754 : worker_idx at some point in time between when the call was made and
755 : the call returns. As this is used in high performance contexts, does
756 : no input argument checking. Specifically, assumes tpool is valid and
757 : worker_idx is in [0,worker_cnt). A return value of 0/1 indicates the
758 : worker was not idle / was idle when observed. worker 0 is special and is
759 : never considered idle. This function acts as a compiler memory
760 : fence. */
761 :
762 : static inline int
763 : fd_tpool_worker_idle( fd_tpool_t const * tpool,
764 3186 : ulong worker_idx ) {
765 3186 : fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_idx ];
766 3186 : uint const * _seq0 = &worker->seq0;
767 3186 : uint const * _seq1 = &worker->seq1;
768 :
769 3186 : uint seq1;
770 :
771 3186 : FD_COMPILER_MFENCE();
772 3186 : uint seq0 = *_seq0;
773 3186 : for(;;) {
774 3186 : FD_COMPILER_MFENCE();
775 3186 : /**/ seq1 = *_seq1;
776 3186 : FD_COMPILER_MFENCE();
777 3186 : uint seq2 = *_seq0;
778 3186 : FD_COMPILER_MFENCE();
779 3186 : if( FD_LIKELY( seq2==seq0 ) ) break;
780 :
781 : /* Looks like another thread scheduled a task for the worker while
782 : we were trying to observe the worker's state. Try again after a
783 : brief pause. */
784 :
785 0 : FD_SPIN_PAUSE();
786 0 : seq0 = seq2;
787 0 : }
788 :
789 3186 : return seq0==seq1;
790 3186 : }
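
/* For example, a caller that must not dispatch onto a busy worker might
   poll until worker_idx is observed idle (a sketch, not the only way to
   use this):

     while( !fd_tpool_worker_idle( tpool, worker_idx ) ) FD_SPIN_PAUSE();
*/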
791 :
792 : /* fd_tpool_exec calls
793 :
794 : task( task_tpool,
795 : task_t0, task_t1,
796 : task_args,
797 : task_reduce, task_stride,
798 : task_l0, task_l1,
799 : task_m0, task_m1,
800 : task_n0, task_n1 );
801 :
802 : on tpool thread worker_idx. This will run concurrently with the
803 : caller. Uncaught exceptions in task will be logged by the remote
804 : thread (in principle, that is assuming the uncaught exception did not
805 : leave the application in an unstable state, thread worker_idx will still
806 : be usable for additional fd_tpool_exec). As this is used in high
807 : performance contexts, does no input argument checking. Specifically,
808 : assumes tpool is valid, worker_idx in (0,worker_cnt) (yes, open on
809 : both ends), caller is not tpool thread worker_idx, task is valid.
810 : worker_idx 0 is special and is considered to always be in the EXEC
811 : state so we cannot call exec on it. This function acts as a compiler
812 : memory fence. */
813 :
814 : static inline void
815 : fd_tpool_exec( fd_tpool_t * tpool, ulong worker_idx,
816 : fd_tpool_task_t task,
817 : void * task_tpool,
818 : ulong task_t0, ulong task_t1,
819 : void * task_args,
820 : void * task_reduce, ulong task_stride,
821 : ulong task_l0, ulong task_l1,
822 : ulong task_m0, ulong task_m1,
823 8969167 : ulong task_n0, ulong task_n1 ) {
824 8969167 : fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_idx ];
825 8969167 : uint seq0 = worker->seq0 + 1U;
826 8969167 : worker->arg_cnt = UINT_MAX; /* Signal fd_tpool_exec dispatch */
827 8969167 : worker->task = (ulong)task;
828 8969167 : worker->arg[ 0] = (ulong)task_tpool;
829 8969167 : worker->arg[ 1] = task_t0; worker->arg[ 2] = task_t1;
830 8969167 : worker->arg[ 3] = (ulong)task_args;
831 8969167 : worker->arg[ 4] = (ulong)task_reduce; worker->arg[ 5] = task_stride;
832 8969167 : worker->arg[ 6] = task_l0; worker->arg[ 7] = task_l1;
833 8969167 : worker->arg[ 8] = task_m0; worker->arg[ 9] = task_m1;
834 8969167 : worker->arg[10] = task_n0; worker->arg[11] = task_n1;
835 8969167 : FD_COMPILER_MFENCE();
836 8969167 : worker->seq0 = seq0;
837 8969167 : FD_COMPILER_MFENCE();
838 8969167 : if( FD_UNLIKELY( tpool->opt & FD_TPOOL_OPT_SLEEP ) ) fd_tpool_private_wake( worker );
839 8969167 : }
840 :
841 : /* fd_tpool_wait waits for the tpool thread worker_idx to leave the
842 : EXEC state. As this is used in high performance contexts, this does
843 : no input argument checking. Specifically, assumes tpool is valid,
844 : worker_idx in (0,worker_cnt) (yes, open on both ends ... worker_idx 0
845 : is considered to always be in an exec state so we should not call
846 : wait on it), caller is not tpool thread worker_idx and no other
847 : caller will attempt to schedule a task on worker_idx while waiting.
848 : On return, the caller atomically observed worker_idx was not in the
849 : EXEC state at some point between when this was called and when this
850 : returned. This function acts as a compiler memory fence. */
851 :
852 : static inline void
853 : fd_tpool_wait( fd_tpool_t const * tpool,
854 29465939 : ulong worker_idx ) {
855 29465939 : fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_idx ];
856 29465939 : uint const * _seq1 = &worker->seq1;
857 29465939 : uint seq0 = worker->seq0;
858 7395713838 : for(;;) {
859 7395713838 : FD_COMPILER_MFENCE();
860 7395713838 : uint seq1 = *_seq1;
861 7395713838 : FD_COMPILER_MFENCE();
862 7395713838 : if( FD_LIKELY( seq0==seq1 ) ) break;
863 7364010510 : FD_SPIN_PAUSE();
864 7364010510 : }
865 29465939 : }
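
/* A minimal sketch of dispatching a single task to worker 1 and waiting
   for it.  my_task is any function with the fd_tpool_task_t signature
   (e.g. the hypothetical vec_incr_task sketched earlier, which only looks
   at task_args and [task_m0,task_m1)); x (the vector) and N (its length)
   are assumed to be provided by the caller and unused arguments are
   passed as NULL / 0UL:

     fd_tpool_exec( tpool, 1UL, my_task, NULL, 0UL,0UL, x, NULL,0UL,
                    0UL,0UL, 0UL,N, 0UL,0UL );
     ... do other work concurrently on the calling thread ...
     fd_tpool_wait( tpool, 1UL );
*/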
866 :
867 : /* Assuming the tasks can be executed safely in any order and/or
868 : concurrently, fd_tpool_exec_all_rrobin, fd_tpool_exec_all_block and
869 : fd_tpool_exec_all_taskq are functionally equivalent to:
870 :
871 : for( ulong l=l0; l<l1; l++ )
872 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, l,l+1, t,t+1 );
873 :
874 : where t indicates to which tpool worker thread idx the particular
875 : task was assigned (and thus t is in [t0,t1), where thread t0 is the
876 : thread that did the dispatch). The rrobin variant stripes individual
877 : tasks deterministically over worker threads (e.g. worker thread t
878 : does tasks (t-t0)+0*(t1-t0),(t-t0)+1*(t1-t0),(t-t0)+2*(t1-t0), ...
879 : The block variant blocks individual tasks over worker threads.
880 :
881 : The taskq variant requires FD_HAS_ATOMIC support and assigns tasks to
882 : threads dynamically. Practically, this is only useful if there are a
883 : huge number of tasks to execute relative to the number of threads,
884 : the tasks to execute have highly non-uniform sizes, the cost to
885 : execute a task is much much greater than the cost of a single atomic
886 : memory operation, and thread-core affinity doesn't impact the time to
887 : execute a task ... conditions that, in total, are met far less
888 : frequently than most developers expect.
889 :
890 : fd_tpool_exec_all_batch is functionally equivalent to:
891 :
892 : for( ulong t=t0; t<t1; t++ ) {
893 : ulong batch_task_l0;
894 : ulong batch_task_l1;
895 : FD_TPOOL_PARTITION( task_l0,task_l1,1, t-t0,t1-t0, batch_task_l0,batch_task_l1 );
896 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, batch_task_l0,batch_task_l1, t,t+1 );
897 : }
898 :
899 : The batch assigned to a thread will be the same tasks as those
900 : assigned to a thread by fd_tpool_exec_all_block. The difference is
901 : that fd_tpool_exec_all_batch effectively merges all the calls to task
902 : that fd_tpool_exec_all_block would make in a block into a single call
903 : per thread.
904 :
905 : fd_tpool_exec_all_raw is functionally equivalent to:
906 :
907 : for( ulong t=t0; t<t1; t++ )
908 : task( task_tpool,t0,t1, task_args,task_reduce,task_stride, task_l0,task_l1, task_l0,task_l1, t,t+1 );
909 :
910 : This allows the caller to use their own partitioning strategies with
911 : minimal overhead.
912 :
913 : All these are executed thread parallel using the calling thread and
914 : tpool worker threads (t0,t1) (yes, open on both ends ... the caller
915 : masquerades as worker thread t0 if it isn't actually worker thread t0 as
916 : far as exec_all is concerned ... see safety tip above about scratch).
917 : The caller should not be any of worker threads (t0,t1) and worker
918 : threads (t0,t1) should be idle on entry and should not be dispatched
919 : to while an exec_all is running.
920 :
921 : As such, in all of these, a task knows automatically which worker
922 : thread is processing this (t), the range of tasks assigned to it
923 : (e.g. [batch_task_l0,batch_task_l1)), the entire range of workers
924 : [t0,t1) in use, the entire range of tasks [task_l0,task_l1) as well
925 : as the original values for task_tpool, task_args, task_reduce,
926 : task_stride.
927 :
928 : As this is used in high performance contexts, this does no input
929 : argument checking. Specifically, it assumes tpool is valid, task is
930 : valid, 0<=t0<t1<=worker_cnt, l0<=l1. These functions act as a
931 : compiler memory fence. */
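
/* A minimal usage sketch of the exec_all APIs declared below.  my_task is
   any fd_tpool_task_t (e.g. the hypothetical vec_incr_task sketched
   earlier); under the batch variant each call receives its batch range in
   the task_m0/task_m1 slots, and x (the vector) and N (its length) are
   assumed to be provided by the caller:

     fd_tpool_exec_all_batch( tpool, 0UL, fd_tpool_worker_cnt( tpool ),
                              my_task, NULL, x, NULL, 0UL, 0UL, N );
*/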
932 :
933 : #define FD_TPOOL_EXEC_ALL_DECL(style) \
934 : void \
935 : fd_tpool_private_exec_all_##style##_node( void * _node_tpool, \
936 : ulong node_t0, ulong node_t1, \
937 : void * args, \
938 : void * reduce, ulong stride, \
939 : ulong l0, ulong l1, \
940 : ulong _task, ulong _tpool, \
941 : ulong t0, ulong t1 ); \
942 : \
943 : static inline void \
944 : fd_tpool_exec_all_##style( fd_tpool_t * tpool, \
945 : ulong t0, ulong t1, \
946 : fd_tpool_task_t task, \
947 : void * task_tpool, \
948 : void * task_args, \
949 : void * task_reduce, ulong task_stride, \
950 2797440 : ulong task_l0, ulong task_l1 ) { \
951 2797440 : FD_COMPILER_MFENCE(); \
952 2797440 : fd_tpool_private_exec_all_##style##_node( tpool, t0,t1, task_args, task_reduce,task_stride, task_l0,task_l1, \
953 2797440 : (ulong)task,(ulong)task_tpool, t0,t1 ); \
954 2797440 : }
955 :
956 : FD_TPOOL_EXEC_ALL_DECL(rrobin)
957 : FD_TPOOL_EXEC_ALL_DECL(block)
958 : FD_TPOOL_EXEC_ALL_DECL(batch)
959 : FD_TPOOL_EXEC_ALL_DECL(raw)
960 :
961 : #if FD_HAS_ATOMIC
962 : void
963 : fd_tpool_private_exec_all_taskq_node( void * _node_tpool,
964 : ulong node_t0, ulong node_t1,
965 : void * args,
966 : void * reduce, ulong stride,
967 : ulong l0, ulong l1,
968 : ulong _task, ulong _l_next,
969 : ulong t0, ulong t1 );
970 :
971 : static inline void
972 : fd_tpool_exec_all_taskq( fd_tpool_t * tpool,
973 : ulong t0, ulong t1,
974 : fd_tpool_task_t task,
975 : void * task_tpool,
976 : void * task_args,
977 : void * task_reduce, ulong task_stride,
978 300000 : ulong task_l0, ulong task_l1 ) {
979 300000 : ulong l_next[16] __attribute((aligned(128)));
980 300000 : l_next[0] = task_l0;
981 300000 : l_next[1] = (ulong)task_tpool;
982 300000 : FD_COMPILER_MFENCE();
983 300000 : fd_tpool_private_exec_all_taskq_node( tpool, t0,t1, task_args, task_reduce,task_stride, task_l0,task_l1,
984 300000 : (ulong)task,(ulong)l_next, t0,t1 );
985 300000 : }
986 : #endif
987 :
988 : #undef FD_TPOOL_EXEC_ALL_DECL
989 :
990 : #include "fd_map_reduce.h"
991 :
992 : FD_PROTOTYPES_END
993 :
994 : #endif /* HEADER_fd_src_util_tpool_fd_tpool_h */
|