Skip to content

Commit

Permalink
Set MaxConcurrentStreams = 100 on new Http2Connection until first SET…
Browse files Browse the repository at this point in the history
…TINGS frame (dotnet#40779)

* Set MaxConcurrentStreams = 100 on new Http2Connection until first SETTINGS frame

* Update src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs

Co-authored-by: Chris Ross <[email protected]>

* - Setting infinite MaxConcurrentStreams fixed
- Test verifying new connection request queueing added

* More tests

* - const case fixed
- Outerloop attributed added

* ExtraStreams case fixed

Co-authored-by: Chris Ross <[email protected]>
  • Loading branch information
alnikola and Tratcher authored Aug 15, 2020
1 parent a62b38d commit 12f46e4
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,27 @@ public override Task InitializeConnectionAsync()
}

public async Task<SettingsFrame> ReadAndSendSettingsAsync(TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
{
SettingsFrame clientSettingsFrame = await ReadSettingsAsync().ConfigureAwait(false);
await SendSettingsAsync(ackTimeout, settingsEntries).ConfigureAwait(false);
return clientSettingsFrame;
}

public async Task SendSettingsAsync(TimeSpan? ackTimeout, SettingsEntry[] settingsEntries)
{
// Send the initial server settings frame.
SettingsFrame settingsFrame = new SettingsFrame(settingsEntries);
await WriteFrameAsync(settingsFrame).ConfigureAwait(false);

// Send the client settings frame ACK.
Frame settingsAck = new Frame(0, FrameType.Settings, FrameFlags.Ack, 0);
await WriteFrameAsync(settingsAck).ConfigureAwait(false);

// The client will send us a SETTINGS ACK eventually, but not necessarily right away.
await ExpectSettingsAckAsync((int?)ackTimeout?.TotalMilliseconds ?? 5000);
}

public async Task<SettingsFrame> ReadSettingsAsync()
{
// Receive the initial client settings frame.
Frame receivedFrame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
Expand All @@ -657,18 +678,6 @@ public async Task<SettingsFrame> ReadAndSendSettingsAsync(TimeSpan? ackTimeout,
Assert.Equal(FrameType.WindowUpdate, receivedFrame.Type);
Assert.Equal(FrameFlags.None, receivedFrame.Flags);
Assert.Equal(0, receivedFrame.StreamId);

// Send the initial server settings frame.
SettingsFrame settingsFrame = new SettingsFrame(settingsEntries);
await WriteFrameAsync(settingsFrame).ConfigureAwait(false);

// Send the client settings frame ACK.
Frame settingsAck = new Frame(0, FrameType.Settings, FrameFlags.Ack, 0);
await WriteFrameAsync(settingsAck).ConfigureAwait(false);

// The client will send us a SETTINGS ACK eventually, but not necessarily right away.
await ExpectSettingsAckAsync((int?)ackTimeout?.TotalMilliseconds ?? 5000);

return clientSettingsFrame;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ internal sealed partial class Http2Connection : HttpConnectionBase, IDisposable

private const int MaxStreamId = int.MaxValue;

// Temporary workaround for request burst handling on connection start.
private const int InitialMaxConcurrentStreams = 100;

private static readonly byte[] s_http2ConnectionPreface = Encoding.ASCII.GetBytes("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");

#if DEBUG
Expand Down Expand Up @@ -124,13 +127,14 @@ public Http2Connection(HttpConnectionPool pool, Connection connection)
_httpStreams = new Dictionary<int, Http2Stream>();

_connectionWindow = new CreditManager(this, nameof(_connectionWindow), DefaultInitialWindowSize);
_concurrentStreams = new CreditManager(this, nameof(_concurrentStreams), int.MaxValue);
_concurrentStreams = new CreditManager(this, nameof(_concurrentStreams), InitialMaxConcurrentStreams);

_writeChannel = Channel.CreateUnbounded<WriteQueueEntry>(s_channelOptions);

_nextStream = 1;
_initialWindowSize = DefaultInitialWindowSize;
_maxConcurrentStreams = int.MaxValue;

_maxConcurrentStreams = InitialMaxConcurrentStreams;
_pendingWindowUpdate = 0;
_idleSinceTickCount = Environment.TickCount64;

Expand Down Expand Up @@ -288,7 +292,7 @@ private async Task ProcessIncomingFramesAsync()
if (NetEventSource.Log.IsEnabled()) Trace($"Frame 0: {frameHeader}.");

// Process the initial SETTINGS frame. This will send an ACK.
ProcessSettingsFrame(frameHeader);
ProcessSettingsFrame(frameHeader, initialFrame: true);
}
catch (IOException e)
{
Expand Down Expand Up @@ -558,7 +562,7 @@ private void ProcessDataFrame(FrameHeader frameHeader)
_incomingBuffer.Discard(frameHeader.PayloadLength);
}

private void ProcessSettingsFrame(FrameHeader frameHeader)
private void ProcessSettingsFrame(FrameHeader frameHeader, bool initialFrame = false)
{
Debug.Assert(frameHeader.Type == FrameType.Settings);

Expand Down Expand Up @@ -592,6 +596,7 @@ private void ProcessSettingsFrame(FrameHeader frameHeader)

// Parse settings and process the ones we care about.
ReadOnlySpan<byte> settings = _incomingBuffer.ActiveSpan.Slice(0, frameHeader.PayloadLength);
bool maxConcurrentStreamsReceived = false;
while (settings.Length > 0)
{
Debug.Assert((settings.Length % 6) == 0);
Expand All @@ -605,6 +610,7 @@ private void ProcessSettingsFrame(FrameHeader frameHeader)
{
case SettingId.MaxConcurrentStreams:
ChangeMaxConcurrentStreams(settingValue);
maxConcurrentStreamsReceived = true;
break;

case SettingId.InitialWindowSize:
Expand Down Expand Up @@ -632,6 +638,12 @@ private void ProcessSettingsFrame(FrameHeader frameHeader)
}
}

if (initialFrame && !maxConcurrentStreamsReceived)
{
// Set to 'infinite' because MaxConcurrentStreams was not set on the initial SETTINGS frame.
ChangeMaxConcurrentStreams(int.MaxValue);
}

_incomingBuffer.Discard(frameHeader.PayloadLength);

// Send acknowledgement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,176 @@ public async Task PostAsync_StreamRefused_RequestIsRetried()
}
}

[OuterLoop("Uses delays")]
[ConditionalFact(nameof(SupportsAlpn))]
public async Task GetAsync_SettingsFrameNotSentOnNewConnection_ClientApplies100StreamLimit()
{
const int DefaultMaxConcurrentStreams = 100;
TimeSpan timeout = TimeSpan.FromSeconds(3);
using (Http2LoopbackServer server = Http2LoopbackServer.CreateServer())
using (HttpClient client = CreateHttpClient())
{
Task<Http2LoopbackConnection> connectionTask = AcceptConnectionAndReadSettings(server, timeout);

List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
for (int i = 0; i < DefaultMaxConcurrentStreams; i++)
{
sendTasks.Add(client.GetAsync(server.Address));
}

Http2LoopbackConnection connection = await connectionTask.TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

// Client sets the default MaxConcurrentStreams to 100, so accept 100 requests.
List<int> acceptedRequests = await AcceptRequests(connection, DefaultMaxConcurrentStreams).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

Assert.Equal(DefaultMaxConcurrentStreams, acceptedRequests.Count);

// Extra request is queued on the client.
Task<HttpResponseMessage> extraSendTask = client.GetAsync(server.Address);
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => connection.ReadRequestHeaderAsync()).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

// Send SETTINGS frame to increase the default stream limit by 1, unblocking the extra request.
await connection.SendSettingsAsync(timeout, new[] { new SettingsEntry() { SettingId = SettingId.MaxConcurrentStreams, Value = DefaultMaxConcurrentStreams + 1 } }).ConfigureAwait(false);

(int extraStreamId, _) = await connection.ReadAndParseRequestHeaderAsync().TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

// Respond to all the requests.
acceptedRequests.Add(extraStreamId);
foreach (int streamId in acceptedRequests)
{
await connection.SendDefaultResponseAsync(streamId).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
}

sendTasks.Add(extraSendTask);
await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

foreach (Task<HttpResponseMessage> sendTask in sendTasks)
{
using HttpResponseMessage response = sendTask.Result;
Assert.True(response.IsSuccessStatusCode);
}
}
}

