Skip to content

Commit

Permalink
DBSize,ScriptLoad,ScriptFlush and ScriptExists should use hook (redis…
Browse files Browse the repository at this point in the history
…#1811)

Signed-off-by: monkey <[email protected]>
  • Loading branch information
monkey92t authored Jul 5, 2021
1 parent dd4b7eb commit c1b63a6
Showing 1 changed file with 56 additions and 46 deletions.
102 changes: 56 additions & 46 deletions cluster_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,63 @@ import (

func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
cmd := NewIntCmd(ctx, "dbsize")
var size int64
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
n, err := master.DBSize(ctx).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
var size int64
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
n, err := master.DBSize(ctx).Result()
if err != nil {
return err
}
atomic.AddInt64(&size, n)
return nil
})
if err != nil {
return err
cmd.SetErr(err)
} else {
cmd.val = size
}
atomic.AddInt64(&size, n)
return nil
})
if err != nil {
cmd.SetErr(err)
return cmd
}
cmd.val = size
return cmd
}

func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd {
cmd := NewStringCmd(ctx, "script", "load", script)
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptLoad(ctx, script).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptLoad(ctx, script).Result()
if err != nil {
return err
}

mu.Lock()
if cmd.Val() == "" {
cmd.val = val
}
mu.Unlock()

return nil
})
if err != nil {
return err
}

mu.Lock()
if cmd.Val() == "" {
cmd.val = val
cmd.SetErr(err)
}
mu.Unlock()

return nil
})
if err != nil {
cmd.SetErr(err)
}

return cmd
}

func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "script", "flush")
_ = c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
shard.ScriptFlush(ctx)

_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
return shard.ScriptFlush(ctx).Err()
})
if err != nil {
cmd.SetErr(err)
}
return nil
})

return cmd
}

Expand All @@ -74,26 +82,28 @@ func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *Boo
result[i] = true
}

mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptExists(ctx, hashes...).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptExists(ctx, hashes...).Result()
if err != nil {
return err
}

mu.Lock()
for i, v := range val {
result[i] = result[i] && v
}
mu.Unlock()

return nil
})
if err != nil {
return err
}

mu.Lock()
for i, v := range val {
result[i] = result[i] && v
cmd.SetErr(err)
} else {
cmd.val = result
}
mu.Unlock()

return nil
})
if err != nil {
cmd.SetErr(err)
}

cmd.val = result

return cmd
}

0 comments on commit c1b63a6

Please sign in to comment.