Skip to content

Commit

Permalink
Implement fire and forget send (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret authored Jun 1, 2024
1 parent 6f0dccf commit f80b8d5
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/ArtemisNetCoreClient/IProducer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
namespace ActiveMQ.Artemis.Core.Client;

/// <summary>
/// A producer sends messages to ActiveMQ Artemis addresses.
/// </summary>
public interface IProducer : IAsyncDisposable
{
/// <summary>
/// Sends a message synchronously to the broker. This method is primarily used for non-durable
/// message delivery since it does not wait for a confirmation from the broker and operates
/// in a fire-and-forget manner.
/// </summary>
/// <remarks>
/// This method should typically be used when message delivery speed is prioritized over reliability.
/// The message will be sent with 'at most once' delivery guarantee, as there's no acknowledgment
/// from the broker that the message has been received or persisted.
/// </remarks>
void SendMessage(Message message);

/// <summary>
/// Sends a message asynchronously to the broker. This method supports both durable and non-durable message
/// delivery modes, as specified by the message's durable property. It awaits for a confirmation
/// from the broker, ensuring that the message is either stored (for durable messages) or acknowledged
/// (for non-durable messages) before completing.
/// </summary>
/// <remarks>
/// This method should be used when reliability is required, and it supports awaiting the acknowledgment
/// from the broker. The delivery semantics are 'at least once' for durable messages, where the broker
/// confirms the persistence of the message. For non-durable messages, the completion of the task
/// indicates that the broker has received the message.
/// </remarks>
ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default);
}
11 changes: 11 additions & 0 deletions src/ArtemisNetCoreClient/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,24 @@ public ValueTask DisposeAsync()
return session.RemoveProducerAsync(ProducerId);
}

public void SendMessage(Message message)
{
message.Address = Address;
if (RoutingType.HasValue)
{
message.RoutingType = RoutingType.Value;
}
session.SendMessage(message: message, producerId: ProducerId);
}

public async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken)
{
message.Address = Address;
if (RoutingType.HasValue)
{
message.RoutingType = RoutingType.Value;
}

await session.SendMessageAsync(message: message, producerId: ProducerId, cancellationToken: cancellationToken);
}
}
12 changes: 12 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,18 @@ internal ValueTask RemoveProducerAsync(int producerId)
return ValueTask.CompletedTask;
}

internal void SendMessage(Message message, int producerId)
{
var request = new SessionSendMessage
{
Message = message,
ProducerId = producerId,
RequiresResponse = false,
CorrelationId = -1,
};
connection.Send(request, ChannelId);
}

internal async ValueTask SendMessageAsync(Message message, int producerId, CancellationToken cancellationToken)
{
var request = new SessionSendMessage
Expand Down
52 changes: 52 additions & 0 deletions test/ArtemisNetCoreClient.Tests/ProducerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,56 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
});
});
}

[Fact]
public async Task Should_send_message_in_a_fire_and_forget_manner()
{
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
var scenario = TestScenarioFactory.Default(new XUnitOutputAdapter(testOutputHelper));

await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync();

var (addressName, queueName) = await scenario.Step("Create address and queue", async () =>
{
var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast);
var queueName = await testFixture.CreateQueueAsync(addressName, RoutingType.Anycast);
return (addressName, queueName);
});

await scenario.Step("Send a message in a fire-and-forget manner", async () =>
{
await using var producer = await session.CreateProducerAsync(new ProducerConfiguration
{
Address = addressName,
RoutingType = RoutingType.Anycast
}, testFixture.CancellationToken);

// ReSharper disable once MethodHasAsyncOverload
producer.SendMessage(new Message
{
Body = "fire_and_forget_msg"u8.ToArray(),
});
});

await scenario.Step("Confirm message count (one message should be available on the queue)", async () =>
{
await RetryUtil.RetryUntil(
func: () => session.GetQueueInfoAsync(queueName, testFixture.CancellationToken),
until: info => info?.MessageCount == 1,
cancellationToken: testFixture.CancellationToken
);
});

await scenario.Step("Verify message payload", async () =>
{
await using var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
}, testFixture.CancellationToken);
var message = await consumer.ReceiveMessageAsync(testFixture.CancellationToken);
Assert.NotNull(message);
Assert.Equal("fire_and_forget_msg"u8.ToArray(), message.Body.ToArray());
});
}
}

0 comments on commit f80b8d5

Please sign in to comment.