Skip to content

Commit

Permalink
Fix telemetry for Socket connects to Dns endpoints (dotnet#54071)
Browse files Browse the repository at this point in the history
  • Loading branch information
MihaZupan authored Jun 24, 2021
1 parent 93f407d commit f3af8d8
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public partial class SocketAsyncEventArgs : EventArgs, IDisposable
private Socket? _currentSocket;
private bool _userSocket; // if false when performing Connect, _currentSocket should be disposed
private bool _disposeCalled;
private protected bool _disableTelemetry;

// Controls thread safety via Interlocked.
private const int Configuring = -1;
Expand Down Expand Up @@ -202,7 +201,7 @@ public int BytesTransferred

private void OnCompletedInternal()
{
if (SocketsTelemetry.Log.IsEnabled() && !_disableTelemetry) AfterConnectAcceptTelemetry();
if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();

OnCompleted(this);
}
Expand Down Expand Up @@ -813,11 +812,8 @@ caughtException is OperationCanceledException ||
}

// Complete the operation.
if (SocketsTelemetry.Log.IsEnabled() && !_disableTelemetry)
{
LogBytesTransferEvents(_connectSocket?.SocketType, SocketAsyncOperation.Connect, internalArgs.BytesTransferred);
AfterConnectAcceptTelemetry();
}
if (SocketsTelemetry.Log.IsEnabled()) LogBytesTransferEvents(_connectSocket?.SocketType, SocketAsyncOperation.Connect, internalArgs.BytesTransferred);

Complete();

// Clean up after our temporary arguments.
Expand All @@ -842,12 +838,7 @@ private sealed class MultiConnectSocketAsyncEventArgs : SocketAsyncEventArgs, IV
private ManualResetValueTaskSourceCore<bool> _mrvtsc;
private int _isCompleted;

public MultiConnectSocketAsyncEventArgs() : base(unsafeSuppressExecutionContextFlow: false)
{
// Instances of this type are an implementation detail of an overarching connect operation.
// We don't want to emit telemetry specific to operations on this inner instance.
_disableTelemetry = true;
}
public MultiConnectSocketAsyncEventArgs() : base(unsafeSuppressExecutionContextFlow: false) { }

public void GetResult(short token) => _mrvtsc.GetResult(token);
public ValueTaskSourceStatus GetStatus(short token) => _mrvtsc.GetStatus(token);
Expand Down Expand Up @@ -968,7 +959,7 @@ internal void FinishOperationSyncSuccess(int bytesTransferred, SocketFlags flags
break;
}

if (SocketsTelemetry.Log.IsEnabled() && !_disableTelemetry) LogBytesTransferEvents(_currentSocket?.SocketType, _completedOperation, bytesTransferred);
if (SocketsTelemetry.Log.IsEnabled()) LogBytesTransferEvents(_currentSocket?.SocketType, _completedOperation, bytesTransferred);

Complete();
}
Expand Down Expand Up @@ -1003,7 +994,7 @@ private void FinishOperationSync(SocketError socketError, int bytesTransferred,
FinishOperationSyncFailure(socketError, bytesTransferred, flags);
}

if (SocketsTelemetry.Log.IsEnabled() && !_disableTelemetry) AfterConnectAcceptTelemetry();
if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();
}

private static void LogBytesTransferEvents(SocketType? socketType, SocketAsyncOperation operation, int bytesTransferred)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void AcceptStart(EndPoint address)
{
if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
AcceptStart(address.ToString());
AcceptStart(address.Serialize().ToString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.DotNet.RemoteExecutor;
using Microsoft.DotNet.XUnitExtensions;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -117,14 +116,9 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn

await WaitForEventCountersAsync(events);
});
Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself

VerifyStartStopEvents(events, connect: true, expectedCount: 1);
VerifyStartStopEvents(events, connect: false, expectedCount: 1);

Assert.DoesNotContain(events, e => e.Event.EventName == "ConnectFailed");
Assert.DoesNotContain(events, e => e.Event.EventName == "AcceptFailed");

VerifyEvents(events, connect: true, expectedCount: 1);
VerifyEvents(events, connect: false, expectedCount: 1);
VerifyEventCounters(events, connectCount: 1);
}, connectMethod, acceptMethod).Dispose();
}
Expand Down Expand Up @@ -153,12 +147,8 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn

await WaitForEventCountersAsync(events);
});
Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself

VerifyStartStopEvents(events, connect: true, expectedCount: 1);

Assert.DoesNotContain(events, e => e.Event.EventName == "ConnectFailed");

