Skip to content

Commit

Permalink
Refactor NewXInfoGroupsCmd
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Sep 23, 2020
1 parent a32502b commit 86326c4
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 45 deletions.
86 changes: 47 additions & 39 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,10 +1395,10 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {

type XInfoGroupsCmd struct {
baseCmd
val []XInfoGroups
val []XInfoGroup
}

type XInfoGroups struct {
type XInfoGroup struct {
Name string
Consumers int64
Pending int64
Expand All @@ -1416,11 +1416,11 @@ func NewXInfoGroupsCmd(ctx context.Context, stream string) *XInfoGroupsCmd {
}
}

func (cmd *XInfoGroupsCmd) Val() []XInfoGroups {
func (cmd *XInfoGroupsCmd) Val() []XInfoGroup {
return cmd.val
}

func (cmd *XInfoGroupsCmd) Result() ([]XInfoGroups, error) {
func (cmd *XInfoGroupsCmd) Result() ([]XInfoGroup, error) {
return cmd.val, cmd.err
}

Expand All @@ -1429,58 +1429,66 @@ func (cmd *XInfoGroupsCmd) String() string {
}

func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
for i := int64(0); i < n; i++ {
v, err := rd.ReadReply(xGroupInfoParser)
if err != nil {
return nil, err
}
cmd.val = append(cmd.val, v.(XInfoGroups))
n, err := rd.ReadArrayLen()
if err != nil {
return err
}

cmd.val = make([]XInfoGroup, n)

for i := 0; i < n; i++ {
cmd.val[i], err = readXGroupInfo(rd)
if err != nil {
return err
}
return nil, nil
})
return err
}

return nil
}

func xGroupInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
func readXGroupInfo(rd *proto.Reader) (XInfoGroup, error) {
var group XInfoGroup

n, err := rd.ReadArrayLen()
if err != nil {
return group, err
}
if n != 8 {
return nil, fmt.Errorf("redis: got %d elements in XINFO GROUPS reply,"+
"wanted 8", n)
return group, fmt.Errorf("redis: got %d elements in XINFO GROUPS reply, wanted 8", n)
}
var (
err error
grp XInfoGroups
key string
val string
)

for i := 0; i < 4; i++ {
key, err = rd.ReadString()
key, err := rd.ReadString()
if err != nil {
return nil, err
return group, err
}
val, err = rd.ReadString()

val, err := rd.ReadString()
if err != nil {
return nil, err
return group, err
}

switch key {
case "name":
grp.Name = val
group.Name = val
case "consumers":
grp.Consumers, err = strconv.ParseInt(val, 0, 64)
group.Consumers, err = strconv.ParseInt(val, 0, 64)
if err != nil {
return group, err
}
case "pending":
grp.Pending, err = strconv.ParseInt(val, 0, 64)
group.Pending, err = strconv.ParseInt(val, 0, 64)
if err != nil {
return group, err
}
case "last-delivered-id":
grp.LastDeliveredID = val
group.LastDeliveredID = val
default:
return nil, fmt.Errorf("redis: unexpected content %s "+
"in XINFO GROUPS reply", key)
}
if err != nil {
return nil, err
return group, fmt.Errorf("redis: unexpected content %s in XINFO GROUPS reply", key)
}
}
return grp, err

return group, nil
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -2257,15 +2265,15 @@ func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error {
}

cmdString := make([]string, cmdLen)
for i := 0; i < int(cmdLen); i++ {
for i := 0; i < cmdLen; i++ {
cmdString[i], err = rd.ReadString()
if err != nil {
return nil, err
}
}

var address, name string
for i := 4; i < int(n); i++ {
for i := 4; i < n; i++ {
str, err := rd.ReadString()
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
keyPos += 2
}
args = append(args, "streams")
keyPos += 1
keyPos++
for _, s := range a.Streams {
args = append(args, s)
}
Expand Down Expand Up @@ -1592,10 +1592,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic
}
if a.NoAck {
args = append(args, "noack")
keyPos += 1
keyPos++
}
args = append(args, "streams")
keyPos += 1
keyPos++
for _, s := range a.Streams {
args = append(args, s)
}
Expand Down
11 changes: 8 additions & 3 deletions internal/proto/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
}
}

func (r *Reader) ReadArrayLen() (int64, error) {
func (r *Reader) ReadArrayLen() (int, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
Expand All @@ -190,7 +190,11 @@ func (r *Reader) ReadArrayLen() (int64, error) {
case ErrorReply:
return 0, ParseErrorReply(line)
case ArrayReply:
return parseArrayLen(line)
n, err := parseArrayLen(line)
if err != nil {
return 0, err
}
return int(n), nil
default:
return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
Expand All @@ -216,7 +220,8 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) {
}

keys := make([]string, n)
for i := int64(0); i < n; i++ {

for i := 0; i < n; i++ {
key, err := r.ReadString()
if err != nil {
return nil, 0, err
Expand Down

0 comments on commit 86326c4

Please sign in to comment.