Skip to content

Commit

Permalink
Fix - combine context predicate w/ Service Bus operations (arcus-azur…
Browse files Browse the repository at this point in the history
…e#135)

* Fix - combine context predicate w/ Service Bus operations

* pr-fix: use queue for the dead letter test

* pr-fix: use correct integration test program type
  • Loading branch information
stijnmoreels authored Nov 10, 2020
1 parent 1b93c04 commit d7185b9
Showing 7 changed files with 174 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@ namespace Arcus.Messaging.Pumps.Abstractions.MessageHandling
/// </summary>
public class MessageHandler
{
private readonly object _service;
private readonly Type _serviceType;
private readonly ILogger _logger;

private MessageHandler(Type serviceType, object service, ILogger logger)
@@ -29,24 +31,14 @@ private MessageHandler(Type serviceType, object service, ILogger logger)
() => serviceType.GenericTypeArguments.Length != 2,
$"Message handler type '{serviceType.Name}' has not the expected 2 generic type arguments");

_service = service;
_serviceType = serviceType;
_logger = logger;

Service = service;
ServiceType = serviceType;
MessageType = ServiceType.GenericTypeArguments[0];
MessageContextType = ServiceType.GenericTypeArguments[1];
MessageType = _serviceType.GenericTypeArguments[0];
MessageContextType = _serviceType.GenericTypeArguments[1];
}

/// <summary>
/// Gets the instance of the message handler that this abstracted message handler represents.
/// </summary>
public object Service { get; }

/// <summary>
/// Gets the type of the message handler that this abstracted message handler represents.
/// </summary>
public Type ServiceType { get; }

/// <summary>
/// Gets the type of the message that this abstracted message handler can process.
/// </summary>
@@ -93,35 +85,62 @@ public static IEnumerable<MessageHandler> SubtractFrom(IServiceProvider serviceP
return messageHandlers.AsEnumerable();
}

/// <summary>
/// Gets the concrete class type of the <see cref="IMessageHandler{TMessage,TMessageContext}"/> instance.
/// </summary>
/// <exception cref="TypeNotFoundException">Thrown when there's a problem with finding the type of the <see cref="IMessageHandler{TMessage,TMessageContext}"/>.</exception>
/// <exception cref="ValueMissingException">Thrown when there's no value for the <see cref="IMessageHandler{TMessage,TMessageContext}"/> type value.</exception>
public object GetMessageHandlerInstance()
{
if (_service.GetType().Name == typeof(MessageHandlerRegistration<,>).Name)
{
object messageHandlerType = _service.GetRequiredPropertyValue("Service", BindingFlags.Instance | BindingFlags.NonPublic);
return messageHandlerType;
}

return _service;
}

/// <summary>
/// Gets the type of the <see cref="IMessageHandler{TMessage,TMessageContext}"/> instance.
/// </summary>
/// <exception cref="TypeNotFoundException">Thrown when there's a problem with finding the type of the <see cref="IMessageHandler{TMessage,TMessageContext}"/>.</exception>
/// <exception cref="ValueMissingException">Thrown when there's no value for the <see cref="IMessageHandler{TMessage,TMessageContext}"/> type value.</exception>
public Type GetMessageHandlerType()
{
object messageHandlerInstance = GetMessageHandlerInstance();
return messageHandlerInstance.GetType();
}

/// <summary>
/// Determines if the given <typeparamref name="TMessageContext"/> matches the generic parameter of this message handler.
/// </summary>
/// <typeparam name="TMessageContext">The type of the message context.</typeparam>
public bool CanProcessMessage<TMessageContext>(TMessageContext messageContext) where TMessageContext : MessageContext
{
Type expectedMessageContextType = ServiceType.GenericTypeArguments[1];
Type expectedMessageContextType = _serviceType.GenericTypeArguments[1];
Type actualMessageContextType = typeof(TMessageContext);

if (actualMessageContextType == expectedMessageContextType)
{
_logger.LogInformation(
"Message context type '{ActualMessageContextType}' matches registered message handler's {MessageHandlerType} context type {ExpectedMessageContextType}",
actualMessageContextType.Name, ServiceType.Name, expectedMessageContextType.Name);
actualMessageContextType.Name, _serviceType.Name, expectedMessageContextType.Name);

if (Service.GetType().Name == typeof(MessageHandlerRegistration<,>).Name)
if (_service.GetType().Name == typeof(MessageHandlerRegistration<,>).Name)
{
_logger.LogTrace(
"Determining whether the message context predicate registered with the message handler {MessageHandlerType} holds...",
ServiceType.Name);
_serviceType.Name);

var canProcessMessage = (bool) Service.InvokeMethod(
var canProcessMessage = (bool) _service.InvokeMethod(
"CanProcessMessage",
BindingFlags.Instance | BindingFlags.NonPublic,
messageContext);

_logger.LogInformation(
"Message context predicate registered with the message handler {MessageHandlerType} resulted in {Result}, so {Action} process this message",
ServiceType.Name, canProcessMessage, canProcessMessage ? "can" : "can't");
_serviceType.Name, canProcessMessage, canProcessMessage ? "can" : "can't");

return canProcessMessage;
}
@@ -132,7 +151,7 @@ public bool CanProcessMessage<TMessageContext>(TMessageContext messageContext) w

