From 3871963e2d9a686b5ab1724410da0f54e80f90c5 Mon Sep 17 00:00:00 2001 From: monkey Date: Tue, 18 May 2021 15:41:20 +0800 Subject: [PATCH] fix #1755 Signed-off-by: monkey --- command.go | 295 ++++++++++++++++++++++++++++++++++++++++++++++- commands.go | 13 +++ commands_test.go | 127 ++++++++++++++++++++ 3 files changed, 434 insertions(+), 1 deletion(-) diff --git a/command.go b/command.go index f10c4781a..9ff8a260a 100644 --- a/command.go +++ b/command.go @@ -1512,7 +1512,7 @@ type XInfoConsumer struct { Idle int64 } -var _ Cmder = (*XInfoGroupsCmd)(nil) +var _ Cmder = (*XInfoConsumersCmd)(nil) func NewXInfoConsumersCmd(ctx context.Context, stream string, group string) *XInfoConsumersCmd { return &XInfoConsumersCmd{ @@ -1784,6 +1784,299 @@ func xStreamInfoParser(rd *proto.Reader, n int64) (interface{}, error) { //------------------------------------------------------------------------------ +type XInfoStreamFullCmd struct { + baseCmd + val *XInfoStreamFull +} + +type XInfoStreamFull struct { + Length int64 + RadixTreeKeys int64 + RadixTreeNodes int64 + LastGeneratedID string + Entries []XMessage + Groups []XInfoStreamGroup +} + +type XInfoStreamGroup struct { + Name string + LastDeliveredID string + PelCount int64 + Pending []XInfoStreamGroupPending + Consumers []XInfoStreamConsumer +} + +type XInfoStreamGroupPending struct { + ID string + Consumer string + DeliveryTime time.Time + DeliveryCount int64 +} + +type XInfoStreamConsumer struct { + Name string + SeenTime time.Time + PelCount int64 + Pending []XInfoStreamConsumerPending +} + +type XInfoStreamConsumerPending struct { + ID string + DeliveryTime time.Time + DeliveryCount int64 +} + +var _ Cmder = (*XInfoStreamFullCmd)(nil) + +func NewXInfoStreamFullCmd(ctx context.Context, args ...interface{}) *XInfoStreamFullCmd { + return &XInfoStreamFullCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *XInfoStreamFullCmd) Val() *XInfoStreamFull { + return cmd.val +} + +func (cmd *XInfoStreamFullCmd) Result() (*XInfoStreamFull, error) { + return cmd.val, cmd.err +} + +func (cmd *XInfoStreamFullCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + if n != 12 { + return fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ + "wanted 12", n) + } + + cmd.val = &XInfoStreamFull{} + + for i := 0; i < 6; i++ { + key, err := rd.ReadString() + if err != nil { + return err + } + + switch key { + case "length": + cmd.val.Length, err = rd.ReadIntReply() + case "radix-tree-keys": + cmd.val.RadixTreeKeys, err = rd.ReadIntReply() + case "radix-tree-nodes": + cmd.val.RadixTreeNodes, err = rd.ReadIntReply() + case "last-generated-id": + cmd.val.LastGeneratedID, err = rd.ReadString() + case "entries": + cmd.val.Entries, err = readXMessageSlice(rd) + case "groups": + groups, err := rd.ReadReply(readStreamGroups) + if err != nil { + return err + } + cmd.val.Groups = groups.([]XInfoStreamGroup) + default: + return fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", key) + } + if err != nil { + return err + } + } + return nil +} + +func readStreamGroups(rd *proto.Reader, n int64) (interface{}, error) { + groups := make([]XInfoStreamGroup, 0, n) + for i := int64(0); i < n; i++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 10 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ + "wanted 10", nn) + } + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + group := XInfoStreamGroup{} + + switch key { + case "name": + group.Name, err = rd.ReadString() + case "last-delivered-id": + group.LastDeliveredID, err = rd.ReadString() + case "pel-count": + group.PelCount, err = rd.ReadIntReply() + case "pending": + group.Pending, err = readXInfoStreamGroupPending(rd) + case "consumers": + group.Consumers, err = readXInfoStreamConsumers(rd) + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", key) + } + + if err != nil { + return nil, err + } + + groups = append(groups, group) + } + + return groups, nil +} + +func readXInfoStreamGroupPending(rd *proto.Reader) ([]XInfoStreamGroupPending, error) { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + + pending := make([]XInfoStreamGroupPending, 0, n) + + for i := 0; i < n; i++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 4 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ + "wanted 4", nn) + } + + p := XInfoStreamGroupPending{} + + p.ID, err = rd.ReadString() + if err != nil { + return nil, err + } + + p.Consumer, err = rd.ReadString() + if err != nil { + return nil, err + } + + delivery, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond)) + + p.DeliveryCount, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + + pending = append(pending, p) + } + + return pending, nil +} + +func readXInfoStreamConsumers(rd *proto.Reader) ([]XInfoStreamConsumer, error) { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + + consumers := make([]XInfoStreamConsumer, 0, n) + + for i := 0; i < n; i++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 8 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ + "wanted 8", nn) + } + + cKey, err := rd.ReadString() + if err != nil { + return nil, err + } + + c := XInfoStreamConsumer{} + + switch cKey { + case "name": + c.Name, err = rd.ReadString() + case "seen-time": + seen, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + c.SeenTime = time.Unix(seen/1000, seen%1000*int64(time.Millisecond)) + case "pel-count": + c.PelCount, err = rd.ReadIntReply() + case "pending": + pendingNumber, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + + c.Pending = make([]XInfoStreamConsumerPending, 0, pendingNumber) + + for f := 0; f < pendingNumber; f++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 3 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+ + "wanted 3", nn) + } + + p := XInfoStreamConsumerPending{} + + p.ID, err = rd.ReadString() + if err != nil { + return nil, err + } + + delivery, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond)) + + p.DeliveryCount, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + + c.Pending = append(c.Pending, p) + } + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", cKey) + } + + if err != nil { + return nil, err + } + + consumers = append(consumers, c) + } + + return consumers, nil +} + +//------------------------------------------------------------------------------ + type ZSliceCmd struct { baseCmd diff --git a/commands.go b/commands.go index 0619675a6..2302db64e 100644 --- a/commands.go +++ b/commands.go @@ -1906,6 +1906,19 @@ func (c cmdable) XInfoStream(ctx context.Context, key string) *XInfoStreamCmd { return cmd } +// XInfoStreamFull XINFO STREAM FULL [COUNT count] +// redis-server >= 6.0. +func (c cmdable) XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd { + args := make([]interface{}, 0, 6) + args = append(args, "xinfo", "stream", key, "full") + if count > 0 { + args = append(args, "count", count) + } + cmd := NewXInfoStreamFullCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + //------------------------------------------------------------------------------ // Z represents sorted set member. diff --git a/commands_test.go b/commands_test.go index e7648232c..2d89017f7 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4392,6 +4392,133 @@ var _ = Describe("Commands", func() { })) }) + It("should XINFO STREAM FULL", func() { + res, err := client.XInfoStreamFull(ctx, "stream", 2).Result() + Expect(err).NotTo(HaveOccurred()) + res.RadixTreeKeys = 0 + res.RadixTreeNodes = 0 + + // Verify DeliveryTime + now := time.Now() + maxElapsed := 10 * time.Minute + for k, g := range res.Groups { + for k2, p := range g.Pending { + Expect(now.Sub(p.DeliveryTime)).To(BeNumerically("<=", maxElapsed)) + res.Groups[k].Pending[k2].DeliveryTime = time.Time{} + } + for k3, c := range g.Consumers { + Expect(now.Sub(c.SeenTime)).To(BeNumerically("<=", maxElapsed)) + res.Groups[k].Consumers[k3].SeenTime = time.Time{} + + for k4, p := range c.Pending { + Expect(now.Sub(p.DeliveryTime)).To(BeNumerically("<=", maxElapsed)) + res.Groups[k].Consumers[k3].Pending[k4].DeliveryTime = time.Time{} + } + } + } + + Expect(res).To(Equal(&redis.XInfoStreamFull{ + Length: 3, + RadixTreeKeys: 0, + RadixTreeNodes: 0, + LastGeneratedID: "3-0", + Entries: []redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + }, + Groups: []redis.XInfoStreamGroup{ + { + Name: "group1", + LastDeliveredID: "3-0", + PelCount: 3, + Pending: []redis.XInfoStreamGroupPending{ + { + ID: "1-0", + Consumer: "consumer1", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + { + ID: "2-0", + Consumer: "consumer1", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + Consumers: []redis.XInfoStreamConsumer{ + { + Name: "consumer1", + SeenTime: time.Time{}, + PelCount: 2, + Pending: []redis.XInfoStreamConsumerPending{ + { + ID: "1-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + { + ID: "2-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + }, + { + Name: "consumer2", + SeenTime: time.Time{}, + PelCount: 1, + Pending: []redis.XInfoStreamConsumerPending{ + { + ID: "3-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + }, + }, + }, + { + Name: "group2", + LastDeliveredID: "3-0", + PelCount: 2, + Pending: []redis.XInfoStreamGroupPending{ + { + ID: "2-0", + Consumer: "consumer1", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + { + ID: "3-0", + Consumer: "consumer1", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + Consumers: []redis.XInfoStreamConsumer{ + { + Name: "consumer1", + SeenTime: time.Time{}, + PelCount: 2, + Pending: []redis.XInfoStreamConsumerPending{ + { + ID: "2-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + { + ID: "3-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + }, + }, + }, + }, + })) + }) + It("should XINFO GROUPS", func() { res, err := client.XInfoGroups(ctx, "stream").Result() Expect(err).NotTo(HaveOccurred())