LCOV - code coverage report
Current view: top level - util/fibre - fd_fibre.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 169 209 80.9 %
Date: 2025-01-08 12:08:44 Functions: 18 21 85.7 %

          Line data    Source code
       1             : #include "fd_fibre.h"
       2             : 
       3             : #include <stdio.h>
       4             : #include <stdlib.h>
       5             : #include <string.h>
       6             : #include <errno.h>
       7             : 
       8             : fd_fibre_t * fd_fibre_current = NULL;
       9             : 
      10             : /* top level function
      11             :    simply calls the user function then sets the done flag */
      12             : void
      13          45 : fd_fibre_run_fn( void * vp ) {
      14          45 :   fd_fibre_t * fibre = (fd_fibre_t*)vp;
      15             : 
      16             :   /* call user function */
      17          45 :   fibre->fn( fibre->arg );
      18             : 
      19             :   /* set done flag */
      20          45 :   fibre->done = 1;
      21          45 : }
      22             : 
      23             : /* footprint and alignment required for fd_fibre_init */
      24             : ulong
      25           6 : fd_fibre_init_footprint( void ) {
      26             :   /* size should be a multiple of the alignment */
      27           6 :   return fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN );
      28           6 : }
      29             : 
      30             : ulong
      31           6 : fd_fibre_init_align( void ) {
      32           6 :   return FD_FIBRE_ALIGN;
      33           6 : }
      34             : 
      35             : /* initialize main fibre */
      36             : fd_fibre_t *
      37           6 : fd_fibre_init( void * mem ) {
      38           6 :   fd_fibre_t * fibre = (fd_fibre_t*)mem;
      39             : 
      40           6 :   memset( fibre, 0, sizeof( *fibre ) );
      41             : 
      42           6 :   fibre->stack    = NULL;
      43           6 :   fibre->stack_sz = 0;
      44             : 
      45           6 :   ucontext_t * ctx = &fibre->ctx;
      46             : 
      47           6 :   if( getcontext( ctx ) == -1 ) {
      48           0 :     fprintf( stderr, "getcontext failed with %d %s\n", errno, fd_io_strerror( errno ) );
      49           0 :     fflush( stderr );
      50           0 :     fd_fibre_abort();
      51           0 :   }
      52             : 
      53           6 :   fd_fibre_current = fibre;
      54             : 
      55           6 :   return fibre;
      56           6 : }
      57             : 
      58             : /* footprint and alignment required for fd_fibre_start */
      59             : ulong
      60          57 : fd_fibre_start_footprint( ulong stack_size ) {
      61          57 :   return fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN  ) +
      62          57 :     fd_ulong_align_up( stack_size, FD_FIBRE_ALIGN );
      63          57 : }
      64             : 
      65          57 : ulong fd_fibre_start_align( void ) {
      66          57 :   return FD_FIBRE_ALIGN;
      67          57 : }
      68             : 
      69             : /* start a fibre */
      70             : 
      71             : /* this uses get/setcontext to start a new fibre
      72             :    the current fibre will continue running, and the new one will be
      73             :    inactive, and ready to switch to
      74             :    this is cooperative threading
      75             :      this fibre may be started on another thread */
      76             : fd_fibre_t *
      77          45 : fd_fibre_start( void * mem, ulong stack_sz, fd_fibre_fn_t fn, void * arg ) {
      78          45 :   if( fd_fibre_current == NULL ) {
      79           0 :     fprintf( stderr, "fd_fibre_init must be called before fd_fibre_start\n" );
      80           0 :     fflush( stderr );
      81           0 :     fd_fibre_abort();
      82           0 :   }
      83             : 
      84          45 :   ulong l_mem = (ulong)mem;
      85             : 
      86          45 :   void * stack = (void*)( l_mem +
      87          45 :       fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN  ) );
      88             : 
      89          45 :   fd_fibre_t * fibre = (fd_fibre_t*)mem;
      90             : 
      91          45 :   memset( fibre, 0, sizeof( *fibre ) );
      92             : 
      93             :   /* set the current value of stack and stack_sz */
      94          45 :   fibre->stack_sz = stack_sz;
      95          45 :   fibre->stack    = stack;
      96             : 
      97          45 :   fibre->fn       = fn;
      98          45 :   fibre->arg      = arg;
      99             : 
     100             :   /* start with the current fibre */
     101          45 :   memcpy( &fibre->ctx, &fd_fibre_current->ctx, sizeof( fibre->ctx ) );
     102             : 
     103             :   /* set the successor context, for use in the event the fibre terminates */
     104          45 :   fibre->ctx.uc_link = &fd_fibre_current->ctx;
     105             : 
     106             :   /* set the stack for the new fibre */
     107          45 :   fibre->ctx.uc_stack.ss_sp   = stack;
     108          45 :   fibre->ctx.uc_stack.ss_size = stack_sz;
     109             : 
     110             :   /* make a new context */
     111          45 :   makecontext( &fibre->ctx, (void(*)(void))fd_fibre_run_fn, 1, fibre );
     112             : 
     113          45 :   return fibre;
     114          45 : }
     115             : 
     116             : /* free a fibre
     117             : 
     118             :    this frees up the resources of a fibre */
     119             : void
     120          42 : fd_fibre_free( fd_fibre_t * fibre ) {
     121             :   /* nothing to do, as caller owns memory */
     122          42 :   (void)fibre;
     123          42 : }
     124             : 
     125             : /* switch execution to a fibre
     126             : 
     127             :    switches execution to "swap_to"
     128             :    "swap_to" must have been created with either fd_fibre_init, or fd_fibre_start */
     129             : void
     130      316377 : fd_fibre_swap( fd_fibre_t * swap_to ) {
     131      316377 :   if( swap_to == fd_fibre_current ) {
     132           0 :     return;
     133           0 :   }
     134             : 
     135      316377 :   if( swap_to->done ) return;
     136             : 
     137             :   /* set the context to return to as the current context */
     138      316377 :   swap_to->ctx.uc_link = &fd_fibre_current->ctx;
     139             : 
     140             :   /* store current fibre for popping */
     141      316377 :   fd_fibre_t * fibre_pop = fd_fibre_current;
     142             : 
     143             :   /* set fd_fibre_current for next execution context */
     144      316377 :   fd_fibre_current = swap_to;
     145             : 
     146             :   /* switch to new fibre */
     147      316377 :   if( swapcontext( &fibre_pop->ctx, &swap_to->ctx ) == -1 ) {
     148           0 :     fprintf( stderr, "swapcontext failed with %d %s\n", errno, fd_io_strerror( errno ) );
     149           0 :     fflush( stdout );
     150           0 :     fd_fibre_abort();
     151           0 :   }
     152             : 
     153             :   /* return value of fibre to its previous value */
     154      316377 :   fd_fibre_current = fibre_pop;
     155      316377 : }
     156             : 
     157             : /* set a clock for scheduler */
     158             : long (*fd_fibre_clock)(void);
     159             : 
     160             : /* fibre for scheduler */
     161             : fd_fibre_t * fd_fibre_scheduler = NULL;
     162             : 
     163             : void
     164           6 : fd_fibre_set_clock( long (*clock)(void) ) {
     165           6 :   fd_fibre_clock = clock;
     166           6 : }
     167             : 
     /* yield current fibre
        allows another fibre to run
        this is simply a wait with no delay requested */
     void
     fd_fibre_yield( void ) {
       long const no_delay = 0;
       fd_fibre_wait( no_delay );
     }
     175             : 
     176             : /* stops running currently executing fibre for a period */
     177             : void
     178         288 : fd_fibre_wait( long wait_ns ) {
     179             :   /* cannot wait if no scheduler */
     180         288 :   if( fd_fibre_scheduler == NULL ) return;
     181             : 
     182             :   /* calc wake time */
     183         288 :   long wake = fd_fibre_clock() + ( wait_ns < 1 ? 1 : wait_ns );
     184             : 
     185         288 :   fd_fibre_current->sched_time = wake;
     186             : 
     187         288 :   fd_fibre_schedule( fd_fibre_current );
     188             : 
     189             :   /* switch to the fibre scheduler */
     190         288 :   fd_fibre_swap( fd_fibre_scheduler );
     191         288 : }
     192             : 
     193             : /* stops running currently executing fibre until a particular
     194             :    time */
     195             : void
     196      153210 : fd_fibre_wait_until( long resume_time_ns ) {
     197      153210 :   long now = fd_fibre_clock();
     198      153210 :   if( resume_time_ns <= now ) {
     199             :     /* ensure that another fibre gets a chance at some point */
     200       48048 :     resume_time_ns = now + 1;
     201       48048 :   }
     202             : 
     203             :   /* cannot wait if no scheduler */
     204      153210 :   if( fd_fibre_scheduler == NULL ) return;
     205             : 
     206      153210 :   fd_fibre_current->sched_time = resume_time_ns;
     207             : 
     208      153210 :   fd_fibre_schedule( fd_fibre_current );
     209             : 
     210             :   /* switch to the fibre scheduler */
     211      153210 :   fd_fibre_swap( fd_fibre_scheduler );
     212      153210 : }
     213             : 
     214             : /* wakes another fibre */
     215             : void
     216           0 : fd_fibre_wake( fd_fibre_t * fibre ) {
     217           0 :   if( fd_fibre_current == fibre ) return;
     218             : 
     219           0 :   fibre->sched_time = fd_fibre_clock();
     220           0 :   fd_fibre_schedule( fibre );
     221           0 : }
     222             : 
     223             : /* sentinel for run queue */
     224             : fd_fibre_t fd_fibre_schedule_queue[1] = {{ .sentinel = 1, .next = fd_fibre_schedule_queue }};
     225             : 
     226             : /* add a fibre to the schedule */
     227             : void
     228      159756 : fd_fibre_schedule( fd_fibre_t * fibre ) {
     229      159756 :   if( fd_fibre_clock == NULL ) fd_fibre_abort();
     230             : 
     231      159756 :   fd_fibre_t * cur_fibre = fd_fibre_schedule_queue;
     232             : 
     233             :   /* remove from schedule */
     234      320658 :   while(1) {
     235      320658 :     if( cur_fibre->next == fibre ) {
     236        3102 :       cur_fibre->next = fibre->next;
     237        3102 :     }
     238             : 
     239      320658 :     cur_fibre = cur_fibre->next;
     240      320658 :     if( cur_fibre->sentinel ) break;
     241      320658 :   }
     242             : 
     243             :   /* add into schedule at appropriate place for wake time */
     244      159756 :   fd_fibre_t * prior = fd_fibre_schedule_queue;
     245      159756 :   long wake = fibre->sched_time;
     246             : 
     247      159756 :   cur_fibre = prior->next;
     248      259326 :   while( !cur_fibre->sentinel && wake > cur_fibre->sched_time ) {
     249       99570 :     prior     = cur_fibre;
     250       99570 :     cur_fibre = cur_fibre->next;
     251       99570 :   }
     252             : 
     253             :   /* insert into schedule */
     254      159756 :   fibre->next = cur_fibre;
     255      159756 :   prior->next = fibre;
     256      159756 : }
     257             : 
     258             : /* run the current schedule
     259             : 
     260             :    returns the time of the next ready fibre
     261             :    returns -1 if there are no fibres in the schedule */
     262             : long
     263      105513 : fd_fibre_schedule_run( void ) {
     264             :   /* set the currently running fibre as the scheduler */
     265      105513 :   fd_fibre_scheduler = fd_fibre_current;
     266             : 
     267      262167 :   while(1) {
     268      262167 :     fd_fibre_t * cur_fibre = fd_fibre_schedule_queue->next;
     269      262167 :     if( cur_fibre->sentinel ) return -1;
     270             : 
     271      262155 :     long      now       = fd_fibre_clock();
     272      262155 :     if( cur_fibre->sched_time > now ) {
     273             :       /* nothing more to do yet */
     274      105501 :       return cur_fibre->sched_time;
     275      105501 :     }
     276             : 
     277             :     /* remove from schedule */
     278      156654 :     fd_fibre_schedule_queue->next = cur_fibre->next;
     279             : 
     280             :     /* if fibre done, skip execution */
     281      156654 :     if( !cur_fibre->done ) {
     282      156648 :       fd_fibre_swap( cur_fibre );
     283      156648 :     }
     284      156654 :   }
     285             : 
     286           0 :   return -1;
     287      105513 : }
     288             : 
     289             : ulong
     290           0 : fd_fibre_pipe_align( void ) {
     291           0 :   return alignof( fd_fibre_pipe_t );
     292           0 : }
     293             : 
     294             : ulong
     295           0 : fd_fibre_pipe_footprint( ulong entries ) {
     296           0 :   return sizeof( fd_fibre_pipe_t ) + entries * sizeof( ulong );
     297           0 : }
     298             : 
     299             : fd_fibre_pipe_t *
     300          12 : fd_fibre_pipe_new( void * mem, ulong entries ) {
     301          12 :   fd_fibre_pipe_t * pipe = (fd_fibre_pipe_t*)mem;
     302             : 
     303          12 :   ulong * entries_array = (ulong*)&pipe[1];
     304             : 
     305          12 :   pipe->cap     = entries;
     306          12 :   pipe->head    = 0UL;
     307          12 :   pipe->tail    = 0UL;
     308          12 :   pipe->reader  = NULL;
     309          12 :   pipe->writer  = NULL;
     310          12 :   pipe->entries = entries_array;
     311             : 
     312          12 :   return pipe;
     313          12 : }
     314             : 
     315             : int
     316        3108 : fd_fibre_pipe_write( fd_fibre_pipe_t * pipe, ulong value, long timeout ) {
     317        3108 :   fd_fibre_t * prev_writer = pipe->writer;
     318             : 
     319        3108 :   ulong used = 0;
     320        3108 :   ulong free = 0;
     321             : 
     322        3108 :   long timeout_ts = fd_fibre_clock() + timeout;
     323             : 
     324             :   /* loop until either there is space for a new value to be
     325             :      written, or until we time out */
     326        3108 :   while(1) {
     327        3108 :     used = pipe->head - pipe->tail;
     328        3108 :     free = pipe->cap - used;
     329             : 
     330             :     /* if we have free space, break out of loop */
     331        3108 :     if( free ) break;
     332             : 
     333             :     /* we have no free space within which to write, so wait */
     334             : 
     335             :     /* update the writer to ourself */
     336           0 :     pipe->writer = fd_fibre_current;
     337             : 
     338             :     /* did we time out? */
     339           0 :     if( fd_fibre_clock() >= timeout_ts ) {
     340             :       /* restore writer before returning */
     341           0 :       pipe->writer = prev_writer;
     342             : 
     343             :       /* return timeout */
     344           0 :       return 1;
     345           0 :     }
     346             : 
     347             :     /* wait */
     348             : 
     349             :     /* set current fibre as the writer */
     350           0 :     pipe->writer = fd_fibre_current;
     351             : 
     352             :     /* set wakeup time */
     353           0 :     fd_fibre_current->sched_time = timeout_ts;
     354           0 :     fd_fibre_schedule( fd_fibre_current );
     355             : 
     356             :     /* switch to the scheduler */
     357           0 :     fd_fibre_swap( fd_fibre_scheduler );
     358           0 :   }
     359             : 
     360             :   /* we have free space, so store the value */
     361        3108 :   pipe->entries[pipe->head % pipe->cap] = value;
     362             : 
     363             :   /* increment the head */
     364        3108 :   pipe->head++;
     365             : 
     366             :   /* wake up one waiting reader, if any */
     367        3108 :   if( pipe->reader ) {
     368             :     /* ensure we are scheduled */
     369        3108 :     fd_fibre_current->sched_time = fd_fibre_clock();;
     370        3108 :     fd_fibre_schedule( fd_fibre_current );
     371             : 
     372        3108 :     fd_fibre_swap( pipe->reader );
     373        3108 :   }
     374             : 
     375             :   /* restore writer */
     376        3108 :   pipe->writer = prev_writer;
     377             : 
     378             :   /* return successful write */
     379        3108 :   return 0;
     380        3108 : }
     381             : 
     382             : int
     383        3114 : fd_fibre_pipe_read( fd_fibre_pipe_t * pipe, ulong *value, long timeout ) {
     384        3114 :   fd_fibre_t * prev_reader = pipe->reader;
     385             : 
     386        3114 :   ulong used = 0;
     387             : 
     388        3114 :   long timeout_ts = fd_fibre_clock() + timeout;
     389             : 
     390             :   /* loop until we have a value to be read, or until we time out */
     391        6228 :   while(1) {
     392        6228 :     used = pipe->head - pipe->tail;
     393             : 
     394             :     /* is data available? */
     395        6228 :     if( used ) break;
     396             : 
     397             :     /* no data available, so wait */
     398             : 
     399             :     /* update the reader */
     400        3120 :     pipe->reader = fd_fibre_current;
     401             : 
     402             :     /* did we time out? */
     403        3120 :     if( fd_fibre_clock() >= timeout_ts ) {
     404             :       /* restore the reader before returning */
     405           6 :       pipe->reader = prev_reader;
     406             : 
     407             :       /* return timeout */
     408           6 :       return 1;
     409           6 :     }
     410             : 
     411             :     /* wait */
     412             : 
     413             :     /* set current fibre as the reader */
     414        3114 :     pipe->reader = fd_fibre_current;
     415             : 
     416             :     /* set wakeup time */
     417        3114 :     fd_fibre_current->sched_time = timeout_ts;
     418        3114 :     fd_fibre_schedule( fd_fibre_current );
     419             : 
     420             :     /* switch to the scheduler */
     421        3114 :     fd_fibre_swap( fd_fibre_scheduler );
     422        3114 :   }
     423             : 
     424             :   /* we have data to provide, so retrieve it */
     425        3108 :   *value = pipe->entries[pipe->tail % pipe->cap];
     426             : 
     427             :   /* increment the tail */
     428        3108 :   pipe->tail++;
     429             : 
     430             :   /* wake up one waiting writer, if any */
     431        3108 :   if( pipe->writer ) {
     432             :     /* ensure we are scheduled */
     433           0 :     fd_fibre_current->sched_time = fd_fibre_clock();;
     434           0 :     fd_fibre_schedule( fd_fibre_current );
     435             : 
     436           0 :     fd_fibre_swap( pipe->writer );
     437           0 :   }
     438             : 
     439             :   /* restore reader */
     440        3108 :   pipe->reader = prev_reader;
     441             : 
     442             :   /* return success */
     443        3108 :   return 0;
     444        3114 : }

Generated by: LCOV version 1.14