VerifyEvents(events, connect: true, expectedCount: 1);
VerifyEventCounters(events, connectCount: 1, connectOnly: true);
}, connectMethod, useDnsEndPoint.ToString()).Dispose();
}
Expand All @@ -169,12 +159,6 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn
[MemberData(nameof(SocketMethods_WithBools_MemberData))]
public void EventSource_SocketConnectFailure_LogsConnectFailed(string connectMethod, bool useDnsEndPoint)
{
if (useDnsEndPoint)
{
// [ActiveIssue("https://github.com/dotnet/runtime/issues/43931")]
throw new SkipTestException("https://github.com/dotnet/runtime/issues/43931");
}

RemoteExecutor.Invoke(async (connectMethod, useDnsEndPointString) =>
{
EndPoint endPoint = await GetRemoteEndPointAsync(useDnsEndPointString, port: 12345);
Expand Down Expand Up @@ -207,7 +191,10 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn
await WaitForEventCountersAsync(events);
});

VerifyConnectFailureEvents(events);
// For DNS endpoints, we may see multiple Start/Failure/Stop events
int? expectedCount = bool.Parse(useDnsEndPointString) ? null : 1;
VerifyEvents(events, connect: true, expectedCount, shouldHaveFailures: true);
VerifyEventCounters(events, connectCount: 0);
}, connectMethod, useDnsEndPoint.ToString()).Dispose();
}

Expand All @@ -216,12 +203,6 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn
[MemberData(nameof(SocketMethods_MemberData))]
public void EventSource_SocketAcceptFailure_LogsAcceptFailed(string acceptMethod)
{
if (acceptMethod == "Sync" && PlatformDetection.IsRedHatFamily7)
{
// [ActiveIssue("https://github.com/dotnet/runtime/issues/42686")]
throw new SkipTestException("Disposing a Socket performing a sync operation can hang on RedHat7 systems");
}

RemoteExecutor.Invoke(async acceptMethod =>
{
using var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose, 0.1);
Expand All @@ -246,18 +227,8 @@ await Assert.ThrowsAnyAsync<Exception>(async () =>

await WaitForEventCountersAsync(events);
});
Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself

VerifyStartStopEvents(events, connect: false, expectedCount: 1);

(EventWrittenEventArgs Event, Guid ActivityId) failed = Assert.Single(events, e => e.Event.EventName == "AcceptFailed");
Assert.Equal(2, failed.Event.Payload.Count);
Assert.True(Enum.IsDefined((SocketError)failed.Event.Payload[0]));
Assert.IsType<string>(failed.Event.Payload[1]);

(_, Guid startActivityId) = Assert.Single(events, e => e.Event.EventName == "AcceptStart");
Assert.Equal(startActivityId, failed.ActivityId);

VerifyEvents(events, connect: false, expectedCount: 1, shouldHaveFailures: true);
VerifyEventCounters(events, connectCount: 0);
}, acceptMethod).Dispose();
}
Expand All @@ -270,12 +241,6 @@ await Assert.ThrowsAnyAsync<Exception>(async () =>
[InlineData("Eap", false)]
public void EventSource_ConnectAsyncCanceled_LogsConnectFailed(string connectMethod, bool useDnsEndPoint)
{
if (useDnsEndPoint)
{
// [ActiveIssue("https://github.com/dotnet/runtime/issues/46030")]
throw new SkipTestException("https://github.com/dotnet/runtime/issues/46030");
}

RemoteExecutor.Invoke(async (connectMethod, useDnsEndPointString) =>
{
EndPoint endPoint = await GetRemoteEndPointAsync(useDnsEndPointString, port: 12345);
Expand Down Expand Up @@ -326,27 +291,13 @@ await Assert.ThrowsAnyAsync<Exception>(async () =>
await WaitForEventCountersAsync(events);
});

VerifyConnectFailureEvents(events);
// For DNS endpoints, we may see multiple Start/Failure/Stop events
int? expectedCount = bool.Parse(useDnsEndPointString) ? null : 1;
VerifyEvents(events, connect: true, expectedCount, shouldHaveFailures: true);
VerifyEventCounters(events, connectCount: 0);
}, connectMethod, useDnsEndPoint.ToString()).Dispose();
}

private static void VerifyConnectFailureEvents(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events)
{
Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself

VerifyStartStopEvents(events, connect: true, expectedCount: 1);

(EventWrittenEventArgs Event, Guid ActivityId) failed = Assert.Single(events, e => e.Event.EventName == "ConnectFailed");
Assert.Equal(2, failed.Event.Payload.Count);
Assert.True(Enum.IsDefined((SocketError)failed.Event.Payload[0]));
Assert.IsType<string>(failed.Event.Payload[1]);

(_, Guid startActivityId) = Assert.Single(events, e => e.Event.EventName == "ConnectStart");
Assert.Equal(startActivityId, failed.ActivityId);

VerifyEventCounters(events, connectCount: 0);
}