[OuterLoop("Uses delays")]
[ConditionalFact(nameof(SupportsAlpn))]
public async Task GetAsync_ServerDelaysSendingSettingsThenSetsLowerMaxConcurrentStreamsLimitThenIncreaseIt_ClientAppliesEachLimitChangeProperly()
{
const int DefaultMaxConcurrentStreams = 100;
const int ExtraStreams = 20;
TimeSpan timeout = TimeSpan.FromSeconds(3);
using (Http2LoopbackServer server = Http2LoopbackServer.CreateServer())
using (HttpClient client = CreateHttpClient())
{
Task<Http2LoopbackConnection> connectionTask = AcceptConnectionAndReadSettings(server, timeout);

List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
for (int i = 0; i < DefaultMaxConcurrentStreams + ExtraStreams; i++)
{
sendTasks.Add(client.GetAsync(server.Address));
}

Http2LoopbackConnection connection = await connectionTask.TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

// Client sets the default MaxConcurrentStreams to 100, so accept 100 requests.
List<int> acceptedRequests = await AcceptRequests(connection, DefaultMaxConcurrentStreams + ExtraStreams).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
Assert.Equal(DefaultMaxConcurrentStreams, acceptedRequests.Count);

// Send SETTINGS frame with MaxConcurrentStreams = 102
await connection.SendSettingsAsync(timeout, new[] { new SettingsEntry() { SettingId = SettingId.MaxConcurrentStreams, Value = DefaultMaxConcurrentStreams + 2 } }).ConfigureAwait(false);

// Increased MaxConcurrentStreams ublocks only 2 requests.
List<int> acceptedExtraRequests = await AcceptRequests(connection, ExtraStreams).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
Assert.Equal(2, acceptedExtraRequests.Count);

acceptedRequests.AddRange(acceptedExtraRequests);

// Send SETTINGS frame with MaxConcurrentStreams = DefaultMaxConcurrentStreams + ExtraStreams
await connection.ExpectSettingsAckAsync().ConfigureAwait(false);
Frame frame = new SettingsFrame(new SettingsEntry() { SettingId = SettingId.MaxConcurrentStreams, Value = DefaultMaxConcurrentStreams + ExtraStreams });
await connection.WriteFrameAsync(frame).ConfigureAwait(false);

// Increased MaxConcurrentStreams ublocks all remaining requests.
acceptedExtraRequests = await AcceptRequests(connection, ExtraStreams - 2).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
Assert.Equal(ExtraStreams - 2, acceptedExtraRequests.Count);

acceptedRequests.AddRange(acceptedExtraRequests);

// Respond to all the requests.
foreach (int streamId in acceptedRequests)
{
await connection.SendDefaultResponseAsync(streamId).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
}

await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

foreach (Task<HttpResponseMessage> sendTask in sendTasks)
{
using HttpResponseMessage response = sendTask.Result;
Assert.True(response.IsSuccessStatusCode);
}
}
}

