Skip to content

Commit

Permalink
ScriptLoad, ScriptExists and ScriptFlush for ClusterClient
Browse files Browse the repository at this point in the history
  • Loading branch information
skryukov committed Apr 16, 2021
1 parent a47d2c2 commit 5557db4
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
74 changes: 74 additions & 0 deletions cluster_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package redis

import (
"context"
"sync"
"sync/atomic"
)

Expand All @@ -23,3 +24,76 @@ func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
cmd.val = size
return cmd
}

func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd {
cmd := NewStringCmd(ctx, "script", "load", script)
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptLoad(ctx, script).Result()
if err != nil {
return err
}

mu.Lock()
if cmd.Val() == "" {
cmd.val = val
}
mu.Unlock()

return nil
})
if err != nil {
cmd.SetErr(err)
}

return cmd
}

func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "script", "flush")
_ = c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
shard.ScriptFlush(ctx)

return nil
})

return cmd
}

func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *BoolSliceCmd {
args := make([]interface{}, 2+len(hashes))
args[0] = "script"
args[1] = "exists"
for i, hash := range hashes {
args[2+i] = hash
}
cmd := NewBoolSliceCmd(ctx, args...)

result := make([]bool, len(hashes))
for i := range result {
result[i] = true
}

mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptExists(ctx, hashes...).Result()
if err != nil {
return err
}

mu.Lock()
for i, v := range val {
result[i] = result[i] && v
}
mu.Unlock()

return nil
})
if err != nil {
cmd.SetErr(err)
}

cmd.val = result

return cmd
}
44 changes: 44 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,50 @@ var _ = Describe("ClusterClient", func() {
})
})

It("distributes scripts when using Script Load", func() {
client.ScriptFlush(ctx)

script := redis.NewScript(`return 'Unique script'`)

script.Load(ctx, client)

client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
defer GinkgoRecover()

val, _ := script.Exists(ctx, shard).Result()
Expect(val[0]).To(Equal(true))
return nil
})
})

It("checks all shards when using Script Exists", func() {
client.ScriptFlush(ctx)

script := redis.NewScript(`return 'First script'`)
lostScriptSrc := `return 'Lost script'`
lostScript := redis.NewScript(lostScriptSrc)

script.Load(ctx, client)
client.Do(ctx, "script", "load", lostScriptSrc)

val, _ := client.ScriptExists(ctx, script.Hash(), lostScript.Hash()).Result()

Expect(val).To(Equal([]bool{true, false}))
})

It("flushes scripts from all shards when using ScriptFlush", func() {
script := redis.NewScript(`return 'Unnecessary script'`)
script.Load(ctx, client)

val, _ := client.ScriptExists(ctx, script.Hash()).Result()
Expect(val).To(Equal([]bool{true}))

client.ScriptFlush(ctx)

val, _ = client.ScriptExists(ctx, script.Hash()).Result()
Expect(val).To(Equal([]bool{false}))
})

It("supports Watch", func() {
var incr func(string) error

Expand Down

0 comments on commit 5557db4

Please sign in to comment.