Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
update to R/R sample
Browse files Browse the repository at this point in the history
  • Loading branch information
clemensv committed Apr 4, 2016
1 parent 4ae9231 commit 909ef1c
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 60 deletions.
6 changes: 2 additions & 4 deletions QueuesRequestResponse/Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,15 @@ public async Task Run(
TransportType = TransportType.Amqp,
TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(sendKeyName, sendKey)
});
senderFactory.RetryPolicy = new RetryExponential(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5), 10);


var receiverFactory = MessagingFactory.Create(
namespaceAddress,
new MessagingFactorySettings
{
TransportType = TransportType.Amqp,
TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(receiveKeyName, receiveKey)
});
receiverFactory.RetryPolicy = new RetryExponential(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5), 10);


var sender = senderFactory.CreateMessageSender(basicQueueName);
var receiver = receiverFactory.CreateMessageReceiver(basicQueue2Name);
var rr = new RequestReplySender(sender, receiver);
Expand Down
36 changes: 36 additions & 0 deletions QueuesRequestResponse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Request/Response Pattern with Queues

The request/response message exchange pattern is very common, and with many protocols, including the
dominant HTTP protocol, it is the only supported pattern: The client sends a request, and the server
replies with a response.

This sample shows how to implement the request/response pattern over a pair of Service Bus Queues.

## Considerations

We assume that the requesting party and the responding party are not part of the same application
and may indeed use request and reply queues on different Service Bus namespaces in different
datacenters. That assumption influences how we will deal with access control.

We will also assume that the client expects a somewhat timely response within its current application
instance lifetime. "Timely" may mean under a second, but it may also mean 15 minutes. The point of
running a request/response pattern over queues is commonly that the transfers need to be reliable,
and that the work required to satisfy the request is non-trivial.

To keep the sample complexity manageable, we will not persist the information about pending requests;
responses that arrive back at the requesting party and that cannot be matched to requests of the
current application instance will simply be dead-lettered.

## Modeling Request/Response

Service Bus Queues are one-way communication entities that route messages from a sender to a receiver
via the Service Bus message broker. To create a feedback path from receiver back to the sender, we
must therefore use a separate queue.

[TBD...]






