Skip to content

Commit

Permalink
update redis support
Browse files Browse the repository at this point in the history
  • Loading branch information
callmefisher committed Jul 31, 2018
1 parent 9f8750e commit 146e555
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 29 deletions.
28 changes: 12 additions & 16 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
"time"

"github.com/go-redis/redis/internal"
"github.com/go-redis/redis/internal"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
"github.com/go-redis/redis/internal/util"
Expand Down Expand Up @@ -762,7 +762,6 @@ func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
return nil
}


type XStreamSliceMapCmd struct {
baseCmd
val map[string]*XStreamSliceCmd
Expand All @@ -776,44 +775,43 @@ func NewXStreamSliceMapCmd(args ...interface{}) *XStreamSliceMapCmd {
}
}

func (cmd* XStreamSliceMapCmd) Val() map[string] *XStreamSliceCmd {
func (cmd *XStreamSliceMapCmd) Val() map[string]*XStreamSliceCmd {
return cmd.val
}

func (cmd* XStreamSliceMapCmd) Result() (map[string]*XStreamSliceCmd, error) {
return cmd.val, cmd.err
func (cmd *XStreamSliceMapCmd) Result() (map[string]*XStreamSliceCmd, error) {
return cmd.val, cmd.err
}

func (cmd* XStreamSliceMapCmd) String() string {
func (cmd *XStreamSliceMapCmd) String() string {
return cmdString(cmd, cmd.val)
}

func (cmd* XStreamSliceMapCmd) readReply(cn * pool.Conn) error {
func (cmd *XStreamSliceMapCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceMapParse)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(map[string]* XStreamSliceCmd)
cmd.val = v.(map[string]*XStreamSliceCmd)
return nil
}


func xStreamSliceMapParse(rd* proto.Reader, n int64) (interface{}, error) {
func xStreamSliceMapParse(rd *proto.Reader, n int64) (interface{}, error) {
ret := map[string][]*XStreamSliceCmd{}
for i:= int64(0); i < n; i++ {
for i := int64(0); i < n; i++ {
val, err := rd.ReadArrayReply(xStreamSliceWithKeyParser)
if err != nil {
return nil, err
}
for k, v := range val.(map[string][]*XStreamSliceCmd) {
for k, v := range val.(map[string][]*XStreamSliceCmd) {
ret[k] = v
}
}
return ret, nil
}

func xStreamSliceWithKeyParser(rd* proto.Reader, n int64) (interface{}, error){
func xStreamSliceWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
var key string
key, err := rd.ReadStringReply()
if err != nil {
Expand All @@ -823,11 +821,9 @@ func xStreamSliceWithKeyParser(rd* proto.Reader, n int64) (interface{}, error){
if err != nil {
return nil, err
}
return map[string][]*XStreamSliceCmd{key:v.([]*XStreamSliceCmd)}, nil
return map[string][]*XStreamSliceCmd{key: v.([]*XStreamSliceCmd)}, nil
}



// Implements proto.MultiBulkParse
func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
xx := make([]*XStream, n)
Expand Down
21 changes: 8 additions & 13 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,11 @@ type Cmdable interface {
XRead(streams ...string) *XStreamSliceCmd
XReadN(count int64, streams ...string) *XStreamSliceCmd
XReadExt(opt *XReadExt) *XStreamSliceCmd

XGroupCreate(key string, group, start string) *StatusCmd
XReadGroupN(group_name, consumer string, count int64, keys ...string) *XStreamSliceMapCmd
XAck(key, group_name string, stream_ids ...string) *IntCmd



ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd
Expand Down Expand Up @@ -1427,19 +1426,16 @@ func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSli
})
}

func (c *cmdable) XGroupCreate(key, group, start string) *StatusCmd {

func(c * cmdable) XGroupCreate(key, group, start string) * StatusCmd {

args := [] interface{} {"xgroup", "create", key, group, start}
args := []interface{}{"xgroup", "create", key, group, start}
cmd := NewStatusCmd(args...)
c.process(cmd)
return cmd
}



func (c * cmdable ) XReadGroupN(group, consumer string, count int64, keys ...string) *XStreamSliceMapCmd {
args := [] interface{} {"xreadgroup", "group", group, consumer, "count", count, "streams"}
func (c *cmdable) XReadGroupN(group, consumer string, count int64, keys ...string) *XStreamSliceMapCmd {
args := []interface{}{"xreadgroup", "group", group, consumer, "count", count, "streams"}
for _, v := range keys {
args = append(args, v)
}
Expand All @@ -1448,8 +1444,8 @@ func (c * cmdable ) XReadGroupN(group, consumer string, count int64, keys ...str
return cmd
}

func (c * cmdable) XAck(key, group string, streams ...string) *IntCmd {
args := [] interface{} {"xack", key, group}
func (c *cmdable) XAck(key, group string, streams ...string) *IntCmd {
args := []interface{}{"xack", key, group}
for _, v := range streams {
args = append(args, v)
}
Expand All @@ -1458,7 +1454,6 @@ func (c * cmdable) XAck(key, group string, streams ...string) *IntCmd {
return cmd
}


//------------------------------------------------------------------------------

// Z represents sorted set member.
Expand Down

0 comments on commit 146e555

Please sign in to comment.