_logger.LogInformation(
"Message context type '{ActualMessageContextType}' doesn't matches registered message handler's {MessageHandlerType} context type {ExpectedMessageContextType}",
actualMessageContextType.Name, ServiceType.Name, expectedMessageContextType.Name);
actualMessageContextType.Name, _serviceType.Name, expectedMessageContextType.Name);

// Message context type doesn't match registration message context type.
return false;
@@ -165,36 +184,37 @@ public async Task ProcessMessageAsync<TMessageContext>(
Guard.NotNull(messageContext, nameof(messageContext), "Requires a message context to send to the message handler");
Guard.NotNull(correlationInfo, nameof(correlationInfo), "Requires correlation information to send to the message handler");

Type messageHandlerType = GetMessageHandlerType();
_logger.LogTrace(
"Start processing '{MessageType}' message in message handler '{MessageHandlerType}'...",
message.GetType().Name, ServiceType.Name);
message.GetType().Name, messageHandlerType.Name);

const string methodName = "ProcessMessageAsync";
try
{
var processMessageAsync =
(Task)Service.InvokeMethod(
(Task)_service.InvokeMethod(
methodName, BindingFlags.Instance | BindingFlags.Public, message, messageContext, correlationInfo, cancellationToken);

if (processMessageAsync is null)
{
throw new InvalidOperationException(
$"The '{typeof(IMessageHandler<,>).Name}' implementation '{Service.GetType().Name}' returned 'null' while calling the '{methodName}' method");
$"The '{typeof(IMessageHandler<,>).Name}' implementation '{messageHandlerType.Name}' returned 'null' while calling the '{methodName}' method");
}

await processMessageAsync;

_logger.LogInformation(
"Message handler '{MessageHandlerType}' successfully processed '{MessageType}' message", ServiceType.Name, MessageType.Name);
"Message handler '{MessageHandlerType}' successfully processed '{MessageType}' message", messageHandlerType.Name, MessageType.Name);
}
catch (AmbiguousMatchException exception)
{
_logger.LogError(
"Ambiguous match found of '{MethodName}' methods in the '{MessageHandlerType}'. Make sure that only 1 matching '{MethodName}' was found on the '{MessageHandlerType}' message handler",
methodName, ServiceType.Name, methodName, ServiceType.Name);
methodName, messageHandlerType.Name, methodName, messageHandlerType.Name);

throw new AmbiguousMatchException(
$"Ambiguous match found of '{methodName}' methods in the '{Service.GetType().Name}'. ", exception);
$"Ambiguous match found of '{methodName}' methods in the '{messageHandlerType.Name}'. ", exception);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
using GuardNet;

namespace Arcus.Messaging.Pumps.Abstractions.MessageHandling
{
/// <summary>
/// Represents an specific <see cref="IMessageHandler{TMessage,TMessageContext}"/> registration that requires more metadata information than only the instance itself.
/// </summary>
/// <typeparam name="TMessage">The type of the message the <see cref="IMessageHandler{TMessage,TMessageContext}"/> can handle.</typeparam>
/// <typeparam name="TMessageContext">The type of the message context the <see cref="IMessageHandler{TMessage,TMessageContext}"/> can handle.</typeparam>
internal class MessageHandlerRegistration<TMessage, TMessageContext> : IMessageHandler<TMessage, TMessageContext>
where TMessageContext : MessageContext
{
private readonly Func<TMessageContext, bool> _messageContextFilter;
private readonly IMessageHandler<TMessage, TMessageContext> _messageHandlerImplementation;

/// <summary>
/// Initializes a new instance of the <see cref="MessageHandlerRegistration"/> class.
/// Initializes a new instance of the <see cref="MessageHandlerRegistration{TMessage, TMessageContext}"/> class.
/// </summary>
/// <param name="messageContextFilter">The filter to determine if a given <see cref="MessageContext"/> can be handled by the <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation.</param>
/// <param name="messageHandlerImplementation">The <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation that this registration instance represents.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="messageContextFilter"/> or <paramref name="messageHandlerImplementation"/> is <c>null</c>.</exception>
internal MessageHandlerRegistration(
Func<TMessageContext, bool> messageContextFilter,
IMessageHandler<TMessage, TMessageContext> messageHandlerImplementation)
@@ -26,14 +30,22 @@ internal MessageHandlerRegistration(
Guard.NotNull(messageHandlerImplementation, nameof(messageHandlerImplementation));

_messageContextFilter = messageContextFilter;
_messageHandlerImplementation = messageHandlerImplementation;

Service = messageHandlerImplementation;
}

/// <summary>
///
/// Gets the type of the <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation.
/// </summary>
/// <param name="messageContext"></param>
/// <returns></returns>
internal IMessageHandler<TMessage, TMessageContext> Service { get; }

/// <summary>
/// Determine if the <see cref="IMessageHandler{TMessage,TMessageContext}"/> can process messages within the given <paramref name="messageContext"/> type.
/// </summary>
/// <param name="messageContext">The specific message context, providing information about the received message.</param>
/// <returns>
/// [true] if the <see cref="IMessageHandler{TMessage,TMessageContext}"/> can process the message within the given <paramref name="messageContext"/>; [false] otherwise.
/// </returns>
internal bool CanProcessMessage(TMessageContext messageContext)
{
return _messageContextFilter(messageContext);
@@ -42,7 +54,6 @@ internal bool CanProcessMessage(TMessageContext messageContext)
/// <summary>
/// Process the given <paramref name="message"/> in the current <see cref="IMessageHandler{TMessage,TMessageContext}"/> representation.
/// </summary>
/// <typeparam name="TMessageContext">The type of the message context used in the <see cref="IMessageHandler{TMessage,TMessageContext}"/>.</typeparam>
/// <param name="message">The parsed message to be processed by the <see cref="IMessageHandler{TMessage,TMessageContext}"/>.</param>
/// <param name="messageContext">The context providing more information concerning the processing.</param>
/// <param name="correlationInfo">
@@ -56,7 +67,7 @@ public async Task ProcessMessageAsync(
MessageCorrelationInfo correlationInfo,
CancellationToken cancellationToken)
{
await _messageHandlerImplementation.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken);
await Service.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken);
}
}
}
5 changes: 3 additions & 2 deletions src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs
Original file line number Diff line number Diff line change
@@ -231,18 +231,19 @@ private async Task<bool> ProcessMessageAsync<TMessageContext>(
return true;
}

