Skip to content

Commit

Permalink
add timeout handling to QuicStream (dotnet#56060)
Browse files Browse the repository at this point in the history
* add timeout handling to QuicStream

* feedback from review

* fix bad resolve

* feedback from review
  • Loading branch information
wfurt authored Aug 11, 2021
1 parent 4ea7dc4 commit a33dbf3
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ internal QuicStream() { }
public override bool CanRead { get { throw null; } }
public override bool CanSeek { get { throw null; } }
public override bool CanWrite { get { throw null; } }
public override bool CanTimeout { get { throw null; } }
public override long Length { get { throw null; } }
public override long Position { get { throw null; } set { } }
public long StreamId { get { throw null; } }
Expand All @@ -101,6 +102,7 @@ public override void Flush() { }
public override int Read(System.Span<byte> buffer) { throw null; }
public override System.Threading.Tasks.Task<int> ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override int ReadTimeout { get { throw null; } set { } }
public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
public override void SetLength(long value) { }
public void Shutdown() { }
Expand All @@ -114,6 +116,7 @@ public override void Write(System.ReadOnlySpan<byte> buffer) { }
public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<System.ReadOnlyMemory<byte>> buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<System.ReadOnlyMemory<byte>> buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override int WriteTimeout { get { throw null; } set { } }
}
public partial class QuicStreamAbortedException : System.Net.Quic.QuicException
{
Expand Down
6 changes: 6 additions & 0 deletions src/libraries/System.Net.Quic/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@
<data name="net_quic_writing_notallowed" xml:space="preserve">
<value>Writing is not allowed on stream.</value>
</data>
<data name="net_quic_timeout_use_gt_zero" xml:space="preserve">
<value>Timeout can only be set to 'System.Threading.Timeout.Infinite' or a value &gt; 0.</value>
</data>
<data name="net_quic_timeout" xml:space="preserve">
<value>Connection timed out.</value>
</data>
<data name="net_quic_ssl_option" xml:space="preserve">
<value>'{0}' is not supported by System.Net.Quic.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ internal override long StreamId

private StreamBuffer? ReadStreamBuffer => _isInitiator ? _streamState._inboundStreamBuffer : _streamState._outboundStreamBuffer;

internal override bool CanTimeout => false;

internal override int ReadTimeout
{
get => throw new InvalidOperationException();
set => throw new InvalidOperationException();
}

internal override int WriteTimeout
{
get => throw new InvalidOperationException();
set => throw new InvalidOperationException();
}

internal override bool CanRead => !_disposed && ReadStreamBuffer is not null;

internal override int Read(Span<byte> buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Quic.Implementations.MsQuic.Internal;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -192,6 +193,47 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F

internal override bool CanWrite => _disposed == 0 && _canWrite;

internal override bool CanTimeout => true;

private int _readTimeout = Timeout.Infinite;

internal override int ReadTimeout
{
get
{
ThrowIfDisposed();
return _readTimeout;
}
set
{
ThrowIfDisposed();
if (value <= 0 && value != System.Threading.Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(value), SR.net_quic_timeout_use_gt_zero);
}
_readTimeout = value;
}
}

private int _writeTimeout = Timeout.Infinite;
internal override int WriteTimeout
{
get
{
ThrowIfDisposed();
return _writeTimeout;
}
set
{
ThrowIfDisposed();
if (value <= 0 && value != System.Threading.Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(value), SR.net_quic_timeout_use_gt_zero);
}
_writeTimeout = value;
}
}

internal override long StreamId
{
get
Expand Down Expand Up @@ -404,7 +446,6 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
{
var state = (State)obj!;
bool completePendingRead;

lock (state)
{
completePendingRead = state.ReadState == ReadState.PendingRead;
Expand Down Expand Up @@ -593,24 +634,54 @@ internal override int Read(Span<byte> buffer)
{
ThrowIfDisposed();
byte[] rentedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
CancellationTokenSource? cts = null;
try
{
int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask().GetAwaiter().GetResult();
if (_readTimeout > 0)
{
cts = new CancellationTokenSource(_readTimeout);
}
int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length), cts != null ? cts.Token : default).AsTask().GetAwaiter().GetResult();
rentedBuffer.AsSpan(0, readLength).CopyTo(buffer);
return readLength;
}
catch (OperationCanceledException) when (cts != null && cts.IsCancellationRequested)
{
// sync operations do not have Cancellation
throw new IOException(SR.net_quic_timeout);
}
finally
{
ArrayPool<byte>.Shared.Return(rentedBuffer);
cts?.Dispose();
}
}

internal override void Write(ReadOnlySpan<byte> buffer)
{
ThrowIfDisposed();
CancellationTokenSource? cts = null;


if (_writeTimeout > 0)
{
cts = new CancellationTokenSource(_writeTimeout);
}

// TODO: optimize this.
WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
try
{
WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
}
catch (OperationCanceledException) when (cts != null && cts.IsCancellationRequested)
{
// sync operations do not have Cancellation
throw new IOException(SR.net_quic_timeout);
}
finally
{
cts?.Dispose();
}
}

// MsQuic doesn't support explicit flushing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ internal abstract class QuicStreamProvider : IDisposable, IAsyncDisposable
{
internal abstract long StreamId { get; }

internal abstract bool CanTimeout { get; }

internal abstract bool CanRead { get; }

internal abstract int ReadTimeout { get; set; }

internal abstract int Read(Span<byte> buffer);

internal abstract ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default);
Expand All @@ -25,6 +29,8 @@ internal abstract class QuicStreamProvider : IDisposable, IAsyncDisposable

internal abstract void Write(ReadOnlySpan<byte> buffer);

internal abstract int WriteTimeout { get; set; }

internal abstract ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);

internal abstract ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool endStream, CancellationToken cancellationToken = default);
Expand Down
14 changes: 14 additions & 0 deletions src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati

public override void Write(ReadOnlySpan<byte> buffer) => _provider.Write(buffer);

public override bool CanTimeout => _provider.CanTimeout;

public override int ReadTimeout
{
get => _provider.ReadTimeout;
set => _provider.ReadTimeout = value;
}

public override int WriteTimeout
{
get => _provider.WriteTimeout;
set => _provider.WriteTimeout = value;
}

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => _provider.WriteAsync(buffer, cancellationToken);

public override void Flush() => _provider.Flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public sealed class MsQuicQuicStreamConformanceTests : QuicStreamConformanceTest
protected override QuicImplementationProvider Provider => QuicImplementationProviders.MsQuic;
protected override bool UsableAfterCanceledReads => false;
protected override bool BlocksOnZeroByteReads => true;
protected override bool CanTimeout => true;

public MsQuicQuicStreamConformanceTests(ITestOutputHelper output)
{
Expand Down

0 comments on commit a33dbf3

Please sign in to comment.