Skip to content

Commit

Permalink
redis: Handle nil messages in PEL
Browse files Browse the repository at this point in the history
  • Loading branch information
suraciii committed Apr 14, 2023
1 parent 9947f85 commit e0b8597
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 11 deletions.
1 change: 1 addition & 0 deletions perf/CloudEventTester/Ping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class PingHandler : ICloudEventHandler<Ping>
public static long Count => s_count;
public Task HandleAsync(CloudEvent<Ping> cloudEvent, CancellationToken token)
{
//throw new NotImplementedException();
_ = Interlocked.Increment(ref s_count);
return Task.CompletedTask;
}
Expand Down
5 changes: 3 additions & 2 deletions perf/CloudEventTester/Tester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ public abstract class Tester
public Tester()
{
Services = new ServiceCollection()
//.AddLogging(logging => logging.AddConsole().SetMinimumLevel(LogLevel.Warning));
.AddLogging(logging => logging.AddConsole().SetMinimumLevel(LogLevel.Critical));
.AddLogging(logging => logging.AddConsole().SetMinimumLevel(LogLevel.Information));
//.AddLogging(logging => logging.AddConsole().SetMinimumLevel(LogLevel.Warning));
//.AddLogging(logging => logging.AddConsole().SetMinimumLevel(LogLevel.Critical));

string providerName = Environment.GetEnvironmentVariable("PROVIDER")!.ToLowerInvariant();
if (providerName == "kafka")
Expand Down
34 changes: 32 additions & 2 deletions src/CloudEventDotNet.Redis/RedisMessageChannel.Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,47 @@ async Task ClaimPendingMessages()

_telemetry.OnMessagesClaimed(claimedMessages.Length);
await DispatchMessages(claimedMessages).ConfigureAwait(false);

var messageToRemoveIds = messagesToClaim
.Except(claimedMessages.Where(msg => !msg.IsNull).Select(msg => msg.Id));
await RemoveMessagesThatNoLongerExistFromPending(messageToRemoveIds);
}
}
}

private async Task RemoveMessagesThatNoLongerExistFromPending(IEnumerable<RedisValue> messageIds)
{
foreach (var messageId in messageIds)
{
_telemetry.Logger.LogInformation("Claiming nil message {messageId}", messageId);
StreamEntry[] claimedMessages = await _database.StreamClaimAsync(
_channelContext.Topic,
_channelContext.ConsumerGroup,
_channelContext.ConsumerGroup,
(long)_options.ProcessingTimeout.TotalMilliseconds,
new[] { messageId }
).ConfigureAwait(false);
if (claimedMessages.Length == 0 || claimedMessages[0].IsNull)
{
await _database.StreamAcknowledgeAsync(
_channelContext.Topic,
_channelContext.ConsumerGroup,
messageId).ConfigureAwait(false);
_telemetry.Logger.LogInformation("Acked nil message {messageId}", messageId);
}
else
{
await DispatchMessages(claimedMessages);
}
}
}

private async ValueTask DispatchMessages(StreamEntry[] messages, [CallerMemberName]string caller = "")
private async ValueTask DispatchMessages(StreamEntry[] messages, [CallerMemberName] string caller = "")
{
foreach (StreamEntry message in messages)
{
if (message.IsNull)
{
_telemetry.OnNullMessageFetched(caller);
continue;
}

Expand Down
8 changes: 1 addition & 7 deletions src/CloudEventDotNet.Redis/RedisTelemetry.MessageChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ public void OnMessagesFetched(int count)

// ..

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Fetched null message on {caller}"
)]
public partial void OnNullMessageFetched(string caller);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Message {id} dispatched to process"
Expand Down Expand Up @@ -144,7 +138,7 @@ public void OnMessagesDispatched(int count)
public partial void OnNoTimeoutedMessagesToClaim(string id, long idle, int dc);

[LoggerMessage(
Level = LogLevel.Debug,
Level = LogLevel.Information,
Message = "Claimed {count} messages"
)]
private partial void LogOnMessagesClaimed(int count);
Expand Down

0 comments on commit e0b8597

Please sign in to comment.