Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 4e6ca73

Browse files
committedMar 21, 2019
Add the possibility to modify properties when abandoning a message
This is useful to store exception details when abandoning a message and still keeping the convenience of registering a message or session handler with AutoComplete = true
1 parent 02c03d6 commit 4e6ca73

6 files changed

+61
-18
lines changed
 

‎src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs

+22-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
namespace Microsoft.Azure.ServiceBus
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Threading;
89
using System.Threading.Tasks;
10+
using Core;
911
using Primitives;
1012

1113
/// <summary>Provides options associated with message pump processing using
@@ -26,6 +28,22 @@ public sealed class MessageHandlerOptions
2628
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
2729
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
2830
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
31+
: this(async args => { await exceptionReceivedHandler(args); return null; })
32+
{
33+
}
34+
35+
/// <summary>Initializes a new instance of the <see cref="MessageHandlerOptions" /> class.
36+
/// Default Values:
37+
/// <see cref="MaxConcurrentCalls"/> = 1
38+
/// <see cref="AutoComplete"/> = true
39+
/// <see cref="ReceiveTimeOut"/> = 1 minute
40+
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
41+
/// </summary>
42+
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
43+
/// When the exception happens during user callback, the returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
44+
/// For other actions, the returned dictionary is ignored.
45+
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
46+
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> exceptionReceivedHandler)
2947
{
3048
this.MaxConcurrentCalls = 1;
3149
this.AutoComplete = true;
@@ -36,7 +54,7 @@ public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionRec
3654

3755
/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the message pump.
3856
/// When errors are received calls will automatically be retried, so this is informational. </summary>
39-
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }
57+
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> ExceptionReceivedHandler { get; }
4058

4159
/// <summary>Gets or sets the maximum number of concurrent calls to the callback the message pump should initiate.</summary>
4260
/// <value>The maximum number of concurrent calls to the callback.</value>
@@ -79,15 +97,16 @@ public TimeSpan MaxAutoRenewDuration
7997

8098
internal TimeSpan ReceiveTimeOut { get; }
8199

82-
internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
100+
internal async Task<IDictionary<string, object>> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
83101
{
84102
try
85103
{
86-
await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
104+
return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
87105
}
88106
catch (Exception exception)
89107
{
90108
MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception);
109+
return null;
91110
}
92111
}
93112
}

‎src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs

+6-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace Microsoft.Azure.ServiceBus
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Diagnostics;
89
using System.Threading;
910
using System.Threading.Tasks;
@@ -47,7 +48,7 @@ bool ShouldRenewLock()
4748
this.registerHandlerOptions.AutoRenewLock;
4849
}
4950

50-
Task RaiseExceptionReceived(Exception e, string action)
51+
Task<IDictionary<string, object>> RaiseExceptionReceived(Exception e, string action)
5152
{
5253
var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.messageReceiver.Path, this.messageReceiver.ClientId);
5354
return this.registerHandlerOptions.RaiseExceptionReceived(eventArgs);
@@ -147,12 +148,12 @@ async Task MessageDispatchTask(Message message)
147148
catch (Exception exception)
148149
{
149150
MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception);
150-
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
151+
var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
151152

152153
// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
153154
if (!(exception is MessageLockLostException))
154155
{
155-
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
156+
await this.AbandonMessageIfNeededAsync(message, propertiesToModify).ConfigureAwait(false);
156157
}
157158

158159
if (ServiceBusDiagnosticSource.IsEnabled())
@@ -191,13 +192,13 @@ void CancelAutoRenewLock(object state)
191192
}
192193
}
193194

194-
async Task AbandonMessageIfNeededAsync(Message message)
195+
async Task AbandonMessageIfNeededAsync(Message message, IDictionary<string, object> propertiesToModify)
195196
{
196197
try
197198
{
198199
if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock)
199200
{
200-
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
201+
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
201202
}
202203
}
203204
catch (Exception exception)

‎src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs

+22-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
namespace Microsoft.Azure.ServiceBus
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Threading;
89
using System.Threading.Tasks;
10+
using Core;
911
using Primitives;
1012