[OuterLoop]
[ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))]
public void EventSource_EventsRaisedAsExpected()
Expand Down Expand Up @@ -382,40 +333,13 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn

await WaitForEventCountersAsync(events);
});
Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself

VerifyStartStopEvents(events, connect: true, expectedCount: 10);

Assert.DoesNotContain(events, e => e.Event.EventName == "ConnectFailed");

VerifyEvents(events, connect: true, expectedCount: 10);
VerifyEventCounters(events, connectCount: 10, shouldHaveTransferedBytes: true, shouldHaveDatagrams: true);
}
}).Dispose();
}

private static void VerifyStartStopEvents(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, bool connect, int expectedCount)
{
string startName = connect ? "ConnectStart" : "AcceptStart";
(EventWrittenEventArgs Event, Guid ActivityId)[] starts = events.Where(e => e.Event.EventName == startName).ToArray();
Assert.Equal(expectedCount, starts.Length);
foreach ((EventWrittenEventArgs Event, _) in starts)
{
object startPayload = Assert.Single(Event.Payload);
Assert.False(string.IsNullOrWhiteSpace(startPayload as string));
}

string stopName = connect ? "ConnectStop" : "AcceptStop";
(EventWrittenEventArgs Event, Guid ActivityId)[] stops = events.Where(e => e.Event.EventName == stopName).ToArray();
Assert.Equal(expectedCount, stops.Length);
Assert.All(stops, stop => Assert.Empty(stop.Event.Payload));

for (int i = 0; i < expectedCount; i++)
{
Assert.NotEqual(Guid.Empty, starts[i].ActivityId);
Assert.Equal(starts[i].ActivityId, stops[i].ActivityId);
}
}

private static async Task WaitForEventAsync(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, string name)
{
DateTime startTime = DateTime.UtcNow;
Expand Down Expand Up @@ -452,6 +376,76 @@ static bool IsBytesSentEventCounter(EventWrittenEventArgs e)
}
}

private static void VerifyEvents(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, bool connect, int? expectedCount, bool shouldHaveFailures = false)
{
bool start = false;
Guid startGuid = Guid.Empty;
bool seenFailures = false;
bool seenFailureAfterStart = false;
int numberOfStops = 0;

foreach ((EventWrittenEventArgs Event, Guid ActivityId) in events)
{
Assert.False(Event.EventId == 0, $"Received an error event from EventSource: {Event.Message}");

if (Event.EventName.Contains("Connect") != connect)
{
continue;
}

switch (Event.EventName)
{
case "ConnectStart":
case "AcceptStart":
Assert.False(start, "Start without a Stop");
Assert.NotEqual(Guid.Empty, ActivityId);
startGuid = ActivityId;
seenFailureAfterStart = false;
start = true;

string startAddress = Assert.IsType<string>(Assert.Single(Event.Payload));
Assert.Matches(@"^InterNetwork.*?:\d\d:{(?:\d{1,3},?)+}$", startAddress);
break;

case "ConnectStop":
case "AcceptStop":
Assert.True(start, "Stop without a Start");
Assert.Equal(startGuid, ActivityId);
startGuid = Guid.Empty;
numberOfStops++;
start = false;

Assert.Empty(Event.Payload);
break;

case "ConnectFailed":
case "AcceptFailed":
Assert.True(start, "Failed should come between Start and Stop");
Assert.False(seenFailureAfterStart, "Start may only have one Failed event");
Assert.Equal(startGuid, ActivityId);
seenFailureAfterStart = true;
seenFailures = true;

Assert.Equal(2, Event.Payload.Count);
Assert.True(Enum.IsDefined((SocketError)Event.Payload[0]));
Assert.IsType<string>(Event.Payload[1]);
break;
}
}

Assert.False(start, "Start without a Stop");
Assert.Equal(shouldHaveFailures, seenFailures);

if (expectedCount.HasValue)
{
Assert.Equal(expectedCount, numberOfStops);
}
else
{
Assert.NotEqual(0, numberOfStops);
}
}

private static void VerifyEventCounters(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, int connectCount, bool connectOnly = false, bool shouldHaveTransferedBytes = false, bool shouldHaveDatagrams = false)
{
Dictionary<string, double[]> eventCounters = events
Expand Down

0 comments on commit f3af8d8

Please sign in to comment.