forked from quixio/quix-streams-dotnet
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use client.id to connect to Confluent Cloud Kafka (#155) (#156)
- Loading branch information
1 parent
17fc0dd
commit 33faf3c
Showing
4 changed files
with
252 additions
and
0 deletions.
There are no files selected for viewing
144 changes: 144 additions & 0 deletions
144
src/CsharpClient/QuixStreams.Streaming.UnitTests/StreamingClientShould.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Net; | ||
using System.Net.Http; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using FluentAssertions; | ||
using Xunit; | ||
|
||
namespace QuixStreams.Streaming.UnitTests; | ||
|
||
public class StreamingClientShould | ||
{ | ||
[Theory] | ||
[InlineData("confluent-testTopic")] | ||
[InlineData("quixdev-secondTest")] | ||
[InlineData("different-topic")] | ||
public void GetTopicConsumer_ShouldUseClientId(string topicName) | ||
{ | ||
// Arrange | ||
var messageHandler = new MockHttpMessageHandler(new Dictionary<string, string>() | ||
{ | ||
{ "/workspaces", workspaces }, | ||
{ "/topics", topics } | ||
}); | ||
var client = new HttpClient(messageHandler); | ||
var streamingClient = new QuixStreamingClient(httpClient: client, token: "faketoken"); | ||
|
||
// Act | ||
var topicConsumer = streamingClient.GetTopicConsumer(topicName); | ||
|
||
// Assert | ||
topicConsumer.Should().NotBeNull(); | ||
} | ||
|
||
private string topics = @" | ||
[ | ||
{ | ||
""id"": ""confluent-testTopic"", | ||
""name"": ""confluent-testTopic"", | ||
""workspaceId"": ""confluent"", | ||
""status"": ""Ready"", | ||
}, | ||
{ | ||
""id"": ""quixdev-secondTest"", | ||
""name"": ""quixdev-secondTest"", | ||
""workspaceId"": ""quixdev"", | ||
""status"": ""Ready"", | ||
}, | ||
{ | ||
""id"": ""different-topic"", | ||
""name"": ""different-topic"", | ||
""workspaceId"": ""different"", | ||
""status"": ""Ready"", | ||
} | ||
]"; | ||
|
||
private string workspaces = @" | ||
[ | ||
{ | ||
""workspaceId"": ""confluent"" | ||
,""name"": ""Confluent Kafka Workspace"", | ||
""status"": ""Ready"", | ||
""broker"": { | ||
""address"": ""xxxxx:9092"", | ||
""securityMode"": ""SaslSsl"", | ||
""sslPassword"": """", | ||
""saslMechanism"": ""Plain"", | ||
""username"": ""xxxxx"", | ||
""password"": ""xxxxx"", | ||
""hasCertificate"": false | ||
}, | ||
""brokerSettings"": { | ||
""brokerType"": ""ConfluentCloud"", | ||
""syncTopics"": false, | ||
""confluentCloudSettings"": { | ||
""clientID"": ""testclientid"" | ||
} | ||
} | ||
}, | ||
{ | ||
""workspaceId"": ""quixdev"", | ||
""name"": ""Shared Kafka Workspace"", | ||
""status"": ""Ready"", | ||
""broker"": { | ||
""address"": ""xxxx:9092"", | ||
""securityMode"": ""SaslSsl"", | ||
""sslPassword"": ""xxxx"", | ||
""saslMechanism"": ""ScramSha256"", | ||
""username"": ""xxxx"", | ||
""password"": ""xxxx"", | ||
""hasCertificate"": false | ||
}, | ||
""brokerSettings"": { | ||
""brokerType"": ""SharedKafka"" | ||
} | ||
}, | ||
{ | ||
""workspaceId"": ""different"", | ||
""name"": ""Unknown Kafka Workspace"", | ||
""status"": ""Ready"", | ||
""broker"": { | ||
""address"": ""xxxx:9092"", | ||
""securityMode"": ""SaslSsl"", | ||
""sslPassword"": ""xxxx"", | ||
""saslMechanism"": ""ScramSha256"", | ||
""username"": ""xxxx"", | ||
""password"": ""xxxx"", | ||
""hasCertificate"": false | ||
}, | ||
""brokerSettings"": { | ||
""brokerType"": ""ThisIsNew"" | ||
} | ||
} | ||
]"; | ||
|
||
private class MockHttpMessageHandler : HttpMessageHandler | ||
{ | ||
private readonly Dictionary<string, string> responses; | ||
|
||
public MockHttpMessageHandler(Dictionary<string, string> responses) | ||
{ | ||
this.responses = responses; | ||
} | ||
|
||
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, | ||
CancellationToken cancellationToken) | ||
{ | ||
foreach (var keyValuePair in responses) | ||
{ | ||
if (request.RequestUri != null && request.RequestUri.ToString().Contains(keyValuePair.Key)) | ||
{ | ||
return new HttpResponseMessage | ||
{ | ||
StatusCode = HttpStatusCode.OK, | ||
Content = new StringContent(keyValuePair.Value) | ||
}; | ||
} | ||
} | ||
|
||
throw new Exception("URL not found"); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
src/CsharpClient/QuixStreams.Streaming/QuixApi/WorkspaceBrokerTypeJsonConverter.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
using System; | ||
using Newtonsoft.Json; | ||
using QuixStreams.Streaming.QuixApi.Portal; | ||
|
||
namespace QuixStreams.Streaming.QuixApi | ||
{ | ||
public class WorkspaceBrokerTypeJsonConverter : JsonConverter | ||
{ | ||
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) | ||
{ | ||
writer.WriteValue(value.ToString()); | ||
} | ||
|
||
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) | ||
{ | ||
if (reader.TokenType == JsonToken.String) | ||
{ | ||
string value = reader.Value.ToString(); | ||
if (Enum.TryParse(value, out WorkspaceBrokerType result)) | ||
{ | ||
return result; | ||
} | ||
} | ||
|
||
return WorkspaceBrokerType.Unknown; | ||
} | ||
|
||
public override bool CanConvert(Type objectType) | ||
{ | ||
return objectType == typeof(WorkspaceBrokerType); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters