Line data Source code
1 : #include "fd_fibre.h"
2 :
3 : #include <stdio.h>
4 : #include <stdlib.h>
5 : #include <string.h>
6 : #include <errno.h>
7 :
8 : fd_fibre_t * fd_fibre_current = NULL;
9 :
10 : /* top level function
11 : simply calls the user function then sets the done flag */
12 : void
13 39 : fd_fibre_run_fn( void * vp ) {
14 39 : fd_fibre_t * fibre = (fd_fibre_t*)vp;
15 :
16 : /* call user function */
17 39 : fibre->fn( fibre->arg );
18 :
19 : /* set done flag */
20 39 : fibre->done = 1;
21 39 : }
22 :
23 : /* footprint and alignment required for fd_fibre_init */
24 : ulong
25 3 : fd_fibre_init_footprint( void ) {
26 : /* size should be a multiple of the alignment */
27 3 : return fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN );
28 3 : }
29 :
30 : ulong
31 3 : fd_fibre_init_align( void ) {
32 3 : return FD_FIBRE_ALIGN;
33 3 : }
34 :
35 : /* initialize main fibre */
36 : fd_fibre_t *
37 3 : fd_fibre_init( void * mem ) {
38 3 : fd_fibre_t * fibre = (fd_fibre_t*)mem;
39 :
40 3 : memset( fibre, 0, sizeof( *fibre ) );
41 :
42 3 : fibre->stack = NULL;
43 3 : fibre->stack_sz = 0;
44 :
45 3 : ucontext_t * ctx = &fibre->ctx;
46 :
47 3 : if( getcontext( ctx ) == -1 ) {
48 0 : fprintf( stderr, "getcontext failed with %d %s\n", errno, fd_io_strerror( errno ) );
49 0 : fflush( stderr );
50 0 : fd_fibre_abort();
51 0 : }
52 :
53 3 : fd_fibre_current = fibre;
54 :
55 3 : return fibre;
56 3 : }
57 :
58 : /* footprint and alignment required for fd_fibre_start */
59 : ulong
60 51 : fd_fibre_start_footprint( ulong stack_size ) {
61 51 : return fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN ) +
62 51 : fd_ulong_align_up( stack_size, FD_FIBRE_ALIGN );
63 51 : }
64 :
65 51 : ulong fd_fibre_start_align( void ) {
66 51 : return FD_FIBRE_ALIGN;
67 51 : }
68 :
69 : /* start a fibre */
70 :
71 : /* this uses get/setcontext to start a new fibre
72 : the current fibre will continue running, and the new one will be
73 : inactive, and ready to switch to
74 : this is cooperative threading
75 : this fibre may be started on another thread */
76 : fd_fibre_t *
77 39 : fd_fibre_start( void * mem, ulong stack_sz, fd_fibre_fn_t fn, void * arg ) {
78 39 : if( fd_fibre_current == NULL ) {
79 0 : fprintf( stderr, "fd_fibre_init must be called before fd_fibre_start\n" );
80 0 : fflush( stderr );
81 0 : fd_fibre_abort();
82 0 : }
83 :
84 39 : ulong l_mem = (ulong)mem;
85 :
86 39 : void * stack = (void*)( l_mem +
87 39 : fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN ) );
88 :
89 39 : fd_fibre_t * fibre = (fd_fibre_t*)mem;
90 :
91 39 : memset( fibre, 0, sizeof( *fibre ) );
92 :
93 : /* set the current value of stack and stack_sz */
94 39 : fibre->stack_sz = stack_sz;
95 39 : fibre->stack = stack;
96 :
97 39 : fibre->fn = fn;
98 39 : fibre->arg = arg;
99 :
100 : /* start with the current fibre */
101 39 : memcpy( &fibre->ctx, &fd_fibre_current->ctx, sizeof( fibre->ctx ) );
102 :
103 : /* set the successor context, for use in the event the fibre terminates */
104 39 : fibre->ctx.uc_link = &fd_fibre_current->ctx;
105 :
106 : /* set the stack for the new fibre */
107 39 : fibre->ctx.uc_stack.ss_sp = stack;
108 39 : fibre->ctx.uc_stack.ss_size = stack_sz;
109 :
110 : /* make a new context */
111 39 : makecontext( &fibre->ctx, (void(*)(void))fd_fibre_run_fn, 1, fibre );
112 :
113 39 : return fibre;
114 39 : }
115 :
116 : /* free a fibre
117 :
118 : this frees up the resources of a fibre */
119 : void
120 42 : fd_fibre_free( fd_fibre_t * fibre ) {
121 : /* nothing to do, as caller owns memory */
122 42 : (void)fibre;
123 42 : }
124 :
125 : /* switch execution to a fibre
126 :
127 : switches execution to "swap_to"
128 : "swap_to" must have been created with either fd_fibre_init, or fd_fibre_start */
129 : void
130 16071 : fd_fibre_swap( fd_fibre_t * swap_to ) {
131 16071 : if( swap_to == fd_fibre_current ) {
132 0 : return;
133 0 : }
134 :
135 16071 : if( swap_to->done ) return;
136 :
137 : /* set the context to return to as the current context */
138 16071 : swap_to->ctx.uc_link = &fd_fibre_current->ctx;
139 :
140 : /* store current fibre for popping */
141 16071 : fd_fibre_t * fibre_pop = fd_fibre_current;
142 :
143 : /* set fd_fibre_current for next execution context */
144 16071 : fd_fibre_current = swap_to;
145 :
146 : /* switch to new fibre */
147 16071 : if( swapcontext( &fibre_pop->ctx, &swap_to->ctx ) == -1 ) {
148 0 : fprintf( stderr, "swapcontext failed with %d %s\n", errno, fd_io_strerror( errno ) );
149 0 : fflush( stdout );
150 0 : fd_fibre_abort();
151 0 : }
152 :
153 : /* return value of fibre to its previous value */
154 16071 : fd_fibre_current = fibre_pop;
155 16071 : }
156 :
157 : /* set a clock for scheduler */
158 : long (*fd_fibre_clock)(void);
159 :
160 : /* fibre for scheduler */
161 : fd_fibre_t * fd_fibre_scheduler = NULL;
162 :
163 : void
164 3 : fd_fibre_set_clock( long (*clock)(void) ) {
165 3 : fd_fibre_clock = clock;
166 3 : }
167 :
/* fd_fibre_yield yields the current fibre, allowing another fibre to
   run.  It is equivalent to fd_fibre_wait with a wait of zero. */
