Skip to content

Commit

Permalink
implements XREADGROUP command
Browse files Browse the repository at this point in the history
  • Loading branch information
pepelev committed Jul 20, 2020
1 parent e68c6c3 commit 5a9c217
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 33 deletions.
172 changes: 172 additions & 0 deletions Rediska.Tests/Commands/Streams/XREADGROUP_Should.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
namespace Rediska.Tests.Commands.Streams
{
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Fixtures;
using FluentAssertions;
using NUnit.Framework;
using Protocol;
using Protocol.Visitors;
using Rediska.Commands;
using Rediska.Commands.Auxiliary;
using Rediska.Commands.Streams;
using static Rediska.Commands.Streams.XGROUP.CREATE.Mode;
using static Rediska.Commands.Streams.XREADGROUP.Mode;
using Id = Rediska.Commands.Streams.Id;

[TestFixtureSource(typeof(ConnectionCollection))]
public sealed class XREADGROUP_Should
{
private readonly Connection connection;
private Fixture fixture;

public XREADGROUP_Should(Connection connection)
{
this.connection = connection;
}

[SetUp]
public void SetUp()
{
fixture = new Fixture(connection);
}

[TearDown]
public async Task TearDownAsync()
{
await fixture.TearDownAsync().ConfigureAwait(false);
}

[Test]
public async Task Test()
{
var key = fixture.NewKey();
var group = new GroupName("group-1");
var xadd = new XADD(key, ("name", "value"));
var xgroupCreate = new XGROUP.CREATE(key, group, Offset.FromId(Id.Minimum), NotCreateStream);
await fixture.ExecuteAsync(xadd).ConfigureAwait(false);
await fixture.ExecuteAsync(xgroupCreate).ConfigureAwait(false);

var sut = new XREADGROUP(group, "consumer-1", Count.Unbound, RequireAcknowledgment, (key, Id.Minimum));
var entries = await fixture.ExecuteAsync(sut).ConfigureAwait(false);
Equals(entries, new Stream(key, new Entry(("name", "value"))));
}

[Test]
public async Task Useless_Dollar()
{
var key = fixture.NewKey();
const string group = "group";
var xgroupCreate = new XGROUP.CREATE(key, group, Offset.FromId(Id.Minimum), CreateStreamIfNotExists);
await fixture.ExecuteAsync(xgroupCreate).ConfigureAwait(false);

var sut = new PlainCommand("XREADGROUP", "GROUP", group, "consumer", "STREAMS", key.ToBytes(), "$");
var response = await fixture.ExecuteAsync(sut).ConfigureAwait(false);
response.Should().Be(
new Error(
"ERR The $ ID is meaningless in the context of XREADGROUP: "
+ "you want to read the history of this consumer by specifying "
+ "a proper ID, or use the > ID to get new messages. The $ ID "
+ "would just return an empty result set."
)
);
}

[Test]
public async Task Return_Error_When_Run_Against_Not_Existing_Stream()
{
var sut = new XREADGROUP("group", "consumer", Count.Unbound, RequireAcknowledgment, ("key", Id.Minimum))
.WithRawResponse();
var response = await fixture.ExecuteAsync(sut).ConfigureAwait(false);
response.Should().Be(
new Error("NOGROUP No such key 'key' or consumer group 'group' in XREADGROUP with GROUP option")
);
}

[Test]
public async Task Return_Error_When_Any_Of_Streams_Does_Not_Exists()
{
var key = fixture.NewKey();
const string group = "group";
var xgroupCreate = new XGROUP.CREATE(key, group, Offset.FromId(Id.Minimum), CreateStreamIfNotExists);
await fixture.ExecuteAsync(xgroupCreate).ConfigureAwait(false);

var sut = new XREADGROUP(
group,
"consumer",
Count.Unbound,
RequireAcknowledgment,
(key, Id.Minimum),
("non-existing-key", Id.Minimum)
).WithRawResponse();

var response = await fixture.ExecuteAsync(sut).ConfigureAwait(false);
response.Should().Be(
new Error("NOGROUP No such key 'non-existing-key' or consumer group 'group' in XREADGROUP with GROUP option")
);
}

[Test]
public async Task Exploratory_1()
{
var key = fixture.NewKey();
var group = new GroupName("group-1");
var xadd = new XADD(key, ("name", "value"));
var xgroupCreate = new XGROUP.CREATE(key, group, Offset.FromId(Id.Minimum), NotCreateStream);
await fixture.ExecuteAsync(xadd).ConfigureAwait(false);
await fixture.ExecuteAsync(xgroupCreate).ConfigureAwait(false);

var @new = new PlainCommand("XREADGROUP", "GROUP", "group-1", "consumer", "STREAMS", key.ToBytes(), ">")
.WithResponseStructure(CompositeVisitors.StreamEntriesList);
var entries = await fixture.ExecuteAsync(@new).ConfigureAwait(false);
Equals(entries, new Stream(key, new Entry(("name", "value"))));
}

private static void Equals(IReadOnlyList<Entries> entries, params Stream[] streams)
{
entries.Should().HaveCount(streams.Length);
for (var i = 0; i < entries.Count; i++)
{
var expected = streams[i];
var actual = entries[i];
actual.Stream.ToBytes().Should().Equal(expected.Key.ToBytes());
actual.Should().HaveCount(expected.Entries.Length);
for (var j = 0; j < actual.Count; j++)
{
var expectedEntry = expected.Entries[j];
var actualEntry = actual[j];
actualEntry.Should().Equal(expectedEntry);
}
}
}

private sealed class Stream
{
public Key Key { get; }
public Entry[] Entries { get; }

public Stream(Key key, params Entry[] entries)
{
Key = key;
Entries = entries;
}
}

private sealed class Entry : IReadOnlyList<(BulkString Field, BulkString Value)>
{
private readonly (BulkString Field, BulkString Value)[] content;

public Entry(params (BulkString Field, BulkString Value)[] content)
{
this.content = content;
}

public IEnumerator<(BulkString Field, BulkString Value)> GetEnumerator() => content.AsEnumerable().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => content.GetEnumerator();
public int Count => content.Length;
public (BulkString Field, BulkString Value) this[int index] => content[index];
}
}
}
5 changes: 5 additions & 0 deletions Rediska.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@
<s:Boolean x:Key="/Default/UserDictionary/Words/=movablekeys/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=MSET/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=MSETNX/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=NOACK/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=NOGROUP/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=NOKEY/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=NOSAVE/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=noscript/@EntryIndexedValue">True</s:Boolean>
Expand Down Expand Up @@ -388,12 +390,15 @@
<s:Boolean x:Key="/Default/UserDictionary/Words/=WITHSCORES/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XACK/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XADD/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XCLAIM/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XDEL/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XGROUP/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XINFO/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XLEN/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XPENDING/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XRANGE/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XREAD/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XREADGROUP/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XREVRANGE/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=XTRIM/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=ZADD/@EntryIndexedValue">True</s:Boolean>
Expand Down
4 changes: 4 additions & 0 deletions Rediska/Commands/CommandExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace Rediska.Commands
{
using Auxiliary;
using Protocol;
using Protocol.Visitors;
using Utils;

Expand All @@ -12,5 +13,8 @@ public static Command<TResult> WithResponseStructure<TInner, TResult>(
this Command<TInner> command,
Visitor<TResult> responseStructure)
=> new CustomResponseHandling<TInner, TResult>(command, responseStructure);

public static Command<DataType> WithRawResponse<TInner>(this Command<TInner> command)
=> new CustomResponseHandling<TInner, DataType>(command, Id.Singleton);
}
}
2 changes: 1 addition & 1 deletion Rediska/Commands/Streams/Entries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public Entries(Array reply)