109 changes: 54 additions & 55 deletions QueuesRequestResponse/Server/RequestReplyResponder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class RequestReplyResponder : IDisposable
{
readonly Uri namespaceUri;
readonly MessageReceiver receiver;
readonly Dictionary<Uri, FactoryTokenProviderTuple> responderFactories = new Dictionary<Uri, FactoryTokenProviderTuple>();
readonly object responderFactoriesMutex = new object();
readonly Dictionary<Uri, ReplyDestination> responderFactories = new Dictionary<Uri, ReplyDestination>();
readonly object replyDestinationsMutex = new object();
readonly Func<BrokeredMessage, Task<BrokeredMessage>> responseFunction;

public RequestReplyResponder(Uri namespaceUri, MessageReceiver receiver, Func<BrokeredMessage, Task<BrokeredMessage>> responseFunction)
Expand All @@ -52,13 +52,8 @@ public void Dispose()
public async Task Run(CancellationToken token)
{
var tcs = new TaskCompletionSource<bool>();
token.Register(
() =>
{
this.receiver.Close();
tcs.SetResult(true);
});
this.receiver.OnMessageAsync(rq => this.Respond(rq, this.responseFunction), new OnMessageOptions {AutoComplete = false});
token.Register(() => { this.receiver.Close(); tcs.SetResult(true); });
this.receiver.OnMessageAsync(rq => this.Respond(rq, this.responseFunction), new OnMessageOptions { AutoComplete = false });
await tcs.Task;
}

Expand All @@ -71,71 +66,37 @@ async Task Respond(BrokeredMessage request, Func<BrokeredMessage, Task<BrokeredM

if (Uri.TryCreate(request.ReplyTo, UriKind.RelativeOrAbsolute, out targetUri))
{
string replyToken = null;
// make the URI absolute to this namespace
if (!targetUri.IsAbsoluteUri)
{
targetUri = new Uri(this.namespaceUri, targetUri);
}
var queryPortion = targetUri.Query;
if (!string.IsNullOrEmpty(queryPortion) && queryPortion.Length > 1)
{
var nvm = HttpUtility.ParseQueryString(queryPortion.Substring(1));
var tokenString = nvm["tk"];
if (tokenString != null)
{
replyToken = tokenString;
}
}
var replyToken = GetReplyToken(targetUri);
if (replyToken == null)
{
await request.DeadLetterAsync("NoReplyToToken", "No 'tk' query parameter in ReplyTo field found");
await request.DeadLetterAsync("NoReplyToToken", "No 'tk' query parameter in ReplyTo field URI found");
return;
}
// truncate the query portion of the URI
targetUri = new Uri(targetUri.GetLeftPart(UriPartial.Path));



// now we're reasonably confident that the input message can be
// replied to, so let's execute the message processing

try
{
// call the callback
var reply = await handleRequest(request);
// set the correlation-id on the reply
reply.CorrelationId = request.MessageId;

FactoryTokenProviderTuple factory;

lock (this.responderFactoriesMutex)
{
var signatureTokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(replyToken);
if (this.responderFactories.TryGetValue(targetUri, out factory))
{
factory.TokenProvider.TokenProvider =
signatureTokenProvider;
}
else
{
var tokenProvider = new DelegatingTokenProvider(
signatureTokenProvider
);
var receiverFactory = MessagingFactory.Create(
targetUri.GetLeftPart(UriPartial.Authority),
new MessagingFactorySettings
{
TransportType = TransportType.Amqp,
TokenProvider = tokenProvider
});
factory = new FactoryTokenProviderTuple {Factory = receiverFactory, TokenProvider = tokenProvider};
this.responderFactories.Add(targetUri, factory);
}
}

var sender = await factory.Factory.CreateMessageSenderAsync(targetUri.AbsolutePath.Substring(1));
var replyDestination = this.GetOrCreateReplyDestination(replyToken, targetUri);
var sender = await replyDestination.Factory.CreateMessageSenderAsync(targetUri.AbsolutePath.Substring(1));
await sender.SendAsync(reply);
await request.CompleteAsync();
}
catch
catch (Exception e)
{
await request.DeadLetterAsync();
await request.DeadLetterAsync("ErrorHandlingMessage", e.Message);
}
}
else
Expand All @@ -145,7 +106,45 @@ async Task Respond(BrokeredMessage request, Func<BrokeredMessage, Task<BrokeredM
}
}

class FactoryTokenProviderTuple
ReplyDestination GetOrCreateReplyDestination(string replyToken, Uri targetUri)
{
ReplyDestination replyDestination;
lock (this.replyDestinationsMutex)
{
var signatureTokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(replyToken);
if (this.responderFactories.TryGetValue(targetUri, out replyDestination))
{
replyDestination.TokenProvider.TokenProvider = signatureTokenProvider;
}
else
{
var tokenProvider = new DelegatingTokenProvider(signatureTokenProvider);
var receiverFactory = MessagingFactory.Create(targetUri.GetLeftPart(UriPartial.Authority),
new MessagingFactorySettings { TransportType = TransportType.Amqp, TokenProvider = tokenProvider });
replyDestination = new ReplyDestination { Factory = receiverFactory, TokenProvider = tokenProvider };
this.responderFactories.Add(targetUri, replyDestination);
}
}
return replyDestination;
}

static string GetReplyToken(Uri targetUri)
{
string replyToken = null;
var queryPortion = targetUri.Query;
if (!string.IsNullOrEmpty(queryPortion) && queryPortion.Length > 1)
{
var nvm = HttpUtility.ParseQueryString(queryPortion.Substring(1));
var tokenString = nvm["tk"];
if (tokenString != null)
{
replyToken = tokenString;
}
}
return replyToken;
}

class ReplyDestination
{
public MessagingFactory Factory { get; set; }
public DelegatingTokenProvider TokenProvider { get; set; }
Expand Down
2 changes: 1 addition & 1 deletion QueuesRequestResponse/Server/Server.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public async Task Run(string namespaceAddress, string queueName, string receiveT
});
try
{
receiverFactory.RetryPolicy = new RetryExponential(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5), 10);
var receiver = receiverFactory.CreateMessageReceiver(queueName, ReceiveMode.PeekLock);
try
{
Expand All @@ -45,6 +44,7 @@ public async Task Run(string namespaceAddress, string queueName, string receiveT
receiver,
async m =>
{
Console.WriteLine("Got {0}", m.Label);
switch (m.Label)
{
case "requestA":
Expand Down

0 comments on commit 909ef1c

Please sign in to comment.