void
fd_fibre_yield( void ) {
  long const no_delay = 0;

  /* a zero wait reschedules the caller and hands over control */
  fd_fibre_wait( no_delay );
}
175 :
176 : /* stops running currently executing fibre for a period */
177 : void
178 285 : fd_fibre_wait( long wait_ns ) {
179 : /* cannot wait if no scheduler */
180 285 : if( fd_fibre_scheduler == NULL ) return;
181 :
182 : /* calc wake time */
183 285 : long wake = fd_fibre_clock() + ( wait_ns < 1 ? 1 : wait_ns );
184 :
185 285 : fd_fibre_current->sched_time = wake;
186 :
187 285 : fd_fibre_schedule( fd_fibre_current );
188 :
189 : /* switch to the fibre scheduler */
190 285 : fd_fibre_swap( fd_fibre_scheduler );
191 285 : }
192 :
193 : /* stops running currently executing fibre until a particular
194 : time */
195 : void
196 3063 : fd_fibre_wait_until( long resume_time_ns ) {
197 3063 : long now = fd_fibre_clock();
198 3063 : if( resume_time_ns <= now ) {
199 : /* ensure that another fibre gets a chance at some point */
200 0 : resume_time_ns = now + 1;
201 0 : }
202 :
203 : /* cannot wait if no scheduler */
204 3063 : if( fd_fibre_scheduler == NULL ) return;
205 :
206 3063 : fd_fibre_current->sched_time = resume_time_ns;
207 :
208 3063 : fd_fibre_schedule( fd_fibre_current );
209 :
210 : /* switch to the fibre scheduler */
211 3063 : fd_fibre_swap( fd_fibre_scheduler );
212 3063 : }
213 :
214 : /* wakes another fibre */
215 : void
216 0 : fd_fibre_wake( fd_fibre_t * fibre ) {
217 0 : if( fd_fibre_current == fibre ) return;
218 :
219 0 : fibre->sched_time = fd_fibre_clock();
220 0 : fd_fibre_schedule( fibre );
221 0 : }
222 :
223 : /* sentinel for run queue */
224 : fd_fibre_t fd_fibre_schedule_queue[1] = {{ .sentinel = 1, .next = fd_fibre_schedule_queue }};
225 :
226 : /* add a fibre to the schedule */
227 : void
228 9600 : fd_fibre_schedule( fd_fibre_t * fibre ) {
229 9600 : if( fd_fibre_clock == NULL ) fd_fibre_abort();
230 :
231 9600 : fd_fibre_t * cur_fibre = fd_fibre_schedule_queue;
232 :
233 : /* remove from schedule */
234 20349 : while(1) {
235 20349 : if( cur_fibre->next == fibre ) {
236 3102 : cur_fibre->next = fibre->next;
237 3102 : }
238 :
239 20349 : cur_fibre = cur_fibre->next;
240 20349 : if( cur_fibre->sentinel ) break;
241 20349 : }
242 :
243 : /* add into schedule at appropriate place for wake time */
244 9600 : fd_fibre_t * prior = fd_fibre_schedule_queue;
245 9600 : long wake = fibre->sched_time;
246 :
247 9600 : cur_fibre = prior->next;
248 13065 : while( !cur_fibre->sentinel && wake > cur_fibre->sched_time ) {
249 3465 : prior = cur_fibre;
250 3465 : cur_fibre = cur_fibre->next;
251 3465 : }
252 :
253 : /* insert into schedule */
254 9600 : fibre->next = cur_fibre;
255 9600 : prior->next = fibre;
256 9600 : }
257 :
258 : /* run the current schedule
259 :
260 : returns the time of the next ready fibre
261 : returns -1 if there are no fibres in the schedule */
262 : long
263 3261 : fd_fibre_schedule_run( void ) {
264 : /* set the currently running fibre as the scheduler */
265 3261 : fd_fibre_scheduler = fd_fibre_current;
266 :
267 9759 : while(1) {
268 9759 : fd_fibre_t * cur_fibre = fd_fibre_schedule_queue->next;
269 9759 : if( cur_fibre->sentinel ) return -1;
270 :
271 9750 : long now = fd_fibre_clock();
272 9750 : if( cur_fibre->sched_time > now ) {
273 : /* nothing more to do yet */
274 3252 : return cur_fibre->sched_time;
275 3252 : }
276 :
277 : /* remove from schedule */
278 6498 : fd_fibre_schedule_queue->next = cur_fibre->next;
279 :
280 : /* if fibre done, skip execution */
281 6498 : if( !cur_fibre->done ) {
282 6492 : fd_fibre_swap( cur_fibre );
283 6492 : }
284 6498 : }
285 :
286 0 : return -1;
287 3261 : }
288 :
289 : ulong
290 0 : fd_fibre_pipe_align( void ) {
291 0 : return alignof( fd_fibre_pipe_t );
292 0 : }
293 :
294 : ulong
295 0 : fd_fibre_pipe_footprint( ulong entries ) {
296 0 : return sizeof( fd_fibre_pipe_t ) + entries * sizeof( ulong );
297 0 : }
298 :
299 : fd_fibre_pipe_t *
300 12 : fd_fibre_pipe_new( void * mem, ulong entries ) {
301 12 : fd_fibre_pipe_t * pipe = (fd_fibre_pipe_t*)mem;
302 :
303 12 : ulong * entries_array = (ulong*)&pipe[1];
304 :
305 12 : pipe->cap = entries;
306 12 : pipe->head = 0UL;
307 12 : pipe->tail = 0UL;
308 12 : pipe->reader = NULL;
309 12 : pipe->writer = NULL;
310 12 : pipe->entries = entries_array;
311 :
312 12 : return pipe;
313 12 : }
314 :
315 : int
316 3108 : fd_fibre_pipe_write( fd_fibre_pipe_t * pipe, ulong value, long timeout ) {
317 3108 : fd_fibre_t * prev_writer = pipe->writer;
318 :
319 3108 : ulong used = 0;
320 3108 : ulong free = 0;
321 :
322 3108 : long timeout_ts = fd_fibre_clock() + timeout;
323 :
324 : /* loop until either there is space for a new value to be
325 : written, or until we time out */
326 3108 : while(1) {
327 3108 : used = pipe->head - pipe->tail;
328 3108 : free = pipe->cap - used;
329 :
330 : /* if we have free space, break out of loop */
331 3108 : if( free ) break;
332 :
333 : /* we have no free space within which to write, so wait */
334 :
335 : /* update the writer to ourself */
336 0 : pipe->writer = fd_fibre_current;
337 :
338 : /* did we time out? */
339 0 : if( fd_fibre_clock() >= timeout_ts ) {
340 : /* restore writer before returning */
341 0 : pipe->writer = prev_writer;
342 :
343 : /* return timeout */
344 0 : return 1;
345 0 : }
346 :
347 : /* wait */
348 :
349 : /* set current fibre as the writer */
350 0 : pipe->writer = fd_fibre_current;
351 :
352 : /* set wakeup time */
353 0 : fd_fibre_current->sched_time = timeout_ts;
354 0 : fd_fibre_schedule( fd_fibre_current );
355 :
356 : /* switch to the scheduler */
357 0 : fd_fibre_swap( fd_fibre_scheduler );
358 0 : }
359 :
360 : /* we have free space, so store the value */
361 3108 : pipe->entries[pipe->head % pipe->cap] = value;
362 :
363 : /* increment the head */
364 3108 : pipe->head++;
365 :
366 : /* wake up one waiting reader, if any */
367 3108 : if( pipe->reader ) {
368 : /* ensure we are scheduled */
369 3108 : fd_fibre_current->sched_time = fd_fibre_clock();;
370 3108 : fd_fibre_schedule( fd_fibre_current );
371 :
372 3108 : fd_fibre_swap( pipe->reader );
373 3108 : }
374 :
375 : /* restore writer */
376 3108 : pipe->writer = prev_writer;
377 :
378 : /* return successful write */
379 3108 : return 0;
380 3108 : }
381 :
382 : int
383 3114 : fd_fibre_pipe_read( fd_fibre_pipe_t * pipe, ulong *value, long timeout ) {
384 3114 : fd_fibre_t * prev_reader = pipe->reader;
385 :
386 3114 : ulong used = 0;
387 :
388 3114 : long timeout_ts = fd_fibre_clock() + timeout;
389 :
390 : /* loop until we have a value to be read, or until we time out */
391 6228 : while(1) {
392 6228 : used = pipe->head - pipe->tail;
393 :
394 : /* is data available? */
395 6228 : if( used ) break;
396 :
397 : /* no data available, so wait */
398 :
399 : /* update the reader */
400 3120 : pipe->reader = fd_fibre_current;
401 :
402 : /* did we time out? */
403 3120 : if( fd_fibre_clock() >= timeout_ts ) {
404 : /* restore the reader before returning */
405 6 : pipe->reader = prev_reader;
406 :
407 : /* return timeout */
408 6 : return 1;
409 6 : }
410 :
411 : /* wait */
412 :
413 : /* set current fibre as the reader */
414 3114 : pipe->reader = fd_fibre_current;
415 :
416 : /* set wakeup time */
417 3114 : fd_fibre_current->sched_time = timeout_ts;
418 3114 : fd_fibre_schedule( fd_fibre_current );
419 :
420 : /* switch to the scheduler */
421 3114 : fd_fibre_swap( fd_fibre_scheduler );
422 3114 : }
423 :
424 : /* we have data to provide, so retrieve it */
425 3108 : *value = pipe->entries[pipe->tail % pipe->cap];
426 :
427 : /* increment the tail */
428 3108 : pipe->tail++;
429 :
430 : /* wake up one waiting writer, if any */
431 3108 : if( pipe->writer ) {
432 : /* ensure we are scheduled */
433 0 : fd_fibre_current->sched_time = fd_fibre_clock();;
434 0 : fd_fibre_schedule( fd_fibre_current );
435 :
436 0 : fd_fibre_swap( pipe->writer );
437 0 : }
438 :
439 : /* restore reader */
440 3108 : pipe->reader = prev_reader;
441 :
442 : /* return success */
443 3108 : return 0;
444 3114 : }
|