public IEnumerator<Entry> GetEnumerator() => Content.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
public int Count => reply.Count;
public int Count => Content.Count;
public Entry this[int index] => Content[index];
}
}
1 change: 1 addition & 0 deletions Rediska/Commands/Streams/GroupName.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public GroupName(string value)
public bool Equals(GroupName other) => equality.Equals(Value, other.Value);
public static bool operator ==(GroupName left, GroupName right) => left.Equals(right);
public static implicit operator GroupName(string value) => new GroupName(value);
public static implicit operator string(GroupName name) => name.Value;
public static bool operator !=(GroupName left, GroupName right) => !left.Equals(right);
public BulkString ToBulkString(BulkStringFactory factory) => factory.Utf8(Value);
public override bool Equals(object obj) => obj is GroupName other && Equals(other);
Expand Down
3 changes: 2 additions & 1 deletion Rediska/Commands/Streams/Offset.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

public readonly struct Offset
{
private static readonly PlainBulkString dollar = new PlainBulkString("$");
public static Offset EndOfStream => new Offset(Type.EndOfStream, default);
private readonly Type type;
private readonly Id id;
Expand All @@ -20,7 +21,7 @@ private Offset(Type type, Id id)

public BulkString ToBulkString(BulkStringFactory factory) => type switch
{
Type.EndOfStream => new PlainBulkString("$"),
Type.EndOfStream => dollar,
_ => id.ToBulkString(factory, Id.Print.SkipMinimalLow)
};

Expand Down
6 changes: 1 addition & 5 deletions Rediska/Commands/Streams/XREAD.BLOCK.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ public sealed partial class XREAD
public sealed class BLOCK : Command<BLOCK.Response>
{
private static readonly PlainBulkString block = new PlainBulkString("BLOCK");

private static readonly Visitor<Response> blockResponseStructure = Protocol.Visitors.Id.Singleton
.Then(reply => new Response(reply));

private readonly Count count;
private readonly MillisecondsTimeout blockTimeout;
private readonly IReadOnlyList<(Key Key, Offset Offset)> streams;
Expand Down Expand Up @@ -56,7 +52,7 @@ public override IEnumerable<BulkString> Request(BulkStringFactory factory)
}
}

