Skip to content

Commit

Permalink
Merge pull request #55 from Shuttle/async
Browse files Browse the repository at this point in the history
Async
  • Loading branch information
eben-roux authored Apr 30, 2024
2 parents 1926f80 + 16c53fc commit 27976ac
Show file tree
Hide file tree
Showing 176 changed files with 6,243 additions and 2,034 deletions.
84 changes: 43 additions & 41 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Documentation

Please visit out [official documentation](https://shuttle.github.io/shuttle-esb/) for more information.
Please visit out [official documentation](https://www.pendel.co.za/shuttle-esb/index.html) for more information.

# Getting Started

Start a new **Console Application** project and select a Shuttle.Esb queue implementation from the supported queues:
Start a new **Console Application** project. We'll need to install one of the support queue implementations. For this example we'll use `Shuttle.Esb.AzureStorageQueues` which can be hosted locally using [Azurite](https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio%2Cblob-storage):

```
PM> Install-Package Shuttle.Esb.AzureStorageQueues
Expand All @@ -21,26 +21,27 @@ Next we'll implement our endpoint in order to start listening on our queue:
``` c#
internal class Program
{
private static void Main()
static async Task Main(string[] args)
{
Host.CreateDefaultBuilder()
await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddServiceBus(builder =>
{
builder.Options.Inbox.WorkQueueUri = "azuresq://azure/work";
});

services.AddAzureStorageQueues(builder =>
{
builder.AddOptions("azure", new AzureStorageQueueOptions
services
.AddServiceBus(builder =>
{
ConnectionString = "UseDevelopmentStorage=true;"
builder.Options.Inbox.WorkQueueUri = "azuresq://azure/work";
builder.Options.Asynchronous = true; // NOTE: we'll be using async processing
})
.AddAzureStorageQueues(builder =>
{
builder.AddOptions("azure", new AzureStorageQueueOptions
{
ConnectionString = "UseDevelopmentStorage=true;"
});
});
});
})
.Build()
.Run();
.RunAsync();
}
}
```
Expand All @@ -50,41 +51,42 @@ Even though the options may be set directly as above, typically one would make u
```c#
internal class Program
{
private static void Main()
private static async Task Main(string[] args)
{
Host.CreateDefaultBuilder()
await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
var configuration =
var configuration =
new ConfigurationBuilder()
.AddJsonFile("appsettings.json")
.Build();

services.AddSingleton<IConfiguration>(configuration);

services.AddServiceBus(builder =>
{
configuration
.GetSection(ServiceBusOptions.SectionName)
.Bind(builder.Options);
});
services
.AddSingleton<IConfiguration>(configuration)
.AddServiceBus(builder =>
{
configuration
.GetSection(ServiceBusOptions.SectionName)
.Bind(builder.Options);

services.AddAzureStorageQueues(builder =>
{
builder.AddOptions("azure", new AzureStorageQueueOptions
builder.Options.Asynchronous = true; // NOTE: we'll be using async processing
})
.AddAzureStorageQueues(builder =>
{
ConnectionString = configuration
.GetConnectionString("azure")
builder.AddOptions("azure", new AzureStorageQueueOptions
{
ConnectionString = configuration
.GetConnectionString("azure")
});
});
});
})
.Build()
.Run();
.RunAsync();
}
}
```

The `appsettings.json` file would be as follows:
The `appsettings.json` file would be as follows (remember to set to `Copy always`):

```json
{
Expand All @@ -104,7 +106,7 @@ The `appsettings.json` file would be as follows:
### Send a command message for processing

``` c#
bus.Send(new RegisterMember
await serviceBus.SendAsync(new RegisterMember
{
UserName = "user-name",
EMailAddress = "[email protected]"
Expand All @@ -116,7 +118,7 @@ bus.Send(new RegisterMember
Before publishing an event one would need to register an `ISubscrtiptionService` implementation such as [Shuttle.Esb.Sql.Subscription](/implementations/subscription/sql.md).

``` c#
bus.Publish(new MemberRegistered
await serviceBus.PublishAsync(new MemberRegistered
{
UserName = "user-name"
});
Expand All @@ -134,17 +136,17 @@ services.AddServiceBus(builder =>
### Handle any messages

``` c#
public class RegisterMemberHandler : IMessageHandler<RegisterMember>
public class RegisterMemberHandler : IAsyncMessageHandler<RegisterMember>
{
public RegisterMemberHandler(IDependency dependency)
{
}

public void ProcessMessage(IHandlerContext<RegisterMember> context)
public async Task ProcessMessageAsync(IHandlerContext<RegisterMember> context)
{
// perform member registration
context.Publish(new MemberRegistered
await context.PublishAsync(new MemberRegistered
{
UserName = context.Message.UserName
});
Expand All @@ -153,9 +155,9 @@ public class RegisterMemberHandler : IMessageHandler<RegisterMember>
```

``` c#
public class MemberRegisteredHandler : IMessageHandler<MemberRegistered>
public class MemberRegisteredHandler : IAsyncMessageHandler<MemberRegistered>
{
public void ProcessMessage(IHandlerContext<MemberRegistered> context)
public async Task ProcessMessageAsync(IHandlerContext<MemberRegistered> context)
{
// processing
}
Expand Down
112 changes: 49 additions & 63 deletions Shuttle.Esb.Tests/DeferredProcessingFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using Moq;
using NUnit.Framework;
using Shuttle.Core.Pipelines;
using Shuttle.Core.Serialization;
using Shuttle.Core.System;
using Shuttle.Core.Threading;

namespace Shuttle.Esb.Tests
Expand All @@ -16,94 +18,78 @@ public class DeferredProcessingFixture
[Test]
public void Should_be_able_to_defer_processing()
{
var serializer = new DefaultSerializer();
var pipelineFactory = new Mock<IPipelineFactory>();
var configuration = new Mock<IServiceBusConfiguration>();

var deferredMessageProcessor = new DeferredMessageProcessor(new ServiceBusOptions(), pipelineFactory.Object);

deferredMessageProcessor.DeferredMessageProcessingHalted += (sender, args) =>
var inboxConfiguration = new InboxConfiguration
{
Console.WriteLine(@"[deferred processing halted]");
WorkQueue = new MemoryQueue(new Uri("memory://memory/work-queue")),
DeferredQueue = new MemoryQueue(new Uri("memory://memory/deferred-queue")),
ErrorQueue= new MemoryQueue(new Uri("memory://memory/error-queue"))
};

configuration.Setup(m => m.Inbox).Returns(new InboxConfiguration
{
DeferredMessageProcessor = deferredMessageProcessor
});
configuration.Setup(m => m.Inbox).Returns(inboxConfiguration);

var getDeferredMessageObserver = new Mock<IGetDeferredMessageObserver>();
var deserializeTransportMessageObserver = new Mock<IDeserializeTransportMessageObserver>();
var processDeferredMessageObserver = new Mock<IProcessDeferredMessageObserver>();
var processDeferredMessageObserver = new ProcessDeferredMessageObserver();

var transportMessage1 = new TransportMessage
{
MessageId = new Guid("973808b9-8cc6-433b-b9d2-a08e1236c104"),
IgnoreTillDate = DateTime.Now.AddSeconds(5).ToUniversalTime()
};
pipelineFactory.Setup(m => m.GetPipeline<DeferredMessagePipeline>()).Returns(
new DeferredMessagePipeline(
configuration.Object,
new GetDeferredMessageObserver(),
new DeserializeTransportMessageObserver(Options.Create(new ServiceBusOptions()), serializer, new EnvironmentService(), new ProcessService()),
processDeferredMessageObserver
));

var transportMessage2 = new TransportMessage
{
MessageId = new Guid("d06bf8e5-f81c-4ca6-a3c4-368efae97b0b"),
IgnoreTillDate = DateTime.Now.AddSeconds(4).ToUniversalTime()
};
var deferredMessageProcessor = new DeferredMessageProcessor(Options.Create(new ServiceBusOptions{ Inbox = new InboxOptions { DeferredQueueUri = "memory://memory/deferred-queue", DeferredMessageProcessorResetInterval = TimeSpan.FromMilliseconds(500)}}), pipelineFactory.Object);

var transportMessage3 = new TransportMessage
deferredMessageProcessor.DeferredMessageProcessingHalted += (sender, args) =>
{
MessageId = new Guid("b7d21a52-bb98-4a59-85cb-d1e1e26e72df"),
IgnoreTillDate = DateTime.Now.AddSeconds(3).ToUniversalTime()
Console.WriteLine(@"[deferred processing halted]");
};

var deferredMessages = new Stack<TransportMessage>();

deferredMessages.Push(transportMessage1);
deferredMessages.Push(transportMessage2);
deferredMessages.Push(transportMessage3);
var transportMessage1 = CreateTransportMessage(DateTime.Now.AddSeconds(3).ToUniversalTime());
var transportMessage2 = CreateTransportMessage(DateTime.Now.AddSeconds(2).ToUniversalTime());
var transportMessage3 = CreateTransportMessage(DateTime.Now.AddSeconds(1).ToUniversalTime());

var index = 1;

getDeferredMessageObserver.Setup(m => m.Execute(It.IsAny<OnGetMessage>())).Callback(
(OnGetMessage pipelineEvent) =>
{
if (deferredMessages.Count == 0)
{
pipelineEvent.Pipeline.Abort();
return;
}
inboxConfiguration.DeferredQueue.Enqueue(transportMessage1, serializer.Serialize(transportMessage1));
inboxConfiguration.DeferredQueue.Enqueue(transportMessage2, serializer.Serialize(transportMessage2));
inboxConfiguration.DeferredQueue.Enqueue(transportMessage3, serializer.Serialize(transportMessage3));

var transportMessage = deferredMessages.Pop();
var deferredMessageReturned = !transportMessage.IsIgnoring();
var messagesReturned = new List<TransportMessage>();

Console.WriteLine($@"[processing]: index = {index} / message id = {transportMessage.MessageId}");

pipelineEvent.Pipeline.State.SetTransportMessage(transportMessage);
pipelineEvent.Pipeline.State.SetWorking();
pipelineEvent.Pipeline.State.SetDeferredMessageReturned(deferredMessageReturned);

if (deferredMessageReturned)
{
Console.WriteLine($@"[returned]: index = {index} / message id = {transportMessage.MessageId}");
}

index++;
});
processDeferredMessageObserver.MessageReturned += (sender, e) =>
{
messagesReturned.Add(e.TransportMessage);
};

pipelineFactory.Setup(m => m.GetPipeline<DeferredMessagePipeline>()).Returns(
new DeferredMessagePipeline(
configuration.Object,
getDeferredMessageObserver.Object,
deserializeTransportMessageObserver.Object,
processDeferredMessageObserver.Object
));
var timeout = DateTime.Now.AddMilliseconds(3500);

using (new ProcessorThreadPool("DeferredMessageProcessor", 1,
new DeferredMessageProcessorFactory(configuration.Object),
new DeferredMessageProcessorFactory(deferredMessageProcessor),
new ProcessorThreadOptions()).Start())
{
while (deferredMessages.Count > 0)
while (messagesReturned.Count < 3)
{
Thread.Sleep(250);
}
}

Assert.That(messagesReturned.Find(item => item.MessageId.Equals(transportMessage1.MessageId)), Is.Not.Null);
Assert.That(messagesReturned.Find(item => item.MessageId.Equals(transportMessage2.MessageId)), Is.Not.Null);
Assert.That(messagesReturned.Find(item => item.MessageId.Equals(transportMessage3.MessageId)), Is.Not.Null);
}

private static TransportMessage CreateTransportMessage(DateTime ignoreTillDate)
{
return new TransportMessage
{
MessageId = new Guid("973808b9-8cc6-433b-b9d2-a08e1236c104"),
PrincipalIdentityName = "unit-test",
MessageType = "message-type",
AssemblyQualifiedName = "assembly-qualified-name",
IgnoreTillDate = ignoreTillDate
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Shuttle.Esb.Tests
{
public class OnTest : PipelineEvent
public class OnException : PipelineEvent
{
}
}
Loading

0 comments on commit 27976ac

Please sign in to comment.