LCOV - code coverage report
Current view: top level - util/tpool - fd_tpool.cxx (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 233 254 91.7 %
Date: 2025-07-01 05:00:49 Functions: 12 13 92.3 %

          Line data    Source code
       1             : #include "fd_tpool.h"
       2             : 
       3             : #if FD_HAS_THREADS
       4             : #include <pthread.h>
       5             : #endif
       6             : 
       7             : struct fd_tpool_private_worker_cfg {
       8             :   fd_tpool_t * tpool;
       9             :   ulong        tile_idx;
      10             : };
      11             : 
      12             : typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
      13             : 
      14             : static int
      15             : fd_tpool_private_worker( int     argc,
      16          84 :                          char ** argv ) {
      17          84 :   ulong                           worker_idx = (ulong)(uint)argc;
      18          84 :   fd_tpool_private_worker_cfg_t * cfg        = (fd_tpool_private_worker_cfg_t *)argv;
      19             : 
      20          84 :   fd_tpool_t * tpool    = cfg->tpool;
      21          84 :   ulong        tile_idx = cfg->tile_idx;
      22             : 
      23             :   /* We are BOOT */
      24             : 
      25          84 :   fd_tpool_private_worker_t worker[1];
      26             : 
      27          84 :   memset( worker, 0, sizeof(fd_tpool_private_worker_t) );
      28             : 
      29          84 :   worker->tile_idx = (uint) tile_idx;
      30             : 
      31          84 : # if FD_HAS_THREADS
      32          84 :   int sleeper = !!(tpool->opt & FD_TPOOL_OPT_SLEEP);
      33             : 
      34          84 :   pthread_mutex_t lock[1];
      35          84 :   pthread_cond_t  wake[1];
      36             : 
      37          84 :   if( FD_UNLIKELY( sleeper ) ) {
      38           0 :     if( FD_UNLIKELY( pthread_mutex_init( lock, NULL ) ) ) FD_LOG_ERR(( "pthread_mutex_init failed" ));
      39           0 :     if( FD_UNLIKELY( pthread_cond_init ( wake, NULL ) ) ) FD_LOG_ERR(( "pthread_cond_init failed"  ));
      40           0 :     if( FD_UNLIKELY( pthread_mutex_lock( lock       ) ) ) FD_LOG_ERR(( "pthread_mutex_lock failed" ));
      41           0 :   }
      42             : 
      43          84 :   worker->lock = (ulong)lock;
      44          84 :   worker->wake = (ulong)wake;
      45          84 : # endif
      46             : 
      47          84 :   FD_COMPILER_MFENCE();
      48             : 
      49          84 :   fd_tpool_private_worker( tpool )[ worker_idx ] = worker;
      50             : 
      51          84 :   ulong const * arg  = worker->arg;
      52          84 :   uint          seq1 = worker->seq1;
      53             : 
      54   648493427 :   for(;;) {
      55             : 
      56             :     /* We are IDLE ... see what we should do next */
      57             : 
      58   648493427 : #   if FD_HAS_THREADS
      59   648493427 :     if( FD_UNLIKELY( sleeper ) && FD_UNLIKELY( pthread_cond_wait( wake, lock ) ) )
      60           0 :       FD_LOG_WARNING(( "pthread_cond_wait failed; attempting to continue" ));
      61   648493427 : #   endif
      62             : 
      63   648493427 :     FD_COMPILER_MFENCE();
      64   648493427 :     uint  seq0     = worker->seq0;
      65   648493427 :     FD_COMPILER_MFENCE();
      66   648493427 :     uint  _arg_cnt = worker->arg_cnt;
      67   648493427 :     ulong _task    = worker->task;
      68   648493427 :     FD_COMPILER_MFENCE();
      69             : 
      70   648493427 :     if( FD_UNLIKELY( seq0==seq1 ) ) { /* Got idle */
      71   644958083 :       FD_SPIN_PAUSE();
      72   644958083 :       continue;
      73   644958083 :     }
      74             : 
      75     3535344 :     if( FD_UNLIKELY( !_task ) ) break; /* Got halt */
      76             : 
      77             :     /* We are EXEC ... do the task and then transition to IDLE */
      78             : 
      79     3535260 :     try {
      80             : 
      81     5208747 :       if( _arg_cnt==UINT_MAX ) {
      82             : 
      83     5208747 :         fd_tpool_task_t task = (fd_tpool_task_t)_task;
      84             : 
      85     5208747 :         void * task_tpool  = (void *)arg[ 0];
      86     5208747 :         ulong  task_t0     =         arg[ 1]; ulong task_t1     = arg[ 2];
      87     5208747 :         void * task_args   = (void *)arg[ 3];
      88     5208747 :         void * task_reduce = (void *)arg[ 4]; ulong task_stride = arg[ 5];
      89     5208747 :         ulong  task_l0     =         arg[ 6]; ulong task_l1     = arg[ 7];
      90     5208747 :         ulong  task_m0     =         arg[ 8]; ulong task_m1     = arg[ 9];
      91     5208747 :         ulong  task_n0     =         arg[10]; ulong task_n1     = arg[11];
      92             : 
      93     5208747 :         task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );
      94             : 
      95           5 :       } else {
      96             : 
      97           5 :         fd_tpool_task_v2_t task = (fd_tpool_task_v2_t)_task;
      98             : 
      99           5 :         task( tpool, worker_idx, (ulong)_arg_cnt, arg );
     100             : 
     101           5 :       }
     102             : 
     103     3535260 :     } catch( ... ) {
     104           0 :       FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
     105           0 :     }
     106             : 
     107    19639254 :     FD_COMPILER_MFENCE();
     108             : 
     109    19639254 :     worker->seq1 = seq0;
     110    19639254 :     seq1 = seq0;
     111    19639254 :   }
     112             : 
     113             :   /* We are HALT ... clean up and terminate */
     114             : 
     115    16104078 : # if FD_HAS_THREADS
     116    16104078 :   if( FD_UNLIKELY( sleeper ) ) {
     117           0 :     if( FD_UNLIKELY( pthread_mutex_unlock ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
     118           0 :     if( FD_UNLIKELY( pthread_cond_destroy ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_destroy failed; attempting to continue" ));
     119           0 :     if( FD_UNLIKELY( pthread_mutex_destroy( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_destroy failed; attempting to continue" ));
     120           0 :   }
     121    16104078 : # endif
     122             : 
     123    16104078 :   return 0;
     124          84 : }
     125             : 
     126             : #if FD_HAS_THREADS
     127             : void
     128           0 : fd_tpool_private_wake( fd_tpool_private_worker_t * worker ) {
     129           0 :   pthread_mutex_t * lock = (pthread_mutex_t *)worker->lock;
     130           0 :   pthread_cond_t *  wake = (pthread_cond_t  *)worker->wake;
     131           0 :   if( FD_UNLIKELY( pthread_mutex_lock  ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_lock failed; attempting to continue" ));
     132           0 :   if( FD_UNLIKELY( pthread_cond_signal ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_signal failed; attempting to continue" ));
     133           0 :   if( FD_UNLIKELY( pthread_mutex_unlock( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
     134           0 : }
     135             : #endif
     136             : 
     137             : ulong
     138        3099 : fd_tpool_align( void ) {
     139        3099 :   return FD_TPOOL_ALIGN;
     140        3099 : }
     141             : 
     142             : ulong
     143     3003093 : fd_tpool_footprint( ulong worker_max ) {
     144     3003093 :   if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
     145      474180 :   return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) +
     146      474180 :                             sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
     147     3003093 : }
     148             : 
     149             : fd_tpool_t *
     150             : fd_tpool_init( void * mem,
     151             :                ulong  worker_max,
     152        3099 :                ulong  opt ) {
     153             : 
     154        3099 :   FD_COMPILER_MFENCE();
     155             : 
     156        3099 :   if( FD_UNLIKELY( !mem ) ) {
     157           3 :     FD_LOG_WARNING(( "NULL mem" ));
     158           3 :     return NULL;
     159           3 :   }
     160             : 
     161        3096 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
     162           3 :     FD_LOG_WARNING(( "bad alignment" ));
     163           3 :     return NULL;
     164           3 :   }
     165             : 
     166        3093 :   ulong footprint = fd_tpool_footprint( worker_max );
     167        3093 :   if( FD_UNLIKELY( !footprint ) ) {
     168           6 :     FD_LOG_WARNING(( "bad worker_max" ));
     169           6 :     return NULL;
     170           6 :   }
     171             : 
     172        3087 :   fd_memset( mem, 0, footprint );
     173             : 
     174        3087 :   fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem;
     175             : 
     176        3087 :   worker0->seq0 = 1U;
     177        3087 :   worker0->seq1 = 0U;
     178             : 
     179        3087 :   fd_tpool_t * tpool = (fd_tpool_t *)(worker0+1);
     180             : 
     181        3087 :   tpool->opt        = opt;
     182        3087 :   tpool->worker_max = (uint)worker_max;
     183        3087 :   tpool->worker_cnt = 1U;
     184             : 
     185        3087 :   FD_COMPILER_MFENCE();
     186        3087 :   fd_tpool_private_worker( tpool )[0] = worker0;
     187        3087 :   FD_COMPILER_MFENCE();
     188             : 
     189        3087 :   return tpool;
     190        3093 : }
     191             : 
     192             : void *
     193        3090 : fd_tpool_fini( fd_tpool_t * tpool ) {
     194             : 
     195        3090 :   FD_COMPILER_MFENCE();
     196             : 
     197        3090 :   if( FD_UNLIKELY( !tpool ) ) {
     198           3 :     FD_LOG_WARNING(( "NULL tpool" ));
     199           3 :     return NULL;
     200           3 :   }
     201             : 
     202        3150 :   while( fd_tpool_worker_cnt( tpool )>1UL ) {
     203          63 :     if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
     204           0 :       FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
     205           0 :       return NULL;
     206           0 :     }
     207          63 :   }
     208             : 
     209        3087 :   return (void *)fd_tpool_private_worker0( tpool );
     210        3087 : }
     211             : 
     212             : fd_tpool_t *
     213             : fd_tpool_worker_push( fd_tpool_t * tpool,
     214         120 :                       ulong        tile_idx ) {
     215             : 
     216         120 :   FD_COMPILER_MFENCE();
     217             : 
     218         120 :   if( FD_UNLIKELY( !tpool ) ) {
     219           3 :     FD_LOG_WARNING(( "NULL tpool" ));
     220           3 :     return NULL;
     221           3 :   }
     222             : 
     223         117 :   if( FD_UNLIKELY( !tile_idx ) ) {
     224           3 :     FD_LOG_WARNING(( "cannot push tile_idx 0" ));
     225           3 :     return NULL;
     226           3 :   }
     227             : 
     228         114 :   if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
     229           3 :     FD_LOG_WARNING(( "cannot push self" ));
     230           3 :     return NULL;
     231           3 :   }
     232             : 
     233         111 :   if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
     234           3 :     FD_LOG_WARNING(( "invalid tile_idx" ));
     235           3 :     return NULL;
     236           3 :   }
     237             : 
     238         108 :   fd_tpool_private_worker_t ** worker     = fd_tpool_private_worker( tpool );
     239         108 :   ulong                        worker_cnt = (ulong)tpool->worker_cnt;
     240             : 
     241         108 :   if( FD_UNLIKELY( worker_cnt>=(ulong)tpool->worker_max ) ) {
     242           3 :     FD_LOG_WARNING(( "too many workers" ));
     243           3 :     return NULL;
     244           3 :   }
     245             : 
     246         507 :   for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ )
     247         420 :     if( worker[ worker_idx ]->tile_idx==tile_idx ) {
     248          18 :       FD_LOG_WARNING(( "tile_idx already added to tpool" ));
     249          18 :       return NULL;
     250          18 :     }
     251             : 
     252          87 :   fd_tpool_private_worker_cfg_t cfg[1];
     253             : 
     254          87 :   cfg->tpool    = tpool;
     255          87 :   cfg->tile_idx = tile_idx;
     256             : 
     257          87 :   int     argc = (int)(uint)worker_cnt;
     258          87 :   char ** argv = (char **)fd_type_pun( cfg );
     259             : 
     260          87 :   FD_COMPILER_MFENCE();
     261          87 :   worker[ worker_cnt ] = NULL;
     262          87 :   FD_COMPILER_MFENCE();
     263             : 
     264          87 :   if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker, argc, argv ) ) ) {
     265           3 :     FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
     266           3 :     return NULL;
     267           3 :   }
     268             : 
     269        2133 :   while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE();
     270             : 
     271          84 :   tpool->worker_cnt = (uint)(worker_cnt + 1UL);
     272          84 :   return tpool;
     273          87 : }
     274             : 
     275             : fd_tpool_t *
     276          93 : fd_tpool_worker_pop( fd_tpool_t * tpool ) {
     277             : 
     278          93 :   FD_COMPILER_MFENCE();
     279             : 
     280          93 :   if( FD_UNLIKELY( !tpool ) ) {
     281           3 :     FD_LOG_WARNING(( "NULL tpool" ));
     282           3 :     return NULL;
     283           3 :   }
     284             : 
     285          90 :   ulong worker_cnt = (ulong)tpool->worker_cnt;
     286          90 :   if( FD_UNLIKELY( worker_cnt<=1UL ) ) {
     287           3 :     FD_LOG_WARNING(( "no workers to pop" ));
     288           3 :     return NULL;
     289           3 :   }
     290             : 
     291             :   /* Testing for IDLE isn't strictly necessary given requirements to use
     292             :      this and this isn't being done atomically with the actually pop but
     293             :      does help catch obvious user errors. */
     294             : 
     295          87 :   if( FD_UNLIKELY( !fd_tpool_worker_idle( tpool, worker_cnt-1UL ) ) ) {
     296           3 :     FD_LOG_WARNING(( "worker to pop is not idle" ));
     297           3 :     return NULL;
     298           3 :   }
     299             : 
     300             :   /* Send HALT to the worker */
     301             : 
     302          84 :   fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
     303          84 :   uint                        seq0   = worker->seq0 + 1U;
     304          84 :   fd_tile_exec_t *            exec   = fd_tile_exec( worker->tile_idx );
     305             : 
     306          84 :   worker->task = 0UL;
     307          84 :   FD_COMPILER_MFENCE();
     308          84 :   worker->seq0 = seq0;
     309          84 :   FD_COMPILER_MFENCE();
     310          84 :   if( FD_UNLIKELY( tpool->opt & FD_TPOOL_OPT_SLEEP ) ) fd_tpool_private_wake( worker );
     311             : 
     312             :   /* Wait for the worker to shutdown */
     313             : 
     314          84 :   int          ret;
     315          84 :   char const * err = fd_tile_exec_delete( exec, &ret );
     316          84 :   if(      FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
     317          84 :   else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));
     318             : 
     319          84 :   tpool->worker_cnt = (uint)(worker_cnt - 1UL);
     320          84 :   return tpool;
     321          87 : }
     322             : 
     323             : #define FD_TPOOL_EXEC_ALL_IMPL_HDR(style)                                                          \
     324             : void                                                                                               \
     325             : fd_tpool_private_exec_all_##style##_node( void * _node_tpool,                                      \
     326             :                                           ulong  node_t0, ulong node_t1,                           \
     327             :                                           void * args,                                             \
     328             :                                           void * reduce,  ulong stride,                            \
     329             :                                           ulong  l0,      ulong l1,                                \
     330             :                                           ulong  _task,   ulong _tpool,                            \
     331     8627191 :                                           ulong  t0,      ulong t1 ) {                             \
     332     8627191 :   fd_tpool_t *    node_tpool = (fd_tpool_t *   )_node_tpool;                                       \
     333     8627191 :   fd_tpool_task_t task       = (fd_tpool_task_t)_task;                                             \
     334     8627191 :   ulong           wait_cnt   = 0UL;                                                                \
     335     8627191 :   ushort          wait_child[16];   /* Assumes tpool_cnt<=65536 */                                 \
     336    16874568 :   for(;;) {                                                                                        \
     337    16874568 :     ulong node_t_cnt = node_t1 - node_t0;                                                          \
     338    16874568 :     if( node_t_cnt<=1L ) break;                                                                    \
     339    16874568 :     ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt );                                \
     340     8146026 :     fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node,                  \
     341     8146026 :                    node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
     342     8146026 :     wait_child[ wait_cnt++ ] = (ushort)node_ts;                                                    \
     343     8146026 :     node_t1 = node_ts;                                                                             \
     344     8146026 :   }
     345             : 
     346             : #define FD_TPOOL_EXEC_ALL_IMPL_FTR                                                \
     347    17366213 :   while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] ); \
     348     8627191 : }
     349             : 
     350      772615 : FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin)
     351      772615 :   ulong m_stride = t1-t0;
     352      772615 :   ulong m        = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
     353    67873859 :   while( m<l1 ) {
     354    67101244 :     task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
     355    67101244 :     m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
     356    67101244 :   }
     357      772615 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     358             : 
     359      773040 : FD_TPOOL_EXEC_ALL_IMPL_HDR(block)
     360      773040 :   ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
     361    70792979 :   for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
     362      773040 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     363             : 
     364             : #if FD_HAS_ATOMIC
     365      783760 : FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq)
     366      783760 :   ulong * l_next = (ulong *)_tpool;
     367      783760 :   void  * tpool  = (void *)l_next[1];
     368    83353498 :   for(;;) {
     369             : 
     370             :     /* Note that we use an ATOMIC_CAS here instead of an
     371             :        ATOMIC_FETCH_AND_ADD to avoid overflow risks by having threads
     372             :        increment l0 into the tail.  ATOMIC_FETCH_AND_ADD could be used
     373             :        if there is no requirement to the effect that l1+FD_TILE_MAX does
     374             :        not overflow. */
     375             : 
     376    83353498 :     FD_COMPILER_MFENCE();
     377    83353498 :     ulong m0 = *l_next;
     378    83353498 :     FD_COMPILER_MFENCE();
     379             : 
     380    83353498 :     if( FD_UNLIKELY( m0>=l1 ) ) break;
     381    82466938 :     ulong m1 = m0+1UL;
     382    82466938 :     if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) {
     383     8959649 :       FD_SPIN_PAUSE();
     384     8959649 :       continue;
     385     8959649 :     }
     386             : 
     387    73507289 :     task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
     388    73507289 :   }
     389      783760 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     390             : #endif
     391             : 
     392      763336 : FD_TPOOL_EXEC_ALL_IMPL_HDR(batch)
     393      763336 :   ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
     394      763336 :   task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
     395      763336 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     396             : 
     397     5534440 : FD_TPOOL_EXEC_ALL_IMPL_HDR(raw)
     398     5534440 :   task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
     399     5534440 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     400             : 
     401             : #undef FD_TPOOL_EXEC_ALL_IMPL_FTR
     402             : #undef FD_TPOOL_EXEC_ALL_IMPL_HDR

Generated by: LCOV version 1.14