public override Visitor<Response> ResponseStructure => blockResponseStructure;
public override Visitor<Response> ResponseStructure => CompositeVisitors.StreamBlockingRead;

public readonly struct Response : IReadOnlyList<Entries>
{
Expand Down
2 changes: 1 addition & 1 deletion Rediska/Commands/Streams/XREAD.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public sealed partial class XREAD : Command<IReadOnlyList<Entries>>

private static readonly ListVisitor<Entries> responseStructure = new ListVisitor<Entries>(
ArrayExpectation.Singleton,
ArrayExpectation2.Singleton.Then(array => new Entries(array))
CompositeVisitors.StreamEntries
);

private readonly Count count;
Expand Down
63 changes: 63 additions & 0 deletions Rediska/Commands/Streams/XREADGROUP.BLOCK.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
namespace Rediska.Commands.Streams
{
using System;
using System.Collections.Generic;
using Protocol;
using Protocol.Visitors;

public sealed partial class XREADGROUP
{
public sealed class BLOCK : Command<XREAD.BLOCK.Response>
{
private static readonly PlainBulkString block = new PlainBulkString("BLOCK");
private static readonly PlainBulkString greaterThan = new PlainBulkString(">");
private readonly GroupName groupName;
private readonly ConsumerName consumerName;
private readonly Count count;
private readonly MillisecondsTimeout blockTimeout;
private readonly IReadOnlyList<Key> streams;

public BLOCK(
GroupName groupName,
ConsumerName consumerName,
Count count,
MillisecondsTimeout blockTimeout,
IReadOnlyList<Key> streams)
{
this.groupName = groupName;
this.consumerName = consumerName;
this.count = count ?? throw new ArgumentNullException(nameof(count));
this.blockTimeout = blockTimeout;
this.streams = streams ?? throw new ArgumentNullException(nameof(streams));
}

public override IEnumerable<BulkString> Request(BulkStringFactory factory)
{
yield return name;
yield return group;
yield return groupName.ToBulkString(factory);
yield return consumerName.ToBulkString(factory);
foreach (var argument in count.Arguments(factory))
{
yield return argument;
}

yield return block;
yield return blockTimeout.ToBulkString(factory);

yield return streamsArgument;
foreach (var key in streams)
{
yield return key.ToBulkString(factory);
}

for (var i = 0; i < streams.Count; i++)
{
yield return greaterThan;
}
}

public override Visitor<XREAD.BLOCK.Response> ResponseStructure => CompositeVisitors.StreamBlockingRead;
}
}
}
Loading

0 comments on commit 5a9c217

Please sign in to comment.