Skip to content

Commit

Permalink
refactor: redis mset and mget (ccfos#1446)
Browse files Browse the repository at this point in the history
* refactor redis mset
  • Loading branch information
710leo authored Mar 28, 2023
1 parent 64e1085 commit a2a0b41
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 13 deletions.
7 changes: 3 additions & 4 deletions center/metas/metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,10 @@ func (s *Set) updateTargets(m map[string]models.HostMeta) error {
return nil
}

var values []interface{}
newMap := make(map[string]interface{}, count)
for ident, meta := range m {
values = append(values, models.WrapIdent(ident))
values = append(values, meta)
newMap[models.WrapIdent(ident)] = meta
}
err := s.redis.MSet(context.Background(), values...).Err()
err := storage.MSet(context.Background(), s.redis, newMap)
return err
}
5 changes: 3 additions & 2 deletions center/router/router_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/storage"

"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -64,13 +65,13 @@ func (rt *Router) targetGets(c *gin.Context) {

if len(keys) > 0 {
metaMap := make(map[string]*models.HostMeta)
vals := rt.Redis.MGet(context.Background(), keys...).Val()
vals, _ := storage.MGet(context.Background(), rt.Redis, keys)
for _, value := range vals {
var meta models.HostMeta
if value == nil {
continue
}
err := json.Unmarshal([]byte(value.(string)), &meta)
err := json.Unmarshal(value, &meta)
if err != nil {
logger.Warningf("unmarshal %v host meta failed: %v", value, err)
continue
Expand Down
18 changes: 14 additions & 4 deletions memsto/target_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,19 @@ func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*mo
keys = append(keys, models.WrapIdent(targets[i].Ident))
num++
if num == 100 {
vals := tc.redis.MGet(context.Background(), keys...).Val()
vals, err := storage.MGet(context.Background(), tc.redis, keys)
if err != nil {
logger.Warningf("keys:%v get host meta err:%v", keys, err)
continue
}

for _, value := range vals {
var meta models.HostMeta
if value == nil {
continue
}
err := json.Unmarshal([]byte(value.(string)), &meta)

err := json.Unmarshal(value, &meta)
if err != nil {
logger.Errorf("failed to unmarshal host meta: %s value:%v", err, value)
continue
Expand All @@ -196,13 +202,17 @@ func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*mo
}
}

vals := tc.redis.MGet(context.Background(), keys...).Val()
vals, err := storage.MGet(context.Background(), tc.redis, keys)
if err != nil {
logger.Warningf("keys:%v get host meta err:%v", keys, err)
}
for _, value := range vals {
var meta models.HostMeta
if value == nil {
continue
}
err := json.Unmarshal([]byte(value.(string)), &meta)

err := json.Unmarshal(value, &meta)
if err != nil {
continue
}
Expand Down
39 changes: 36 additions & 3 deletions storage/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/ccfos/nightingale/v6/pkg/tlsx"

"github.com/redis/go-redis/v9"
"github.com/toolkits/pkg/logger"
)

type RedisConfig struct {
Expand All @@ -26,11 +26,10 @@ type RedisConfig struct {
}

type Redis interface {
Pipeline() redis.Pipeliner
Del(ctx context.Context, keys ...string) *redis.IntCmd
Get(ctx context.Context, key string) *redis.StringCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
MSet(ctx context.Context, values ...interface{}) *redis.StatusCmd
MGet(ctx context.Context, keys ...string) *redis.SliceCmd
HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd
HSet(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd
Expand Down Expand Up @@ -113,3 +112,37 @@ func NewRedis(cfg RedisConfig) (Redis, error) {
}
return redisClient, nil
}

func MGet(ctx context.Context, r Redis, keys []string) ([][]byte, error) {
var vals [][]byte
pipe := r.Pipeline()
for _, key := range keys {
pipe.Get(ctx, key)
}
cmds, err := pipe.Exec(ctx)
if err != nil {
logger.Errorf("failed to exec pipeline: %s", err)
return vals, err
}

for i, key := range keys {
cmd := cmds[i]
if cmd.Err() != nil {
logger.Errorf("failed to get key: %s, err: %s", key, cmd.Err())
continue
}
val := []byte(cmd.(*redis.StringCmd).Val())
vals = append(vals, val)
}

return vals, err
}

func MSet(ctx context.Context, r Redis, m map[string]interface{}) error {
pipe := r.Pipeline()
for k, v := range m {
pipe.Set(ctx, k, v, 0)
}
_, err := pipe.Exec(ctx)
return err
}

0 comments on commit a2a0b41

Please sign in to comment.