Skip to content

Commit

Permalink
FEAT: add Azure ServiceBus Queue/Topic MessagePump registration short…
Browse files Browse the repository at this point in the history
…cuts (arcus-azure#50)

* FEAT: add Azure ServiceBus Queue/Topic MessagePump registration shortcuts

* PR-SUG: add unit tests to verify if we wire up correctly

* PR-SUG: update service extensions tests with backdoor mocking verification
  • Loading branch information
stijnmoreels authored Jan 31, 2020
1 parent c960130 commit 2f64e2d
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,25 @@ public static IServiceCollection AddServiceBusQueueMessagePump<TMessagePump>(thi
return services;
}

/// <summary>
/// Adds a message handler to consume messages from Azure Service Bus Queue
/// </summary>
/// <remarks>When using this approach; the connection string should be scoped to the queue that is being processed, not the namespace</remarks>
/// <param name="services">Collection of services to use in the application</param>
/// <param name="queueName">Name of the queue to process</param>
/// <param name="secretName">Name of the secret to retrieve using your registered <see cref="ISecretProvider"/> implementation</param>
/// <param name="configureMessagePump">Capability to configure how the message pump should behave</param>
/// <returns>Collection of services to use in the application</returns>
public static IServiceCollection AddServiceBusQueueMessagePump<TMessagePump>(this IServiceCollection services, string queueName, string secretName, Action<AzureServiceBusMessagePumpOptions> configureMessagePump = null)
where TMessagePump : class, IHostedService
{
Guard.NotNull(services, nameof(services));

AddServiceBusMessagePump<TMessagePump>(services, queueName, string.Empty, getConnectionStringFromSecretFunc: secretProvider => secretProvider.GetRawSecretAsync(secretName), configureMessagePump: configureMessagePump);

return services;
}

/// <summary>
/// Adds a message handler to consume messages from Azure Service Bus Queue
/// </summary>
Expand Down Expand Up @@ -84,6 +103,25 @@ public static IServiceCollection AddServiceBusQueueMessagePump<TMessagePump>(thi
return services;
}

/// <summary>
/// Adds a message handler to consume messages from Azure Service Bus Topic
/// </summary>
/// <remarks>When using this approach; the connection string should be scoped to the topic that is being processed, not the namespace</remarks>
/// <param name="subscriptionName">Name of the subscription to process</param>
/// <param name="services">Collection of services to use in the application</param>
/// <param name="secretName">Name of the secret to retrieve using your registered <see cref="ISecretProvider"/> implementation</param>
/// <param name="configureMessagePump">Capability to configure how the message pump should behave</param>
/// <returns>Collection of services to use in the application</returns>
public static IServiceCollection AddServiceBusTopicMessagePump<TMessagePump>(this IServiceCollection services, string subscriptionName, string secretName, Action<AzureServiceBusMessagePumpOptions> configureMessagePump = null)
where TMessagePump : class, IHostedService
{
Guard.NotNull(services, nameof(services));

AddServiceBusMessagePump<TMessagePump>(services, subscriptionName: subscriptionName, getConnectionStringFromSecretFunc: secretProvider => secretProvider.GetRawSecretAsync(secretName), configureMessagePump: configureMessagePump);

return services;
}

/// <summary>
/// Adds a message handler to consume messages from Azure Service Bus Topic
/// </summary>
Expand Down Expand Up @@ -122,6 +160,25 @@ public static IServiceCollection AddServiceBusTopicMessagePump<TMessagePump>(thi
return services;
}

/// <summary>
/// Adds a message handler to consume messages from Azure Service Bus Topic
/// </summary>
/// <param name="topicName">Name of the topic to work with</param>
/// <param name="subscriptionName">Name of the subscription to process</param>
/// <param name="services">Collection of services to use in the application</param>
/// <param name="secretName">Name of the secret to retrieve using your registered <see cref="ISecretProvider"/> implementation</param>
/// <param name="configureMessagePump">Capability to configure how the message pump should behave</param>
/// <returns>Collection of services to use in the application</returns>
public static IServiceCollection AddServiceBusTopicMessagePump<TMessagePump>(this IServiceCollection services, string topicName, string subscriptionName, string secretName, Action<AzureServiceBusMessagePumpOptions> configureMessagePump = null)
where TMessagePump : class, IHostedService
{
Guard.NotNull(services, nameof(services));

AddServiceBusMessagePump<TMessagePump>(services, topicName, subscriptionName, getConnectionStringFromSecretFunc: secretProvider => secretProvider.GetRawSecretAsync(secretName), configureMessagePump: configureMessagePump);

return services;
}

/// <summary>
/// Adds a message handler to consume messages from Azure Service Bus Topic
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.8.0" />
<PackageReference Include="Moq" Version="4.13.1" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
</ItemGroup>
Expand Down
41 changes: 41 additions & 0 deletions src/Arcus.Messaging.Tests.Unit/ServiceBus/EmptyMessagePump.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
using Arcus.Messaging.Pumps.ServiceBus;
using Arcus.Messaging.Tests.Core.Messages.v1;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Arcus.Messaging.Tests.Unit.ServiceBus
{
public class EmptyMessagePump : AzureServiceBusMessagePump<Order>
{
/// <summary>
/// Constructor
/// </summary>
/// <param name="configuration">Configuration of the application</param>
/// <param name="serviceProvider">Collection of services that are configured</param>
/// <param name="logger">Logger to write telemetry to</param>
public EmptyMessagePump(IConfiguration configuration, IServiceProvider serviceProvider, ILogger logger) : base(configuration, serviceProvider, logger) { }

/// <summary>
/// Process a new message that was received
/// </summary>
/// <param name="message">Message that was received</param>
/// <param name="messageContext">Context providing more information concerning the processing</param>
/// <param name="correlationInfo">
/// Information concerning correlation of telemetry & processes by using a variety of unique
/// identifiers
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
protected override async Task ProcessMessageAsync(
Order message,
AzureServiceBusMessageContext messageContext,
MessageCorrelationInfo correlationInfo,
CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System.Threading.Tasks;
using Arcus.Messaging.Pumps.ServiceBus;
using Arcus.Security.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;

namespace Arcus.Messaging.Tests.Unit.ServiceBus
{
public class IServiceCollectionExtensionsTests
{
[Fact]
public async Task AddServiceBusTopicMessagePump_WithSubscriptionNameIndirectSecretProvider_WiresUpCorrectly()
{
// Arrange
var services = new ServiceCollection();
var spySecretProvider = new Mock<ISecretProvider>();
services.AddSingleton(serviceProvider => spySecretProvider.Object);
services.AddSingleton(serviceProvider => Mock.Of<IConfiguration>());
services.AddSingleton(serviceProvider => Mock.Of<ILogger>());

// Act
IServiceCollection result =
services.AddServiceBusTopicMessagePump<EmptyMessagePump>(
"subscription name", "secret name", configureMessagePump: options => options.AutoComplete = true);

// Assert
Assert.NotNull(result);
ServiceProvider provider = result.BuildServiceProvider();

var messagePump = provider.GetService<IHostedService>();
Assert.IsType<EmptyMessagePump>(messagePump);

var settings = provider.GetService<AzureServiceBusMessagePumpSettings>();
await settings.GetConnectionStringAsync();
spySecretProvider.Verify(spy => spy.GetRawSecretAsync("secret name"), Times.Once);
}

[Fact]
public async Task AddServiceBusTopicMessagePump_WithTopicNameAndSubscriptionNameIndirectSecretProvider_WiresUpCorrectly()
{
// Arrange
var services = new ServiceCollection();
var spySecretProvider = new Mock<ISecretProvider>();
services.AddSingleton(serviceProvider => spySecretProvider.Object);
services.AddSingleton(serviceProvider => Mock.Of<IConfiguration>());
services.AddSingleton(serviceProvider => Mock.Of<ILogger>());

// Act
IServiceCollection result =
services.AddServiceBusTopicMessagePump<EmptyMessagePump>(
"topic name", "subscription name", "secret name", configureMessagePump: options => options.AutoComplete = true);

// Assert
// Assert
Assert.NotNull(result);
ServiceProvider provider = result.BuildServiceProvider();

var messagePump = provider.GetService<IHostedService>();
Assert.IsType<EmptyMessagePump>(messagePump);

var settings = provider.GetService<AzureServiceBusMessagePumpSettings>();
await settings.GetConnectionStringAsync();
spySecretProvider.Verify(spy => spy.GetRawSecretAsync("secret name"), Times.Once);
}

[Fact]
public async Task AddServiceBusQueueMessagePump_IndirectSecretProvider_WiresUpCorrectly()
{
// Arrange
var services = new ServiceCollection();
var spySecretProvider = new Mock<ISecretProvider>();
services.AddSingleton(serviceProvider => spySecretProvider.Object);
services.AddSingleton(serviceProvider => Mock.Of<IConfiguration>());
services.AddSingleton(serviceProvider => Mock.Of<ILogger>());

// Act
IServiceCollection result =
services.AddServiceBusQueueMessagePump<EmptyMessagePump>(
"queue name", "secret name", configureMessagePump: options => options.AutoComplete = true);

// Assert
Assert.NotNull(result);
ServiceProvider provider = result.BuildServiceProvider();

var messagePump = provider.GetService<IHostedService>();
Assert.IsType<EmptyMessagePump>(messagePump);

var settings = provider.GetService<AzureServiceBusMessagePumpSettings>();
await settings.GetConnectionStringAsync();
spySecretProvider.Verify(spy => spy.GetRawSecretAsync("secret name"), Times.Once);
}
}
}

0 comments on commit 2f64e2d

Please sign in to comment.