Type messageHandlerType = handler.GetMessageHandlerType();
if (!canProcessMessage)
{
Logger.LogTrace(
"Message handler '{MessageHandlerType}' is not able to process the message because the message context '{MessageContextType}' didn't match the correct message handler's message context",
handler.ServiceType.Name, handler.MessageContextType.Name);
messageHandlerType.Name, handler.MessageContextType.Name);
}

if (!tryDeserializeToMessageFormat)
{
Logger.LogTrace(
"Message handler '{MessageHandlerType}' is not able to process the message because the incoming message cannot be deserialized to the message that the message handler can handle",
handler.ServiceType.Name);
messageHandlerType.Name);
}

return false;
Original file line number Diff line number Diff line change
@@ -498,17 +498,20 @@ protected override Task PreProcessMessageAsync<TMessageContext>(MessageHandler m
Guard.NotNull(messageHandler, nameof(messageHandler), "Requires a message handler instance to pre-process the message");
Guard.NotNull(messageContext, nameof(messageContext), "Requires a message context to pre-process the message");

Logger.LogTrace("Start pre-processing message handler {MessageHandlerType}...", messageHandler.ServiceType.Name);
object messageHandlerInstance = messageHandler.GetMessageHandlerInstance();
Type messageHandlerType = messageHandlerInstance.GetType();

if (messageHandler.Service is AzureServiceBusMessageHandlerTemplate template
Logger.LogTrace("Start pre-processing message handler {MessageHandlerType}...", messageHandlerType.Name);

if (messageHandlerInstance is AzureServiceBusMessageHandlerTemplate template
&& messageContext is AzureServiceBusMessageContext serviceBusMessageContext)
{
template.SetLockToken(serviceBusMessageContext.SystemProperties.LockToken);
template.SetMessageReceiver(_messageReceiver);
}
else
{
Logger.LogTrace("Nothing to pre-process for message handler type '{MessageHandlerType}'", messageHandler.ServiceType.Name);
Logger.LogTrace("Nothing to pre-process for message handler type '{MessageHandlerType}'", messageHandlerType.Name);
}

return Task.CompletedTask;
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Text;
using Arcus.EventGrid.Publishing;
using Arcus.Messaging.Pumps.ServiceBus;
using Arcus.Messaging.Tests.Core.Messages.v1;
using Arcus.Messaging.Tests.Workers.MessageHandlers;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Arcus.Messaging.Tests.Integration.Fixture
{
public class ServiceBusQueueContextPredicateSelectionWithDeadLetterProgram
{
public static void main(string[] args)
{
CreateHostBuilder(args)
.Build()
.Run();
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration(configuration =>
{
configuration.AddCommandLine(args);
configuration.AddEnvironmentVariables();
})
.ConfigureLogging(loggingBuilder => loggingBuilder.AddConsole(options => options.IncludeScopes = true))
.ConfigureServices((hostContext, services) =>
{
services.AddServiceBusQueueMessagePump(configuration => configuration["ARCUS_SERVICEBUS_CONNECTIONSTRING"], options => options.AutoComplete = false)
.WithMessageHandler<PassThruOrderMessageHandler, Order, AzureServiceBusMessageContext>(context => false)
.WithServiceBusMessageHandler<CustomerMessageHandler, Customer>(context => context.Properties["Topic"].ToString() == "Customers")
.WithServiceBusMessageHandler<OrdersAzureServiceBusDeadLetterMessageHandler, Order>(context => context.Properties["Topic"].ToString() == "Orders");

services.AddTcpHealthProbes("ARCUS_HEALTH_PORT");
});
}
}
Loading

0 comments on commit d7185b9

Please sign in to comment.