1113
/// <summary>Provides options associated with session pump processing using
@@ -27,6 +29,22 @@ public sealed class SessionHandlerOptions
2729
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
2830
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
2931
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
32+
: this(async args => { await exceptionReceivedHandler(args); return null; })
33+
{
34+
}
35+
36+
/// <summary>Initializes a new instance of the <see cref="SessionHandlerOptions" /> class.
37+
/// Default Values:
38+
/// <see cref="MaxConcurrentSessions"/> = 2000
39+
/// <see cref="AutoComplete"/> = true
40+
/// <see cref="MessageWaitTimeout"/> = 1 minute
41+
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
42+
/// </summary>
43+
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
44+
/// When the exception happens during user callback, the returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
45+
/// For other actions, the returned dictionary is ignored.
46+
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
47+
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> exceptionReceivedHandler)
3048
{
3149
// These are default values
3250
this.AutoComplete = true;
@@ -38,7 +56,7 @@ public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionRec
3856

3957
/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the session pump.
4058
/// When errors are received calls will automatically be retried, so this is informational. </summary>
41-
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }
59+
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> ExceptionReceivedHandler { get; }
4260

4361
/// <summary>Gets or sets the duration for which the session lock will be renewed automatically.</summary>
4462
/// <value>The duration for which the session renew its state.</value>
@@ -92,15 +110,16 @@ public int MaxConcurrentSessions
92110

93111
internal int MaxConcurrentAcceptSessionCalls { get; set; }
94112

95-
internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
113+
internal async Task<IDictionary<string, object>> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
96114
{
97115
try
98116
{
99-
await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
117+
return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
100118
}
101119
catch (Exception exception)
102120
{
103121
MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception);
122+
return null;
104123
}
105124
}
106125
}

‎src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs

+6-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace Microsoft.Azure.ServiceBus
55
{
66
using System;
7+
using System.Collections.Generic;
78
using System.Diagnostics;
89
using System.Threading;
910
using System.Threading.Tasks;
@@ -74,7 +75,7 @@ bool ShouldRenewSessionLock()
7475
this.sessionHandlerOptions.AutoRenewLock;
7576
}
7677

77-
Task RaiseExceptionReceived(Exception e, string action)
78+
Task<IDictionary<string, object>> RaiseExceptionReceived(Exception e, string action)
7879
{
7980
var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.entityPath, this.clientId);
8081
return this.sessionHandlerOptions.RaiseExceptionReceived(eventArgs);
@@ -96,13 +97,13 @@ async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message
9697
}
9798
}
9899

99-
async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message)
100+
async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message, IDictionary<string, object> propertiesToModify)
100101
{
101102
try
102103
{
103104
if (session.ReceiveMode == ReceiveMode.PeekLock)
104105
{
105-
await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
106+
await session.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
106107
}
107108
}
108109
catch (Exception exception)
@@ -239,11 +240,11 @@ async Task MessagePumpTaskAsync(IMessageSession session)
239240
}
240241

241242
MessagingEventSource.Log.MessageReceivePumpTaskException(this.clientId, session.SessionId, exception);
242-
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
243+
var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
243244
callbackExceptionOccurred = true;
244245
if (!(exception is MessageLockLostException || exception is SessionLockLostException))
245246
{
246-
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
247+
await this.AbandonMessageIfNeededAsync(session, message, propertiesToModify).ConfigureAwait(false);
247248
}
248249
}
249250
finally

‎test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt

+4-2
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,9 @@ namespace Microsoft.Azure.ServiceBus
179179
public sealed class MessageHandlerOptions
180180
{
181181
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
182+
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> exceptionReceivedHandler) { }
182183
public bool AutoComplete { get; set; }
183-
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
184+
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> ExceptionReceivedHandler { get; }
184185
public System.TimeSpan MaxAutoRenewDuration { get; set; }
185186
public int MaxConcurrentCalls { get; set; }
186187
}
@@ -384,8 +385,9 @@ namespace Microsoft.Azure.ServiceBus
384385
public sealed class SessionHandlerOptions
385386
{
386387
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
388+
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> exceptionReceivedHandler) { }
387389
public bool AutoComplete { get; set; }
388-
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
390+
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> ExceptionReceivedHandler { get; }
389391
public System.TimeSpan MaxAutoRenewDuration { get; set; }
390392
public int MaxConcurrentSessions { get; set; }
391393
public System.TimeSpan MessageWaitTimeout { get; set; }

‎test/Microsoft.Azure.ServiceBus.UnitTests/Microsoft.Azure.ServiceBus.UnitTests.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
<SignAssembly>true</SignAssembly>
77
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
88
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
9+
<LangVersion>7.3</LangVersion>
910
</PropertyGroup>
1011

1112
<ItemGroup>

0 commit comments

Comments
 (0)
This repository has been archived.