diff --git a/command.go b/command.go index a7bb592df..2ebb85d01 100644 --- a/command.go +++ b/command.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/util" @@ -762,7 +762,6 @@ func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { return nil } - type XStreamSliceMapCmd struct { baseCmd val map[string]*XStreamSliceCmd @@ -776,44 +775,43 @@ func NewXStreamSliceMapCmd(args ...interface{}) *XStreamSliceMapCmd { } } -func (cmd* XStreamSliceMapCmd) Val() map[string] *XStreamSliceCmd { +func (cmd *XStreamSliceMapCmd) Val() map[string]*XStreamSliceCmd { return cmd.val } -func (cmd* XStreamSliceMapCmd) Result() (map[string]*XStreamSliceCmd, error) { -return cmd.val, cmd.err +func (cmd *XStreamSliceMapCmd) Result() (map[string]*XStreamSliceCmd, error) { + return cmd.val, cmd.err } -func (cmd* XStreamSliceMapCmd) String() string { +func (cmd *XStreamSliceMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd* XStreamSliceMapCmd) readReply(cn * pool.Conn) error { +func (cmd *XStreamSliceMapCmd) readReply(cn *pool.Conn) error { var v interface{} v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceMapParse) if cmd.err != nil { return cmd.err } - cmd.val = v.(map[string]* XStreamSliceCmd) + cmd.val = v.(map[string]*XStreamSliceCmd) return nil } - -func xStreamSliceMapParse(rd* proto.Reader, n int64) (interface{}, error) { +func xStreamSliceMapParse(rd *proto.Reader, n int64) (interface{}, error) { ret := map[string][]*XStreamSliceCmd{} - for i:= int64(0); i < n; i++ { + for i := int64(0); i < n; i++ { val, err := rd.ReadArrayReply(xStreamSliceWithKeyParser) if err != nil { return nil, err } - for k, v := range val.(map[string][]*XStreamSliceCmd) { + for k, v := range val.(map[string][]*XStreamSliceCmd) { ret[k] = v } } return ret, nil } -func xStreamSliceWithKeyParser(rd* proto.Reader, n int64) (interface{}, error){ +func xStreamSliceWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) { var key string key, err := rd.ReadStringReply() if err != nil { @@ -823,11 +821,9 @@ func xStreamSliceWithKeyParser(rd* proto.Reader, n int64) (interface{}, error){ if err != nil { return nil, err } - return map[string][]*XStreamSliceCmd{key:v.([]*XStreamSliceCmd)}, nil + return map[string][]*XStreamSliceCmd{key: v.([]*XStreamSliceCmd)}, nil } - - // Implements proto.MultiBulkParse func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { xx := make([]*XStream, n) diff --git a/commands.go b/commands.go index f11950922..eddf0cd03 100644 --- a/commands.go +++ b/commands.go @@ -182,12 +182,11 @@ type Cmdable interface { XRead(streams ...string) *XStreamSliceCmd XReadN(count int64, streams ...string) *XStreamSliceCmd XReadExt(opt *XReadExt) *XStreamSliceCmd - + XGroupCreate(key string, group, start string) *StatusCmd XReadGroupN(group_name, consumer string, count int64, keys ...string) *XStreamSliceMapCmd XAck(key, group_name string, stream_ids ...string) *IntCmd - - + ZAdd(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd @@ -1427,19 +1426,16 @@ func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSli }) } +func (c *cmdable) XGroupCreate(key, group, start string) *StatusCmd { -func(c * cmdable) XGroupCreate(key, group, start string) * StatusCmd { - - args := [] interface{} {"xgroup", "create", key, group, start} + args := []interface{}{"xgroup", "create", key, group, start} cmd := NewStatusCmd(args...) c.process(cmd) return cmd } - - -func (c * cmdable ) XReadGroupN(group, consumer string, count int64, keys ...string) *XStreamSliceMapCmd { - args := [] interface{} {"xreadgroup", "group", group, consumer, "count", count, "streams"} +func (c *cmdable) XReadGroupN(group, consumer string, count int64, keys ...string) *XStreamSliceMapCmd { + args := []interface{}{"xreadgroup", "group", group, consumer, "count", count, "streams"} for _, v := range keys { args = append(args, v) } @@ -1448,8 +1444,8 @@ func (c * cmdable ) XReadGroupN(group, consumer string, count int64, keys ...str return cmd } -func (c * cmdable) XAck(key, group string, streams ...string) *IntCmd { - args := [] interface{} {"xack", key, group} +func (c *cmdable) XAck(key, group string, streams ...string) *IntCmd { + args := []interface{}{"xack", key, group} for _, v := range streams { args = append(args, v) } @@ -1458,7 +1454,6 @@ func (c * cmdable) XAck(key, group string, streams ...string) *IntCmd { return cmd } - //------------------------------------------------------------------------------ // Z represents sorted set member.