LCOV - code coverage report
Current view: top level - util/tpool - fd_tpool.cxx (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 244 249 98.0 %
Date: 2025-01-08 12:08:44 Functions: 13 13 100.0 %

          Line data    Source code
       1             : #include "fd_tpool.h"
       2             : 
       3             : struct fd_tpool_private_worker_cfg {
       4             :   fd_tpool_t * tpool;
       5             :   ulong        tile_idx;
       6             :   void *       scratch;
       7             :   ulong        scratch_sz;
       8             : };
       9             : 
      10             : typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
      11             : 
      12             : /* This is not static to allow tile 0 to attach to this if desired. */
      13             : 
      14             : FD_TL ulong fd_tpool_private_scratch_frame[ FD_TPOOL_WORKER_SCRATCH_DEPTH ] __attribute((aligned(FD_SCRATCH_FMEM_ALIGN)));
      15             : 
      16             : static int
      17             : fd_tpool_private_worker( int     argc,
      18          84 :                          char ** argv ) {
      19          84 :   ulong                           worker_idx = (ulong)(uint)argc;
      20          84 :   fd_tpool_private_worker_cfg_t * cfg        = (fd_tpool_private_worker_cfg_t *)argv;
      21             : 
      22          84 :   fd_tpool_t * tpool      = cfg->tpool;
      23          84 :   ulong        tile_idx   = cfg->tile_idx;
      24          84 :   void *       scratch    = cfg->scratch;
      25          84 :   ulong        scratch_sz = cfg->scratch_sz;
      26             : 
      27          84 :   fd_tpool_private_worker_t worker[1];
      28             : 
      29          84 :   FD_COMPILER_MFENCE();
      30          84 :   worker->state = FD_TPOOL_WORKER_STATE_BOOT;
      31          84 :   FD_COMPILER_MFENCE();
      32             : 
      33          84 :   worker->tile_idx   = (uint)tile_idx;
      34          84 :   worker->scratch    = scratch;
      35          84 :   worker->scratch_sz = scratch_sz;
      36             : 
      37          84 :   if( scratch_sz ) fd_scratch_attach( scratch, fd_tpool_private_scratch_frame, scratch_sz, FD_TPOOL_WORKER_SCRATCH_DEPTH );
      38             : 
      39          84 :   FD_COMPILER_MFENCE();
      40          84 :   worker->state = FD_TPOOL_WORKER_STATE_IDLE;
      41          84 :   FD_COMPILER_MFENCE();
      42             : 
      43          84 :   fd_tpool_private_worker( tpool )[ worker_idx ] = worker;
      44             : 
      45  2461238908 :   for(;;) {
      46             : 
      47             :     /* We are IDLE ... see what we should do next */
      48             : 
      49  2461238908 :     FD_COMPILER_MFENCE();
      50  2461238908 :     int state = worker->state;
      51  2461238908 :     FD_COMPILER_MFENCE();
      52             : 
      53  2461238908 :     if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_EXEC ) ) {
      54  1954361289 :       if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) break;
      55  1954361205 :       FD_SPIN_PAUSE();
      56  1954361205 :       continue;
      57  1954361289 :     }
      58             : 
      59             :     /* We are EXEC ... do the task and then transition to IDLE */
      60             : 
      61   506877619 :     fd_tpool_task_t task = worker->task;
      62             : 
      63   506877619 :     void * task_tpool  = worker->task_tpool;
      64   506877619 :     ulong  task_t0     = worker->task_t0;     ulong task_t1     = worker->task_t1;
      65   506877619 :     void * task_args   = worker->task_args;
      66   506877619 :     void * task_reduce = worker->task_reduce; ulong task_stride = worker->task_stride;
      67   506877619 :     ulong  task_l0     = worker->task_l0;     ulong task_l1     = worker->task_l1;
      68   506877619 :     ulong  task_m0     = worker->task_m0;     ulong task_m1     = worker->task_m1;
      69   506877619 :     ulong  task_n0     = worker->task_n0;     ulong task_n1     = worker->task_n1;
      70             : 
      71   506877619 :     FD_COMPILER_MFENCE();
      72             : 
      73   506877619 :     try {
      74   506877619 :       task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );
      75   506877619 :     } catch( ... ) {
      76           0 :       FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
      77           0 :     }
      78             : 
      79   506877619 :     FD_COMPILER_MFENCE();
      80    29555221 :     worker->state = FD_TPOOL_WORKER_STATE_IDLE;
      81    29555221 :   }
      82             : 
      83             :   /* state is HALT, clean up and then reset back to BOOT */
      84             : 
      85           5 :   if( scratch_sz ) fd_scratch_detach( NULL );
      86             : 
      87           5 :   FD_COMPILER_MFENCE();
      88           5 :   worker->state = FD_TPOOL_WORKER_STATE_BOOT;
      89           5 :   FD_COMPILER_MFENCE();
      90             : 
      91           5 :   return 0;
      92          84 : }
      93             : 
      94             : ulong
      95           3 : fd_tpool_align( void ) {
      96           3 :   return FD_TPOOL_ALIGN;
      97           3 : }
      98             : 
      99             : ulong
     100     3003093 : fd_tpool_footprint( ulong worker_max ) {
     101     3003093 :   if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
     102      474180 :   return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) +
     103      474180 :                             sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
     104     3003093 : }
     105             : 
     106             : fd_tpool_t *
     107             : fd_tpool_init( void * mem,
     108        3099 :                ulong  worker_max ) {
     109             : 
     110        3099 :   FD_COMPILER_MFENCE();
     111             : 
     112        3099 :   if( FD_UNLIKELY( !mem ) ) {
     113           3 :     FD_LOG_WARNING(( "NULL mem" ));
     114           3 :     return NULL;
     115           3 :   }
     116             : 
     117        3096 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
     118           3 :     FD_LOG_WARNING(( "bad alignment" ));
     119           3 :     return NULL;
     120           3 :   }
     121             : 
     122        3093 :   ulong footprint = fd_tpool_footprint( worker_max );
     123        3093 :   if( FD_UNLIKELY( !footprint ) ) {
     124           6 :     FD_LOG_WARNING(( "bad worker_max" ));
     125           6 :     return NULL;
     126           6 :   }
     127             : 
     128        3087 :   fd_memset( mem, 0, footprint );
     129             : 
     130        3087 :   fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem;
     131             : 
     132        3087 :   FD_COMPILER_MFENCE();
     133        3087 :   worker0->state = FD_TPOOL_WORKER_STATE_EXEC;
     134        3087 :   FD_COMPILER_MFENCE();
     135             : 
     136        3087 :   fd_tpool_t * tpool  = (fd_tpool_t *)(worker0+1);
     137        3087 :   tpool->worker_max = worker_max;
     138        3087 :   tpool->worker_cnt = 1UL;
     139             : 
     140        3087 :   FD_COMPILER_MFENCE();
     141        3087 :   fd_tpool_private_worker( tpool )[0] = worker0;
     142        3087 :   FD_COMPILER_MFENCE();
     143             : 
     144        3087 :   return tpool;
     145        3093 : }
     146             : 
     147             : void *
     148        3090 : fd_tpool_fini( fd_tpool_t * tpool ) {
     149             : 
     150        3090 :   FD_COMPILER_MFENCE();
     151             : 
     152        3090 :   if( FD_UNLIKELY( !tpool ) ) {
     153           3 :     FD_LOG_WARNING(( "NULL tpool" ));
     154           3 :     return NULL;
     155           3 :   }
     156             : 
     157        3150 :   while( fd_tpool_worker_cnt( tpool )>1UL ) {
     158          63 :     if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
     159           0 :       FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
     160           0 :       return NULL;
     161           0 :     }
     162          63 :   }
     163             : 
     164        3087 :   return (void *)fd_tpool_private_worker0( tpool );
     165        3087 : }
     166             : 
     167             : fd_tpool_t *
     168             : fd_tpool_worker_push( fd_tpool_t * tpool,
     169             :                       ulong        tile_idx,
     170             :                       void *       scratch,
     171         126 :                       ulong        scratch_sz ) {
     172             : 
     173         126 :   FD_COMPILER_MFENCE();
     174             : 
     175         126 :   if( FD_UNLIKELY( !tpool ) ) {
     176           3 :     FD_LOG_WARNING(( "NULL tpool" ));
     177           3 :     return NULL;
     178           3 :   }
     179             : 
     180         123 :   if( FD_UNLIKELY( !tile_idx ) ) {
     181           3 :     FD_LOG_WARNING(( "cannot push tile_idx 0" ));
     182           3 :     return NULL;
     183           3 :   }
     184             : 
     185         120 :   if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
     186           3 :     FD_LOG_WARNING(( "cannot push self" ));
     187           3 :     return NULL;
     188           3 :   }
     189             : 
     190         117 :   if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
     191           3 :     FD_LOG_WARNING(( "invalid tile_idx" ));
     192           3 :     return NULL;
     193           3 :   }
     194             : 
     195         114 :   if( FD_UNLIKELY( scratch_sz ) ) {
     196           6 :     if( FD_UNLIKELY( !scratch ) ) {
     197           3 :       FD_LOG_WARNING(( "NULL scratch" ));
     198           3 :       return NULL;
     199           3 :     }
     200             : 
     201           3 :     if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)scratch, FD_SCRATCH_SMEM_ALIGN ) ) ) {
     202           3 :       FD_LOG_WARNING(( "misaligned scratch" ));
     203           3 :       return NULL;
     204           3 :     }
     205           3 :   }
     206             : 
     207         108 :   fd_tpool_private_worker_t ** worker     = fd_tpool_private_worker( tpool );
     208         108 :   ulong                        worker_cnt = tpool->worker_cnt;
     209             : 
     210         108 :   if( FD_UNLIKELY( worker_cnt>=tpool->worker_max ) ) {
     211           3 :     FD_LOG_WARNING(( "too many workers" ));
     212           3 :     return NULL;
     213           3 :   }
     214             : 
     215         507 :   for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ )
     216         420 :     if( worker[ worker_idx ]->tile_idx==tile_idx ) {
     217          18 :       FD_LOG_WARNING(( "tile_idx already added to tpool" ));
     218          18 :       return NULL;
     219          18 :     }
     220             : 
     221          87 :   fd_tpool_private_worker_cfg_t cfg[1];
     222             : 
     223          87 :   cfg->tpool      = tpool;
     224          87 :   cfg->tile_idx   = tile_idx;
     225          87 :   cfg->scratch    = scratch;
     226          87 :   cfg->scratch_sz = scratch_sz;
     227             : 
     228          87 :   int     argc = (int)(uint)worker_cnt;
     229          87 :   char ** argv = (char **)fd_type_pun( cfg );
     230             : 
     231          87 :   FD_COMPILER_MFENCE();
     232          87 :   worker[ worker_cnt ] = NULL;
     233          87 :   FD_COMPILER_MFENCE();
     234             : 
     235          87 :   if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker, argc, argv ) ) ) {
     236           3 :     FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
     237           3 :     return NULL;
     238           3 :   }
     239             : 
     240        2355 :   while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE();
     241             : 
     242          84 :   tpool->worker_cnt = worker_cnt + 1UL;
     243          84 :   return tpool;
     244          87 : }
     245             : 
     246             : fd_tpool_t *
     247          93 : fd_tpool_worker_pop( fd_tpool_t * tpool ) {
     248             : 
     249          93 :   FD_COMPILER_MFENCE();
     250             : 
     251          93 :   if( FD_UNLIKELY( !tpool ) ) {
     252           3 :     FD_LOG_WARNING(( "NULL tpool" ));
     253           3 :     return NULL;
     254           3 :   }
     255             : 
     256          90 :   ulong worker_cnt = tpool->worker_cnt;
     257          90 :   if( FD_UNLIKELY( worker_cnt<=1UL ) ) {
     258           3 :     FD_LOG_WARNING(( "no workers to pop" ));
     259           3 :     return NULL;
     260           3 :   }
     261             : 
     262          87 :   fd_tpool_private_worker_t * worker   = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
     263          87 :   fd_tile_exec_t *            exec     = fd_tile_exec( worker->tile_idx );
     264          87 :   int volatile *              vstate   = (int volatile *)&(worker->state);
     265          87 :   int                         state;
     266             : 
     267             :   /* Testing for IDLE isn't strictly necessary given requirements to use
     268             :      this but can help catch user errors.  Likewise, FD_ATOMIC_CAS isn't
     269             :      strictly necessary given correct operation but can more robustly
     270             :      catch such errors. */
     271             : 
     272          87 : # if FD_HAS_ATOMIC
     273             : 
     274          87 :   FD_COMPILER_MFENCE();
     275          87 :   state = FD_ATOMIC_CAS( vstate, FD_TPOOL_WORKER_STATE_IDLE, FD_TPOOL_WORKER_STATE_HALT );
     276          87 :   FD_COMPILER_MFENCE();
     277             : 
     278          87 :   if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) {
     279           3 :     FD_LOG_WARNING(( "worker to pop is not idle (%i-%s)", state, fd_tpool_worker_state_cstr( state ) ));
     280           3 :     return NULL;
     281           3 :   }
     282             : 
     283             : # else
     284             : 
     285             :   FD_COMPILER_MFENCE();
     286             :   state = *vstate;
     287             :   FD_COMPILER_MFENCE();
     288             : 
     289             :   if( FD_UNLIKELY( state!=FD_TPOOL_WORKER_STATE_IDLE ) ) {
     290             :     FD_LOG_WARNING(( "worker to pop is not idle (%i-%s)", state, fd_tpool_worker_state_cstr( state ) ));
     291             :     return NULL;
     292             :   }
     293             : 
     294             :   FD_COMPILER_MFENCE();
     295             :   *vstate = FD_TPOOL_WORKER_STATE_HALT;
     296             :   FD_COMPILER_MFENCE();
     297             : 
     298             : # endif
     299             : 
     300             :   /* Wait for the worker to shutdown */
     301             : 
     302          84 :   int          ret;
     303          84 :   char const * err = fd_tile_exec_delete( exec, &ret );
     304          84 :   if(      FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
     305          84 :   else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));
     306             : 
     307          84 :   tpool->worker_cnt = worker_cnt-1UL;
     308          84 :   return tpool;
     309          87 : }
     310             : 
     311             : #define FD_TPOOL_EXEC_ALL_IMPL_HDR(style)                                                          \
     312             : void                                                                                               \
     313             : fd_tpool_private_exec_all_##style##_node( void * _node_tpool,                                      \
     314             :                                           ulong  node_t0, ulong node_t1,                           \
     315             :                                           void * args,                                             \
     316             :                                           void * reduce,  ulong stride,                            \
     317             :                                           ulong  l0,      ulong l1,                                \
     318             :                                           ulong  _task,   ulong _tpool,                            \
     319    11516451 :                                           ulong  t0,      ulong t1 ) {                             \
     320    11516451 :   fd_tpool_t *    node_tpool = (fd_tpool_t *   )_node_tpool;                                       \
     321    11516451 :   fd_tpool_task_t task       = (fd_tpool_task_t)_task;                                             \
     322    11516451 :   ulong           wait_cnt   = 0UL;                                                                \
     323    11516451 :   ushort          wait_child[16];   /* Assumes tpool_cnt<=65536 */                                 \
     324    18083405 :   for(;;) {                                                                                        \
     325    18083405 :     ulong node_t_cnt = node_t1 - node_t0;                                                          \
     326    18083405 :     if( node_t_cnt<=1L ) break;                                                                    \
     327    18083405 :     ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt );                                \
     328     8275671 :     fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node,                  \
     329     8275671 :                    node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
     330     8275671 :     wait_child[ wait_cnt++ ] = (ushort)node_ts;                                                    \
     331     8275671 :     node_t1 = node_ts;                                                                             \
     332     8275671 :   }
     333             : 
     334             : #define FD_TPOOL_EXEC_ALL_IMPL_FTR                                                \
     335    19555430 :   while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] ); \
     336    11516451 : }
     337             : 
     338      996292 : FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin)
     339      996292 :   ulong m_stride = t1-t0;
     340      996292 :   ulong m        = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
     341    43806521 :   while( m<l1 ) {
     342    42810229 :     task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
     343    42810229 :     m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
     344    42810229 :   }
     345      996292 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     346             : 
     347     1022198 : FD_TPOOL_EXEC_ALL_IMPL_HDR(block)
     348     1022198 :   ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
     349    46712895 :   for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
     350     1022198 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     351             : 
     352             : #if FD_HAS_ATOMIC
     353     1018409 : FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq)
     354     1018409 :   ulong * l_next = (ulong *)_tpool;
     355     1018409 :   void  * tpool  = (void *)l_next[1];
     356   104967272 :   for(;;) {
     357             : 
     358             :     /* Note that we use an ATOMIC_CAS here instead of an
     359             :        ATOMIC_FETCH_AND_ADD to avoid overflow risks by having threads
     360             :        increment l0 into the tail.  ATOMIC_FETCH_AND_ADD could be used
     361             :        if there is no requirement to the effect that l1+FD_TILE_MAX does
     362             :        not overflow. */
     363             : 
     364   104967272 :     FD_COMPILER_MFENCE();
     365   104967272 :     ulong m0 = *l_next;
     366   104967272 :     FD_COMPILER_MFENCE();
     367             : 
     368   104967272 :     if( FD_UNLIKELY( m0>=l1 ) ) break;
     369   104192559 :     ulong m1 = m0+1UL;
     370   104192559 :     if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) {
     371    37639810 :       FD_SPIN_PAUSE();
     372    37639810 :       continue;
     373    37639810 :     }
     374             : 
     375    66552749 :     task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
     376    66552749 :   }
     377     1018409 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     378             : #endif
     379             : 
     380     1011389 : FD_TPOOL_EXEC_ALL_IMPL_HDR(batch)
     381     1011389 :   ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
     382     1011389 :   task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
     383     1011389 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     384             : 
     385     7468163 : FD_TPOOL_EXEC_ALL_IMPL_HDR(raw)
     386     7468163 :   task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
     387     7468163 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     388             : 
     389             : #undef FD_TPOOL_EXEC_ALL_IMPL_FTR
     390             : #undef FD_TPOOL_EXEC_ALL_IMPL_HDR
     391             : 
     392             : char const *
     393          18 : fd_tpool_worker_state_cstr( int state ) {
     394          18 :   switch( state ) {
     395           3 :   case FD_TPOOL_WORKER_STATE_BOOT: return "boot";
     396           3 :   case FD_TPOOL_WORKER_STATE_IDLE: return "idle";
     397           6 :   case FD_TPOOL_WORKER_STATE_EXEC: return "exec";
     398           3 :   case FD_TPOOL_WORKER_STATE_HALT: return "halt";
     399           3 :   default: break;
     400          18 :   }
     401           3 :   return "unknown";
     402          18 : }

Generated by: LCOV version 1.14