[OuterLoop("Uses delays")]
[ConditionalFact(nameof(SupportsAlpn))]
public async Task GetAsync_ServerSendSettingsWithoutMaxConcurrentStreams_ClientAppliesInfiniteLimit()
{
const int DefaultMaxConcurrentStreams = 100;
TimeSpan timeout = TimeSpan.FromSeconds(3);
using (Http2LoopbackServer server = Http2LoopbackServer.CreateServer())
using (HttpClient client = CreateHttpClient())
{
Task<Http2LoopbackConnection> connectionTask = AcceptConnectionAndReadSettings(server, timeout);

List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
for (int i = 0; i < DefaultMaxConcurrentStreams; i++)
{
sendTasks.Add(client.GetAsync(server.Address));
}

Http2LoopbackConnection connection = await connectionTask.TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

// Client sets the default MaxConcurrentStreams to 100, so accept 100 requests.
List<int> acceptedRequests = await AcceptRequests(connection, DefaultMaxConcurrentStreams).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
Assert.Equal(DefaultMaxConcurrentStreams, acceptedRequests.Count);

// Extra request is queued on the client.
Task<HttpResponseMessage> extraSendTask = client.GetAsync(server.Address);
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => connection.ReadRequestHeaderAsync()).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

// Send SETTINGS frame without MaxConcurrentSettings value.
await connection.SendSettingsAsync(timeout, new SettingsEntry[0]).ConfigureAwait(false);

// Send 100 more requests
sendTasks.Add(extraSendTask);
for (int i = 0; i < DefaultMaxConcurrentStreams; i++)
{
sendTasks.Add(client.GetAsync(server.Address));
}

// Client sets the MaxConcurrentStreams to 'infinite', so accept 100 + 1 extra requests.
List<int> acceptedExtraRequests = await AcceptRequests(connection, DefaultMaxConcurrentStreams + 1).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
Assert.Equal(DefaultMaxConcurrentStreams + 1, acceptedExtraRequests.Count);

// Respond to all the requests.
acceptedRequests.AddRange(acceptedExtraRequests);
foreach (int streamId in acceptedRequests)
{
await connection.SendDefaultResponseAsync(streamId).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
}

sendTasks.Add(extraSendTask);
await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);

foreach (Task<HttpResponseMessage> sendTask in sendTasks)
{
using HttpResponseMessage response = sendTask.Result;
Assert.True(response.IsSuccessStatusCode);
}
}
}

// This test is based on RFC 7540 section 6.1:
// "If a DATA frame is received whose stream identifier field is 0x0, the recipient MUST
// respond with a connection error (Section 5.4.1) of type PROTOCOL_ERROR."
Expand Down Expand Up @@ -476,6 +646,31 @@ public async Task HeadersFrame_IdleStream_ConnectionError()
}
}

private async Task<Http2LoopbackConnection> AcceptConnectionAndReadSettings(Http2LoopbackServer server, TimeSpan timeout)
{
Http2LoopbackConnection connection = await server.AcceptConnectionAsync(timeout).ConfigureAwait(false);
await connection.ReadSettingsAsync().ConfigureAwait(false);
return connection;
}

private async Task<List<int>> AcceptRequests(Http2LoopbackConnection connection, int maxRequests = int.MaxValue)
{
List<int> streamIds = new List<int>();
for (int i = 0; i < maxRequests; i++)
{
try
{
streamIds.Add((await connection.ReadAndParseRequestHeaderAsync().ConfigureAwait(false)).streamId);
}
catch (OperationCanceledException)
{
return streamIds;
}
}

return streamIds;
}

private static Frame MakeSimpleHeadersFrame(int streamId, bool endHeaders = false, bool endStream = false) =>
new HeadersFrame(new byte[] { 0x88 }, // :status: 200
(endHeaders ? FrameFlags.EndHeaders : FrameFlags.None) | (endStream ? FrameFlags.EndStream : FrameFlags.None),
Expand Down

0 comments on commit 12f46e4

Please sign in to comment.