Skip to content

Commit

Permalink
Azure Service Bus provider
Browse files Browse the repository at this point in the history
  • Loading branch information
zarusz committed Jan 11, 2019
1 parent 0e45214 commit 7793f8d
Show file tree
Hide file tree
Showing 19 changed files with 653 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
dist/
.vscode/
.vscode/
secrets.txt
1 change: 0 additions & 1 deletion src/Samples/Sample.Simple.ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Globalization;
using System.Threading.Tasks;
using SlimMessageBus;
using SlimMessageBus.Host;
using SlimMessageBus.Host.Config;
using SlimMessageBus.Host.Serialization.Json;
using SlimMessageBus.Host.AzureEventHub;
Expand Down
24 changes: 12 additions & 12 deletions src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBusSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,6 @@

namespace SlimMessageBus.Host.AzureEventHub
{
public class TopicGroup
{
public TopicGroup(string topic, string group)
{
Topic = topic;
Group = group;
}

public string Topic { get; set; }
public string Group { get; set; }
}

public class EventHubMessageBusSettings
{
/// <summary>
Expand Down Expand Up @@ -67,4 +55,16 @@ public EventHubMessageBusSettings(string connectionString, string storageConnect
LeaseContainerName = leaseContainerName;
}
}

public class TopicGroup
{
public TopicGroup(string topic, string group)
{
Topic = topic;
Group = group;
}

public string Topic { get; set; }
public string Group { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Microsoft.Azure.ServiceBus;
using SlimMessageBus.Host.Config;

namespace SlimMessageBus.Host.AzureServiceBus.Consumer
{
internal static class MessageExtensions
{
public static string FormatIf(this ConsumerSettings consumerSettings, Message msg, bool logLevel)
{
if (!logLevel)
{
return string.Empty;
}

return $"Topic: {consumerSettings.Topic}, SubscriptionName: {consumerSettings.GetSubscriptionName()}, SequenceNumber: {msg.SystemProperties.SequenceNumber}, DeliveryCount: {msg.SystemProperties.DeliveryCount}";
}

public static string FormatIf(this ConsumerSettings consumerSettings, bool logLevel)
{
if (!logLevel)
{
return string.Empty;
}

return $"Topic: {consumerSettings.Topic}, SubscriptionName: {consumerSettings.GetSubscriptionName()}, MessageType: {consumerSettings.MessageType}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Common.Logging;
using Microsoft.Azure.ServiceBus;
using SlimMessageBus.Host.Config;

namespace SlimMessageBus.Host.AzureServiceBus.Consumer
{
public class TopicSubscriptionConsumer : IDisposable
{
private static readonly ILog Log = LogManager.GetLogger<TopicSubscriptionConsumer>();

public ServiceBusMessageBus MessageBus { get; }
public ConsumerSettings ConsumerSettings { get; }
private readonly ConsumerInstancePool<Message> _consumerInstancePool;
private readonly SubscriptionClient _subscriptionClient;

public TopicSubscriptionConsumer(ServiceBusMessageBus messageBus, ConsumerSettings consumerSettings)
{
MessageBus = messageBus;
ConsumerSettings = consumerSettings;

_consumerInstancePool = new ConsumerInstancePool<Message>(consumerSettings, messageBus, m => m.Body);

_subscriptionClient = messageBus.ServiceBusSettings.SubscriptionClientFactory(new SubscriptionFactoryParams(consumerSettings.Topic, consumerSettings.GetSubscriptionName()));

// Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = consumerSettings.Instances,

// Indicates whether the message pump should automatically complete the messages after returning from user callback.
// False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
AutoComplete = false
};

// Register the function that processes messages.
_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}

#region IDisposable

protected virtual void Dispose(bool disposing)
{
if (!disposing)
{
return;
}

_subscriptionClient.CloseAsync().GetAwaiter().GetResult();
_consumerInstancePool.Dispose();
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

#endregion

protected async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Process the message.
var mf = ConsumerSettings.FormatIf(message, Log.IsDebugEnabled);
Log.DebugFormat(CultureInfo.InvariantCulture, "Received message - {0}", mf);

await _consumerInstancePool.ProcessMessage(message).ConfigureAwait(false);

if (token.IsCancellationRequested)
{
// Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
// If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
// to avoid unnecessary exceptions.
Log.DebugFormat(CultureInfo.InvariantCulture, "Abandon message - {0}", mf);
await _subscriptionClient.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
}
else
{
// Complete the message so that it is not received again.
// This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
Log.DebugFormat(CultureInfo.InvariantCulture, "Complete message - {0}", mf);
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
}
}

// Use this handler to examine the exceptions received on the message pump.
protected Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
try
{
// Execute the event hook
(ConsumerSettings.OnMessageFault ?? MessageBus.Settings.OnMessageFault)?.Invoke(ConsumerSettings, exceptionReceivedEventArgs, exceptionReceivedEventArgs.Exception);
}
catch (Exception eh)
{
// When the hook itself error out, catch the exception
Log.ErrorFormat(CultureInfo.InvariantCulture, "{0} method failed", eh, nameof(IConsumerEvents.OnMessageFault));
}
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using SlimMessageBus.Host.Config;

namespace SlimMessageBus.Host.AzureServiceBus
{
public static class MessageBusBuilderExtensions
{
public static MessageBusBuilder WithProviderServiceBus(this MessageBusBuilder mbb, ServiceBusMessageBusSettings serviceBusSettings)
{
return mbb.WithProvider(settings => new ServiceBusMessageBus(settings, serviceBusSettings));
}
}
}
81 changes: 81 additions & 0 deletions src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using Common.Logging;
using Microsoft.Azure.ServiceBus;
using SlimMessageBus.Host.AzureServiceBus.Consumer;
using SlimMessageBus.Host.Collections;
using SlimMessageBus.Host.Config;

namespace SlimMessageBus.Host.AzureServiceBus
{
public class ServiceBusMessageBus : MessageBusBase
{
private static readonly ILog Log = LogManager.GetLogger<ServiceBusMessageBus>();

public ServiceBusMessageBusSettings ServiceBusSettings { get; }

private readonly SafeDictionaryWrapper<string, TopicClient> _producerByTopic;
private readonly List<TopicSubscriptionConsumer> _consumers = new List<TopicSubscriptionConsumer>();

public ServiceBusMessageBus(MessageBusSettings settings, ServiceBusMessageBusSettings serviceBusSettings) : base(settings)
{
ServiceBusSettings = serviceBusSettings;

_producerByTopic = new SafeDictionaryWrapper<string, TopicClient>(topic =>
{
Log.DebugFormat(CultureInfo.InvariantCulture, "Creating TopicClient for path {0}", topic);
return serviceBusSettings.TopicClientFactory(new TopicClientFactoryParams(topic));
});

Log.Info("Creating consumers");
foreach (var consumerSettings in settings.Consumers)
{
Log.InfoFormat(CultureInfo.InvariantCulture, "Creating consumer for {0}", consumerSettings.FormatIf(Log.IsInfoEnabled));
_consumers.Add(new TopicSubscriptionConsumer(this, consumerSettings));
}
}

#region Overrides of MessageBusBase

protected override void Dispose(bool disposing)
{
if (_consumers.Count > 0)
{
_consumers.ForEach(c => c.DisposeSilently("Consumer", Log));
_consumers.Clear();
}

if (_producerByTopic.Dictonary.Count > 0)
{
Task.WaitAll(_producerByTopic.Dictonary.Values.Select(x =>
{
Log.DebugFormat(CultureInfo.InvariantCulture, "Closing TopicClient for Topic {0}", x.Path);
return x.CloseAsync();
}).ToArray());

_producerByTopic.Clear();
}

base.Dispose(disposing);
}

public override async Task PublishToTransport(Type messageType, object message, string topic, byte[] payload)
{
AssertActive();

Log.DebugFormat(CultureInfo.InvariantCulture, "Producing message {0} of type {1} on topic {2} with size {3}", message, messageType.Name, topic, payload.Length);
var producer = _producerByTopic.GetOrAdd(topic);

var m = new Message(payload);
// ToDo: add support for partitioning key
await producer.SendAsync(m).ConfigureAwait(false);

Log.DebugFormat(CultureInfo.InvariantCulture, "Delivered message {0} of type {1} on topic {2}", message, messageType.Name, topic);
}

#endregion
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using Microsoft.Azure.ServiceBus;

namespace SlimMessageBus.Host.AzureServiceBus
{
public class ServiceBusMessageBusSettings
{
public string ServiceBusConnectionString { get; set; }

public Func<TopicClientFactoryParams, TopicClient> TopicClientFactory { get; set; }
public Func<SubscriptionFactoryParams, SubscriptionClient> SubscriptionClientFactory { get; set; }

public ServiceBusMessageBusSettings()
{
TopicClientFactory = x => new TopicClient(ServiceBusConnectionString, x.Path);
SubscriptionClientFactory = x => new SubscriptionClient(ServiceBusConnectionString, x.Path, x.SubscriptionName);
}

public ServiceBusMessageBusSettings(string serviceBusConnectionString)
: this()
{
ServiceBusConnectionString = serviceBusConnectionString;
}
}

public class TopicClientFactoryParams
{
public string Path { get; set; }

public TopicClientFactoryParams(string path)
{
Path = path;
}
}

public class SubscriptionFactoryParams
{
public string Path { get; set; }
public string SubscriptionName { get; set; }

public SubscriptionFactoryParams(string path, string subscriptionName)
{
Path = path;
SubscriptionName = subscriptionName;
}
}
}
46 changes: 46 additions & 0 deletions src/SlimMessageBus.Host.AzureServiceBus/SettingsExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using SlimMessageBus.Host.Config;

namespace SlimMessageBus.Host.AzureServiceBus
{
public static class SettingsExtensions
{
public static void SetSubscriptionName(this ConsumerSettings consumerSettings, string group)
{
consumerSettings.Properties["SubscriptionName"] = group;
}

public static string GetSubscriptionName(this ConsumerSettings consumerSettings)
{
return consumerSettings.Properties["SubscriptionName"] as string;
}

public static void SetSubscriptionName(this RequestResponseSettings consumerSettings, string group)
{
consumerSettings.Properties["SubscriptionName"] = group;
}

public static string GetSubscriptionName(this RequestResponseSettings consumerSettings)
{
return consumerSettings.Properties["SubscriptionName"] as string;
}

public static TopicSubscriberBuilder<TMessage> SubscriptionName<TMessage>(this TopicSubscriberBuilder<TMessage> builder, string subscriptionName)
{
builder.ConsumerSettings.SetSubscriptionName(subscriptionName);
return builder;
}

public static TopicHandlerBuilder<TRequest, TResponse> SubscriptionName<TRequest, TResponse>(this TopicHandlerBuilder<TRequest, TResponse> builder, string subscriptionName)
where TRequest : IRequestMessage<TResponse>
{
builder.ConsumerSettings.SetSubscriptionName(subscriptionName);
return builder;
}

public static RequestResponseBuilder SubscriptionName(this RequestResponseBuilder builder, string subscriptionName)
{
builder.Settings.SetSubscriptionName(subscriptionName);
return builder;
}
}
}
Loading

0 comments on commit 7793f8d

Please sign in to comment.