LCOV - code coverage report
Current view: top level - util/tpool - fd_tpool.inc (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 237 258 91.9 %
Date: 2026-03-31 06:22:16 Functions: 12 13 92.3 %

          Line data    Source code
       1             : #include "fd_tpool.h"
       2             : 
       3             : #if FD_HAS_THREADS
       4             : #include <pthread.h>
       5             : #endif
       6             : 
       7             : struct fd_tpool_private_worker_cfg {
       8             :   fd_tpool_t * tpool;
       9             :   ulong        tile_idx;
      10             : };
      11             : 
      12             : typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
      13             : 
/* fd_tpool_private_worker_ is the tile entry point for a tpool worker.
   It is launched by fd_tpool_worker_push via fd_tile_exec_new with the
   usual (argc,argv) signature repurposed:

     argc - the worker's index in the tpool's worker table
     argv - actually a fd_tpool_private_worker_cfg_t * giving the tpool
            and the tile index this worker runs on

   The worker's state (fd_tpool_private_worker_t) lives on this
   function's stack and is published to the tpool's worker table at
   worker_idx.  The function then loops: it polls seq0 for a new
   dispatch, runs the dispatched task, and writes the dispatch's
   sequence number back to seq1.  A dispatch whose task is 0 is a halt
   request.  Returns 0 once halted. */

static int
fd_tpool_private_worker_( int     argc,
                          char ** argv ) {
  ulong                           worker_idx = (ulong)(uint)argc;
  fd_tpool_private_worker_cfg_t * cfg        = (fd_tpool_private_worker_cfg_t *)argv;

  /* Copy everything needed out of cfg up front.  cfg lives on the
     pusher's stack and the pusher is released as soon as this worker
     publishes itself below. */

  fd_tpool_t * tpool    = cfg->tpool;
  ulong        tile_idx = cfg->tile_idx;

  /* We are BOOT */

  fd_tpool_private_worker_t worker[1];

  memset( worker, 0, sizeof(fd_tpool_private_worker_t) );

  worker->tile_idx = (uint) tile_idx;

# if FD_HAS_THREADS
  int sleeper = !!(tpool->opt & FD_TPOOL_OPT_SLEEP);

  pthread_mutex_t lock[1];
  pthread_cond_t  wake[1];

  /* In sleep mode, the worker holds lock at all times except while it
     is blocked in pthread_cond_wait below.  fd_tpool_private_wake
     acquires lock before signaling wake. */

  if( FD_UNLIKELY( sleeper ) ) {
    if( FD_UNLIKELY( pthread_mutex_init( lock, NULL ) ) ) FD_LOG_ERR(( "pthread_mutex_init failed" ));
    if( FD_UNLIKELY( pthread_cond_init ( wake, NULL ) ) ) FD_LOG_ERR(( "pthread_cond_init failed"  ));
    if( FD_UNLIKELY( pthread_mutex_lock( lock       ) ) ) FD_LOG_ERR(( "pthread_mutex_lock failed" ));
  }

  /* The addresses are stored unconditionally; the objects they point
     to are only initialized when sleeper is set. */

  worker->lock = (ulong)lock;
  worker->wake = (ulong)wake;
# endif

  FD_COMPILER_MFENCE();

  /* Publish this worker.  fd_tpool_worker_push spins until this table
     entry becomes non-NULL. */

  fd_tpool_private_worker( tpool )[ worker_idx ] = worker;

  ulong const * arg  = worker->arg;
  uint          seq1 = worker->seq1; /* 0 here due to the memset above */

  for(;;) {

    /* We are IDLE ... see what we should do next */

#   if FD_HAS_THREADS
    if( FD_UNLIKELY( sleeper ) && FD_UNLIKELY( pthread_cond_wait( wake, lock ) ) )
      FD_LOG_WARNING(( "pthread_cond_wait failed; attempting to continue" ));
#   endif

    /* Read seq0 first and then the task description (the compiler
       fences keep the compiler from reordering these reads). */

    FD_COMPILER_MFENCE();
    uint  seq0     = worker->seq0;
    FD_COMPILER_MFENCE();
    uint  _arg_cnt = worker->arg_cnt;
    ulong _task    = worker->task;
    FD_COMPILER_MFENCE();

    /* seq0 unchanged since the last completed dispatch means there is
       nothing new to do.  In sleep mode, this also absorbs wakeups that
       did not come with a new dispatch. */

    if( FD_UNLIKELY( seq0==seq1 ) ) { /* Got idle */
      FD_SPIN_PAUSE();
      continue;
    }

    if( FD_UNLIKELY( !_task ) ) break; /* Got halt */

    /* We are EXEC ... do the task and then transition to IDLE */

#   ifdef __cplusplus
    try {
#   endif

      /* arg_cnt==UINT_MAX selects the fd_tpool_task_t calling
         convention (12 fixed arguments unpacked from arg[0] through
         arg[11]).  Any other value selects the fd_tpool_task_v2_t
         calling convention, which gets the raw argument array. */

      if( _arg_cnt==UINT_MAX ) {

        fd_tpool_task_t task = (fd_tpool_task_t)_task;

        void * task_tpool  = (void *)arg[ 0];
        ulong  task_t0     =         arg[ 1]; ulong task_t1     = arg[ 2];
        void * task_args   = (void *)arg[ 3];
        void * task_reduce = (void *)arg[ 4]; ulong task_stride = arg[ 5];
        ulong  task_l0     =         arg[ 6]; ulong task_l1     = arg[ 7];
        ulong  task_m0     =         arg[ 8]; ulong task_m1     = arg[ 9];
        ulong  task_n0     =         arg[10]; ulong task_n1     = arg[11];

        task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );

      } else {

        fd_tpool_task_v2_t task = (fd_tpool_task_v2_t)_task;

        task( tpool, worker_idx, (ulong)_arg_cnt, arg );

      }

#   ifdef __cplusplus
    } catch( ... ) {
      /* A C++ exception escaping the task is logged and dropped; the
         dispatch is still marked as processed below. */
      FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
    }
#   endif

    FD_COMPILER_MFENCE();

    /* Record that dispatch seq0 has been processed (both in the shared
       worker state and in the local copy used for the idle test). */

    worker->seq1 = seq0;
    seq1 = seq0;
  }

  /* We are HALT ... clean up and terminate */

# if FD_HAS_THREADS
  if( FD_UNLIKELY( sleeper ) ) {
    if( FD_UNLIKELY( pthread_mutex_unlock ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
    if( FD_UNLIKELY( pthread_cond_destroy ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_destroy failed; attempting to continue" ));
    if( FD_UNLIKELY( pthread_mutex_destroy( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_destroy failed; attempting to continue" ));
  }
# endif

  return 0;
}
     129             : 
     130             : #if FD_HAS_THREADS
     131             : void
     132           0 : fd_tpool_private_wake( fd_tpool_private_worker_t * worker ) {
     133           0 :   pthread_mutex_t * lock = (pthread_mutex_t *)worker->lock;
     134           0 :   pthread_cond_t *  wake = (pthread_cond_t  *)worker->wake;
     135           0 :   if( FD_UNLIKELY( pthread_mutex_lock  ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_lock failed; attempting to continue" ));
     136           0 :   if( FD_UNLIKELY( pthread_cond_signal ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_signal failed; attempting to continue" ));
     137           0 :   if( FD_UNLIKELY( pthread_mutex_unlock( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
     138           0 : }
     139             : #endif
     140             : 
     141             : ulong
     142        3105 : fd_tpool_align( void ) {
     143        3105 :   return FD_TPOOL_ALIGN;
     144        3105 : }
     145             : 
     146             : ulong
     147     3003099 : fd_tpool_footprint( ulong worker_max ) {
     148     3003099 :   if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
     149      474186 :   return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) +
     150      474186 :                             sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
     151     3003099 : }
     152             : 
     153             : fd_tpool_t *
     154             : fd_tpool_init( void * mem,
     155             :                ulong  worker_max,
     156        3105 :                ulong  opt ) {
     157             : 
     158        3105 :   FD_COMPILER_MFENCE();
     159             : 
     160        3105 :   if( FD_UNLIKELY( !mem ) ) {
     161           3 :     FD_LOG_WARNING(( "NULL mem" ));
     162           3 :     return NULL;
     163           3 :   }
     164             : 
     165        3102 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
     166           3 :     FD_LOG_WARNING(( "bad alignment" ));
     167           3 :     return NULL;
     168           3 :   }
     169             : 
     170        3099 :   ulong footprint = fd_tpool_footprint( worker_max );
     171        3099 :   if( FD_UNLIKELY( !footprint ) ) {
     172           6 :     FD_LOG_WARNING(( "bad worker_max" ));
     173           6 :     return NULL;
     174           6 :   }
     175             : 
     176        3093 :   fd_memset( mem, 0, footprint );
     177             : 
     178        3093 :   fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem;
     179             : 
     180        3093 :   worker0->seq0 = 1U;
     181        3093 :   worker0->seq1 = 0U;
     182             : 
     183        3093 :   fd_tpool_t * tpool = (fd_tpool_t *)(worker0+1);
     184             : 
     185        3093 :   tpool->opt        = opt;
     186        3093 :   tpool->worker_max = (uint)worker_max;
     187        3093 :   tpool->worker_cnt = 1U;
     188             : 
     189        3093 :   FD_COMPILER_MFENCE();
     190        3093 :   fd_tpool_private_worker( tpool )[0] = worker0;
     191        3093 :   FD_COMPILER_MFENCE();
     192             : 
     193        3093 :   return tpool;
     194        3099 : }
     195             : 
     196             : void *
     197        3096 : fd_tpool_fini( fd_tpool_t * tpool ) {
     198             : 
     199        3096 :   FD_COMPILER_MFENCE();
     200             : 
     201        3096 :   if( FD_UNLIKELY( !tpool ) ) {
     202           3 :     FD_LOG_WARNING(( "NULL tpool" ));
     203           3 :     return NULL;
     204           3 :   }
     205             : 
     206        3156 :   while( fd_tpool_worker_cnt( tpool )>1UL ) {
     207          63 :     if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
     208           0 :       FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
     209           0 :       return NULL;
     210           0 :     }
     211          63 :   }
     212             : 
     213        3093 :   return (void *)fd_tpool_private_worker0( tpool );
     214        3093 : }
     215             : 
/* fd_tpool_worker_push launches a worker on tile tile_idx and appends
   it to the tpool as worker worker_cnt.  Returns tpool on success.
   Returns NULL after logging a warning (leaving worker_cnt unchanged)
   if tpool is NULL, tile_idx is 0, tile_idx is the caller's tile,
   tile_idx is not less than fd_tile_cnt(), the tpool already has
   worker_max workers, tile_idx is already a worker in this tpool, or
   the tile could not be started. */

fd_tpool_t *
fd_tpool_worker_push( fd_tpool_t * tpool,
                      ulong        tile_idx ) {

  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !tpool ) ) {
    FD_LOG_WARNING(( "NULL tpool" ));
    return NULL;
  }

  if( FD_UNLIKELY( !tile_idx ) ) {
    FD_LOG_WARNING(( "cannot push tile_idx 0" ));
    return NULL;
  }

  if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
    FD_LOG_WARNING(( "cannot push self" ));
    return NULL;
  }

  if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
    FD_LOG_WARNING(( "invalid tile_idx" ));
    return NULL;
  }

  fd_tpool_private_worker_t ** worker     = fd_tpool_private_worker( tpool );
  ulong                        worker_cnt = (ulong)tpool->worker_cnt;

  if( FD_UNLIKELY( worker_cnt>=(ulong)tpool->worker_max ) ) {
    FD_LOG_WARNING(( "too many workers" ));
    return NULL;
  }

  /* Reject a tile that is already one of this tpool's workers */

  for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ )
    if( worker[ worker_idx ]->tile_idx==tile_idx ) {
      FD_LOG_WARNING(( "tile_idx already added to tpool" ));
      return NULL;
    }

  /* The boot configuration lives on this stack.  That is safe because
     this function does not return on the success path until the worker
     has published itself, which the worker does only after it has
     copied the fields out of cfg. */

  fd_tpool_private_worker_cfg_t cfg[1];

  cfg->tpool    = tpool;
  cfg->tile_idx = tile_idx;

  /* The tile entry point signature is (argc,argv): the new worker's
     index travels in argc and the cfg pointer travels in argv. */

  int     argc = (int)(uint)worker_cnt;
  char ** argv = (char **)fd_type_pun( cfg );

  /* Clear the table slot so the spin below observes the worker's
     publication and not a stale pointer. */

  FD_COMPILER_MFENCE();
  worker[ worker_cnt ] = NULL;
  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker_, argc, argv ) ) ) {
    FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
    return NULL;
  }

  /* Wait for the worker to boot and publish itself */

  while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE();

  tpool->worker_cnt = (uint)(worker_cnt + 1UL);
  return tpool;
}
     278             : 
/* fd_tpool_worker_pop halts the most recently pushed worker, waits for
   its tile to finish and removes it from the tpool.  Returns tpool on
   success.  Returns NULL after logging a warning (leaving the tpool
   unchanged) if tpool is NULL, there is no worker other than worker 0,
   or the worker to pop is not idle.  An unexpected error string or
   non-zero return code from the tile is logged but does not fail the
   pop. */

fd_tpool_t *
fd_tpool_worker_pop( fd_tpool_t * tpool ) {

  FD_COMPILER_MFENCE();

  if( FD_UNLIKELY( !tpool ) ) {
    FD_LOG_WARNING(( "NULL tpool" ));
    return NULL;
  }

  ulong worker_cnt = (ulong)tpool->worker_cnt;
  if( FD_UNLIKELY( worker_cnt<=1UL ) ) {
    FD_LOG_WARNING(( "no workers to pop" ));
    return NULL;
  }

  /* Testing for IDLE isn't strictly necessary given the requirements
     for using this, and the test isn't done atomically with the actual
     pop, but it does help catch obvious user errors. */

  if( FD_UNLIKELY( !fd_tpool_worker_idle( tpool, worker_cnt-1UL ) ) ) {
    FD_LOG_WARNING(( "worker to pop is not idle" ));
    return NULL;
  }

  /* Send HALT to the worker */

  /* The worker's state lives on the worker tile's stack (see
     fd_tpool_private_worker_), so the next sequence number and the
     tile's exec handle are read from it before the halt is sent. */

  fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
  uint                        seq0   = worker->seq0 + 1U;
  fd_tile_exec_t *            exec   = fd_tile_exec( worker->tile_idx );

  /* A dispatch with task 0 is the halt request.  task is written
     before the new seq0, mirroring the worker, which reads seq0 before
     task. */

  worker->task = 0UL;
  FD_COMPILER_MFENCE();
  worker->seq0 = seq0;
  FD_COMPILER_MFENCE();
  /* NOTE(review): in sleep mode, fd_tpool_private_wake dereferences
     worker after the halt has been published.  This looks safe only if
     the worker stays blocked in pthread_cond_wait until signaled here;
     confirm a spurious wakeup cannot let the worker exit (and its stack
     state disappear) first. */
  if( FD_UNLIKELY( tpool->opt & FD_TPOOL_OPT_SLEEP ) ) fd_tpool_private_wake( worker );

  /* Wait for the worker to shutdown */

  int          ret;
  char const * err = fd_tile_exec_delete( exec, &ret );
  if(      FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
  else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));

  tpool->worker_cnt = (uint)(worker_cnt - 1UL);
  return tpool;
}
     326             : 
     327             : #define FD_TPOOL_EXEC_ALL_IMPL_HDR(style)                                                          \
     328             : void                                                                                               \
     329             : fd_tpool_private_exec_all_##style##_node( void * _node_tpool,                                      \
     330             :                                           ulong  node_t0, ulong node_t1,                           \
     331             :                                           void * args,                                             \
     332             :                                           void * reduce,  ulong stride,                            \
     333             :                                           ulong  l0,      ulong l1,                                \
     334             :                                           ulong  _task,   ulong _tpool,                            \
     335    11521067 :                                           ulong  t0,      ulong t1 ) {                             \
     336    11521067 :   fd_tpool_t *    node_tpool = (fd_tpool_t *   )_node_tpool;                                       \
     337    11521067 :   fd_tpool_task_t task       = (fd_tpool_task_t)_task;                                             \
     338    11521067 :   ulong           wait_cnt   = 0UL;                                                                \
     339    11521067 :   ushort          wait_child[16];   /* Assumes tpool_cnt<=65536 */                                 \
     340    18575758 :   for(;;) {                                                                                        \
     341    18575758 :     ulong node_t_cnt = node_t1 - node_t0;                                                          \
     342    18575758 :     if( node_t_cnt<=1L ) break;                                                                    \
     343    18575758 :     ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt );                                \
     344     8366693 :     fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node,                  \
     345     8366693 :                    node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
     346     8366693 :     wait_child[ wait_cnt++ ] = (ushort)node_ts;                                                    \
     347     8366693 :     node_t1 = node_ts;                                                                             \
     348     8366693 :   }
     349             : 
     350             : #define FD_TPOOL_EXEC_ALL_IMPL_FTR                                                \
     351    19839302 :   while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] ); \
     352    11521067 : }
     353             : 
     354     1024576 : FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin)
     355     1024576 :   ulong m_stride = t1-t0;
     356     1024576 :   ulong m        = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
     357    47851274 :   while( m<l1 ) {
     358    46826698 :     task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
     359    46826698 :     m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
     360    46826698 :   }
     361     1024576 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     362             : 
     363     1014550 : FD_TPOOL_EXEC_ALL_IMPL_HDR(block)
     364     1014550 :   ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
     365    47939472 :   for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
     366     1014550 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     367             : 
     368             : #if FD_HAS_ATOMIC
     369     1009795 : FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq)
     370     1009795 :   ulong * l_next = (ulong *)_tpool;
     371     1009795 :   void  * tpool  = (void *)l_next[1];
     372    98554668 :   for(;;) {
     373             : 
     374             :     /* Note that we use an ATOMIC_CAS here instead of an
     375             :        ATOMIC_FETCH_AND_ADD to avoid overflow risks by having threads
     376             :        increment l0 into the tail.  ATOMIC_FETCH_AND_ADD could be used
     377             :        if there is no requirement to the effect that l1+FD_TILE_MAX does
     378             :        not overflow. */
     379             : 
     380    98554668 :     FD_COMPILER_MFENCE();
     381    98554668 :     ulong m0 = *l_next;
     382    98554668 :     FD_COMPILER_MFENCE();
     383             : 
     384    98554668 :     if( FD_UNLIKELY( m0>=l1 ) ) break;
     385    97607393 :     ulong m1 = m0+1UL;
     386    97607393 :     if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) {
     387    37385443 :       FD_SPIN_PAUSE();
     388    37385443 :       continue;
     389    37385443 :     }
     390             : 
     391    60221950 :     task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
     392    60221950 :   }
     393     1009795 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     394             : #endif
     395             : 
     396     1025784 : FD_TPOOL_EXEC_ALL_IMPL_HDR(batch)
     397     1025784 :   ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
     398     1025784 :   task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
     399     1025784 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     400             : 
     401     7446362 : FD_TPOOL_EXEC_ALL_IMPL_HDR(raw)
     402     7446362 :   task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
     403     7446362 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     404             : 
     405             : #undef FD_TPOOL_EXEC_ALL_IMPL_FTR
     406             : #undef FD_TPOOL_EXEC_ALL_IMPL_HDR

Generated by: LCOV version 1.14