Skip to content

Commit

Permalink
Handle multiple incoming connection at the beginning of Http3Loopback…
Browse files Browse the repository at this point in the history
…Connection use (dotnet#69453)

* Handle multiple incoming connection at the beginning of Http3LoopbackConnection use

* Dispose delayed QuicStreams
  • Loading branch information
rzikm authored May 24, 2022
1 parent 2a99e18 commit e86511e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 77 deletions.
120 changes: 52 additions & 68 deletions src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ internal sealed class Http3LoopbackConnection : GenericLoopbackConnection

private readonly QuicConnection _connection;

// Queue for holding streams we accepted before we managed to accept the control stream
private readonly Queue<QuicStream> _delayedStreams = new Queue<QuicStream>();

// This is specifically request streams, not control streams
private readonly Dictionary<int, Http3LoopbackStream> _openStreams = new Dictionary<int, Http3LoopbackStream>();

Expand All @@ -51,6 +54,8 @@ public Http3LoopbackConnection(QuicConnection connection)
_connection = connection;
}

public long MaxHeaderListSize { get; private set; } = -1;

public override void Dispose()
{
// Close any remaining request streams (but NOT control streams, as these should not be closed while the connection is open)
Expand All @@ -59,6 +64,11 @@ public override void Dispose()
stream.Dispose();
}

foreach (QuicStream stream in _delayedStreams)
{
stream.Dispose();
}

// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
Expand Down Expand Up @@ -107,100 +117,74 @@ public override Task InitializeConnectionAsync()
throw new NotImplementedException();
}

public async Task<Http3LoopbackStream> AcceptStreamAsync()
private Task EnsureControlStreamAcceptedAsync()
{
QuicStream quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false);
var stream = new Http3LoopbackStream(quicStream);

if (quicStream.CanWrite)
if (_inboundControlStream != null)
{
_openStreams.Add(checked((int)quicStream.StreamId), stream);
_currentStream = stream;
_currentStreamId = quicStream.StreamId;
return Task.CompletedTask;
}

return stream;
}

private async Task HandleControlStreamAsync(Http3LoopbackStream controlStream)
{
if (_inboundControlStream is not null)
return EnsureControlStreamAcceptedInternalAsync();
async Task EnsureControlStreamAcceptedInternalAsync()
{
throw new Exception("Received second control stream from client???");
}
Http3LoopbackStream controlStream;

while (true)
{
QuicStream quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false);

Assert.False(controlStream.CanWrite);
if (!quicStream.CanWrite)
{
// control stream accepted
controlStream = new Http3LoopbackStream(quicStream);
break;
}

long? streamType = await controlStream.ReadIntegerAsync();
Assert.Equal(Http3LoopbackStream.ControlStream, streamType);
// control streams are unidirectional, so this must be a request stream
// keep it for later and wait for another stream
_delayedStreams.Enqueue(quicStream);
}

List<(long settingId, long settingValue)> settings = await controlStream.ReadSettingsAsync();
(long settingId, long settingValue) = Assert.Single(settings);
long? streamType = await controlStream.ReadIntegerAsync();
Assert.Equal(Http3LoopbackStream.ControlStream, streamType);

Assert.Equal(Http3LoopbackStream.MaxHeaderListSize, settingId);
List<(long settingId, long settingValue)> settings = await controlStream.ReadSettingsAsync();
(long settingId, long settingValue) = Assert.Single(settings);

_inboundControlStream = controlStream;
Assert.Equal(Http3LoopbackStream.MaxHeaderListSize, settingId);
MaxHeaderListSize = settingValue;

_inboundControlStream = controlStream;
}
}

// This will automatically handle the control stream, including validating its contents
public async Task<Http3LoopbackStream> AcceptRequestStreamAsync()
{
Http3LoopbackStream requestStream = null;
await EnsureControlStreamAcceptedAsync().ConfigureAwait(false);

while (true)
if (!_delayedStreams.TryDequeue(out QuicStream quicStream))
{
var stream = await AcceptStreamAsync().ConfigureAwait(false);

// Accepted request stream.
if (stream.CanWrite)
{
// Only one expected.
Assert.True(requestStream is null, "Expected single request stream, got a second");
quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false);
}

// Control stream is set --> return the request stream.
if (_inboundControlStream is not null)
{
return stream;
}
var stream = new Http3LoopbackStream(quicStream);

// Control stream not set --> need to accept another stream.
requestStream = stream;
continue;
}
Assert.True(quicStream.CanWrite, "Expected writeable stream.");

// Must be the control stream.
await HandleControlStreamAsync(stream);
_openStreams.Add(checked((int)quicStream.StreamId), stream);
_currentStream = stream;
_currentStreamId = quicStream.StreamId;

// We've already accepted request stream --> return it.
if (requestStream is not null)
{
return requestStream;
}
}
return stream;
}

public async Task<(Http3LoopbackStream clientControlStream, Http3LoopbackStream requestStream)> AcceptControlAndRequestStreamAsync()
{
Http3LoopbackStream streamA = null, streamB = null;

try
{
streamA = await AcceptStreamAsync();
streamB = await AcceptStreamAsync();
Http3LoopbackStream requestStream = await AcceptRequestStreamAsync();
Http3LoopbackStream controlStream = _inboundControlStream;

return (streamA.CanWrite, streamB.CanWrite) switch
{
(false, true) => (streamA, streamB),
(true, false) => (streamB, streamA),
_ => throw new Exception("Expected one unidirectional and one bidirectional stream; received something else.")
};
}
catch
{
streamA?.Dispose();
streamB?.Dispose();
throw;
}
return (controlStream, requestStream);
}

public async Task EstablishControlStreamAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,7 @@ public async Task ClientSettingsReceived_Success(int headerSizeLimit)
using (requestStream)
{
Assert.False(settingsStream.CanWrite, "Expected unidirectional control stream.");

long? streamType = await settingsStream.ReadIntegerAsync();
Assert.Equal(Http3LoopbackStream.ControlStream, streamType);

List<(long settingId, long settingValue)> settings = await settingsStream.ReadSettingsAsync();
(long settingId, long settingValue) = Assert.Single(settings);

Assert.Equal(Http3LoopbackStream.MaxHeaderListSize, settingId);
Assert.Equal(headerSizeLimit * 1024L, settingValue);
Assert.Equal(headerSizeLimit * 1024L, connection.MaxHeaderListSize);

await requestStream.ReadRequestDataAsync();
await requestStream.SendResponseAsync();
Expand Down

0 comments on commit e86511e

Please sign in to comment.