Line data Source code
1 : #include "fd_quic_stream_spam.h" 2 : 3 : void * 4 : fd_quic_stream_spam_new( void * mem, 5 : fd_quic_stream_gen_spam_t gen_fn, 6 3 : void * gen_ctx ){ 7 : 8 3 : if( FD_UNLIKELY( !mem ) ) { 9 0 : FD_LOG_WARNING(( "NULL mem" )); 10 0 : return NULL; 11 0 : } 12 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, alignof(fd_quic_stream_spam_t) ) ) ) { 13 0 : FD_LOG_WARNING(( "misaligned mem" )); 14 0 : return NULL; 15 0 : } 16 : 17 3 : fd_quic_stream_spam_t * spam = mem; 18 3 : memset( spam, 0, sizeof(fd_quic_stream_spam_t) ); 19 3 : spam->gen_fn = gen_fn; 20 3 : spam->gen_ctx = gen_ctx; 21 : 22 3 : return (void *)spam; 23 3 : } 24 : 25 : fd_quic_stream_spam_t * 26 3 : fd_quic_stream_spam_join( void * shspam ) { 27 : 28 3 : if( FD_UNLIKELY( !shspam ) ) { 29 0 : FD_LOG_WARNING(( "NULL shspam" )); 30 0 : return NULL; 31 0 : } 32 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shspam, alignof(fd_quic_stream_spam_t) ) ) ) { 33 0 : FD_LOG_WARNING(( "misaligned shspam" )); 34 0 : return NULL; 35 0 : } 36 : 37 3 : return (fd_quic_stream_spam_t *)shspam; 38 3 : } 39 : 40 : void * 41 0 : fd_quic_stream_spam_leave( fd_quic_stream_spam_t * spam ) { 42 0 : return (void *)spam; 43 0 : } 44 : 45 : void * 46 6 : fd_quic_stream_spam_delete( void * shspam ) { 47 6 : return shspam; 48 6 : } 49 : 50 : long 51 : fd_quic_stream_spam_service( fd_quic_conn_t * conn, 52 13131 : fd_quic_stream_spam_t * spam ) { 53 : 54 13131 : long streams_sent = 0L; 55 13131 : for(;;) { 56 : 57 13131 : fd_quic_stream_t * stream = spam->stream; 58 13131 : if( !stream ) stream = fd_quic_conn_new_stream( conn ); 59 13131 : if( !stream ) break; 60 13131 : ulong stream_id = stream->stream_id; 61 13131 : spam->stream = NULL; 62 : 63 : /* Generate stream payload */ 64 13131 : uchar payload_buf[ 4096UL ]; 65 13131 : fd_aio_pkt_info_t batch[1] = { { .buf=payload_buf, .buf_sz=4096UL } }; 66 13131 : spam->gen_fn( /* ctx */ NULL, &batch[ 0 ], stream ); 67 : 68 : /* Send data */ 69 13131 : int rc = fd_quic_stream_send( stream, payload_buf, batch->buf_sz, /* fin */ 1 ); 70 13131 : if( rc==FD_QUIC_SUCCESS ) { 71 : /* Stream send successful, close triggered via fin bit */ 72 : //FD_LOG_DEBUG(( "sent stream=%lu sz=%u", stream_id, batch->buf_sz )); 73 13131 : streams_sent++; 74 13131 : break; 75 13131 : } else { 76 0 : if( FD_UNLIKELY( rc!=FD_QUIC_SEND_ERR_FLOW ) ) { 77 0 : FD_LOG_WARNING(( "failed to send stream=%lu error=%d", stream_id, rc )); 78 0 : streams_sent = -1L; 79 : /* FIXME Ensure stuck stream is freed */ 80 0 : } else { 81 0 : spam->stream = stream; 82 0 : } 83 0 : goto fin; 84 0 : } 85 : 86 13131 : } 87 : 88 13131 : fin: 89 13131 : return streams_sent; 90 13131 : } 91 : 92 : void 93 : fd_quic_stream_spam_notify( fd_quic_stream_t * stream, 94 : void * stream_ctx, 95 13131 : int notify_type ) { 96 : 97 : /* Stream is about to be deallocated */ 98 : 99 13131 : (void)stream; 100 13131 : (void)notify_type; 101 : 102 : /* Nothing to do for completed streams */ 103 13131 : if( FD_LIKELY( !stream_ctx ) ) return; 104 : 105 : /* Stream still is still in pending stack. ctx points to position in 106 : pending list. Mark it as a "tombstone" so it's skipped when 107 : unwinding the pending stack. */ 108 0 : fd_quic_stream_t ** slot = (fd_quic_stream_t **)stream_ctx; 109 0 : *slot = NULL; 110 0 : } 111 : 112 : void 113 : fd_quic_stream_spam_gen( void * ctx, 114 : fd_aio_pkt_info_t * pkt, 115 43131 : fd_quic_stream_t * stream ) { 116 43131 : (void)ctx; 117 : 118 : /* Derive random bytes to send */ 119 43131 : fd_rng_t _rng[1]; 120 43131 : fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, (uint)stream->stream_id, 0UL ) ); 121 : 122 43131 : ulong data_sz = fd_rng_ulong_roll( rng, pkt->buf_sz ); 123 43131 : pkt->buf_sz = (ushort)data_sz; 124 : 125 12877257 : for( ulong i=0; i<fd_ulong_align_up( data_sz, 8UL ); i+=8 ) 126 12834126 : *(ulong *)( (uchar *)pkt->buf+i ) = fd_rng_ulong( rng ); 127 43131 : fd_rng_delete( fd_rng_leave( rng ) ); 128 43131 : } 129 :