Skip to content

Commit

Permalink
refactor: ident heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
710leo committed Mar 17, 2023
1 parent 381654d commit e2b5739
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 54 deletions.
12 changes: 7 additions & 5 deletions center/center.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/center/cconf"
"github.com/ccfos/nightingale/v6/center/idents"
"github.com/ccfos/nightingale/v6/center/metas"
"github.com/ccfos/nightingale/v6/center/sso"
"github.com/ccfos/nightingale/v6/conf"
"github.com/ccfos/nightingale/v6/memsto"
Expand All @@ -18,6 +18,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/i18nx"
"github.com/ccfos/nightingale/v6/pkg/logx"
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"

Expand Down Expand Up @@ -54,11 +55,12 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}

idents := idents.New(db, redis)
metas := metas.New(redis)
idents := idents.New(db)

syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
// idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset)

sso := sso.Init(config.Center, ctx)

busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
Expand All @@ -76,8 +78,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
writers := writer.NewWriters(config.Pushgw)

alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, notifyConfigCache, promClients, redis, sso, ctx, idents)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx)
centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, notifyConfigCache, promClients, redis, sso, ctx, metas)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx)

r := httpx.GinEngine(config.Global.RunMode, config.HTTP)

Expand Down
45 changes: 6 additions & 39 deletions center/idents/idents.go → center/metas/metas.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package idents
package metas

import (
"context"
Expand All @@ -9,21 +9,17 @@ import (
"github.com/ccfos/nightingale/v6/storage"

"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
"gorm.io/gorm"
)

type Set struct {
sync.RWMutex
items map[string]models.HostMeta
db *gorm.DB
redis storage.Redis
}

func New(db *gorm.DB, redis storage.Redis) *Set {
func New(redis storage.Redis) *Set {
set := &Set{
items: make(map[string]models.HostMeta),
db: db,
redis: redis,
}

Expand Down Expand Up @@ -74,27 +70,26 @@ func (s *Set) persist() {

func (s *Set) updateMeta(items map[string]models.HostMeta) {
m := make(map[string]models.HostMeta, 100)
now := time.Now().Unix()
num := 0

for _, meta := range items {
m[meta.Hostname] = meta
num++
if num == 100 {
if err := s.updateTargets(m, now); err != nil {
if err := s.updateTargets(m); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
m = make(map[string]models.HostMeta, 100)
num = 0
}
}

if err := s.updateTargets(m, now); err != nil {
if err := s.updateTargets(m); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
}

func (s *Set) updateTargets(m map[string]models.HostMeta, now int64) error {
func (s *Set) updateTargets(m map[string]models.HostMeta) error {
count := int64(len(m))
if count == 0 {
return nil
Expand All @@ -106,33 +101,5 @@ func (s *Set) updateTargets(m map[string]models.HostMeta, now int64) error {
values = append(values, meta)
}
err := s.redis.MSet(context.Background(), values...).Err()
if err != nil {
return err
}
var lst []string
for ident := range m {
lst = append(lst, ident)
}

// there are some idents not found in db, so insert them
var exists []string
err = s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
if err != nil {
return err
}

err = s.db.Table("target").Where("ident in ?", exists).Update("update_at", now).Error
if err != nil {
logger.Error("failed to update target:", exists, "error:", err)
}

news := slice.SubString(lst, exists)
for i := 0; i < len(news); i++ {
err = s.db.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
if err != nil {
logger.Error("failed to insert target:", news[i], "error:", err)
}
}

return nil
return err
}
8 changes: 4 additions & 4 deletions center/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/ccfos/nightingale/v6/center/cconf"
"github.com/ccfos/nightingale/v6/center/cstats"
"github.com/ccfos/nightingale/v6/center/idents"
"github.com/ccfos/nightingale/v6/center/metas"
"github.com/ccfos/nightingale/v6/center/sso"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/aop"
Expand All @@ -30,13 +30,13 @@ type Router struct {
NotifyConfigCache *memsto.NotifyConfigCacheType
PromClients *prom.PromClientMap
Redis storage.Redis
IdentSet *idents.Set
MetaSet *metas.Set
Sso *sso.SsoClient
Ctx *ctx.Context
}

func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operation, ds *memsto.DatasourceCacheType, ncc *memsto.NotifyConfigCacheType,
pc *prom.PromClientMap, redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, identSet *idents.Set) *Router {
pc *prom.PromClientMap, redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, metaSet *metas.Set) *Router {
return &Router{
HTTP: httpConfig,
Center: center,
Expand All @@ -47,7 +47,7 @@ func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operatio
Redis: redis,
Sso: sso,
Ctx: ctx,
IdentSet: identSet,
MetaSet: metaSet,
}
}

Expand Down
2 changes: 1 addition & 1 deletion center/router/router_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ func (rt *Router) heartbeat(c *gin.Context) {
ginx.Dangerous(err)

req.Offset = (time.Now().UnixMilli() - req.UnixTime)
rt.IdentSet.Set(req.Hostname, req)
rt.MetaSet.Set(req.Hostname, req)
ginx.NewRender(c).Message(nil)
}
115 changes: 115 additions & 0 deletions pushgw/idents/idents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package idents

import (
"sync"
"time"

"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
"gorm.io/gorm"
)

type Set struct {
sync.Mutex
items map[string]struct{}
db *gorm.DB
}

func New(db *gorm.DB) *Set {
set := &Set{
items: make(map[string]struct{}),
db: db,
}

set.Init()
return set
}

func (s *Set) Init() {
go s.LoopPersist()
}

func (s *Set) MSet(items map[string]struct{}) {
s.Lock()
defer s.Unlock()
for ident := range items {
s.items[ident] = struct{}{}
}
}

func (s *Set) LoopPersist() {
for {
time.Sleep(time.Second)
s.persist()
}
}

func (s *Set) persist() {
var items map[string]struct{}

s.Lock()
if len(s.items) == 0 {
s.Unlock()
return
}

items = s.items
s.items = make(map[string]struct{})
s.Unlock()

s.updateTimestamp(items)
}

func (s *Set) updateTimestamp(items map[string]struct{}) {
lst := make([]string, 0, 100)
now := time.Now().Unix()
num := 0
for ident := range items {
lst = append(lst, ident)
num++
if num == 100 {
if err := s.updateTargets(lst, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
lst = lst[:0]
num = 0
}
}

if err := s.updateTargets(lst, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
}

func (s *Set) updateTargets(lst []string, now int64) error {
count := int64(len(lst))
if count == 0 {
return nil
}

ret := s.db.Table("target").Where("ident in ?", lst).Update("update_at", now)
if ret.Error != nil {
return ret.Error
}

if ret.RowsAffected == count {
return nil
}

// there are some idents not found in db, so insert them
var exists []string
err := s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
if err != nil {
return err
}

news := slice.SubString(lst, exists)
for i := 0; i < len(news); i++ {
err = s.db.Exec("INSERT INTO target(ident, update_at, cluster) VALUES(?, ?, ?)", news[i], now, "").Error
if err != nil {
logger.Error("failed to insert target:", news[i], "error:", err)
}
}

return nil
}
6 changes: 5 additions & 1 deletion pushgw/pushgw.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/pkg/logx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/router"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
)

type PushgwProvider struct {
Ident *idents.Set
Router *router.Router
}

Expand All @@ -35,6 +37,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
ctx := ctx.NewContext(context.Background(), db)

idents := idents.New(db)

stats := memsto.NewSyncStats()

busiGroupCache := memsto.NewBusiGroupCache(ctx, stats)
Expand All @@ -43,7 +47,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
writers := writer.NewWriters(config.Pushgw)

r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
rt := router.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx)
rt := router.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx)
rt.Config(r)

httpClean := httpx.Init(config.HTTP, r)
Expand Down
10 changes: 6 additions & 4 deletions pushgw/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/writer"
)
Expand All @@ -15,18 +16,19 @@ type Router struct {
Pushgw pconf.Pushgw
TargetCache *memsto.TargetCacheType
BusiGroupCache *memsto.BusiGroupCacheType
// IdentSet *idents.Set
Writers *writer.WritersType
Ctx *ctx.Context
IdentSet *idents.Set
Writers *writer.WritersType
Ctx *ctx.Context
}

func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType, writers *writer.WritersType, ctx *ctx.Context) *Router {
func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType, idents *idents.Set, writers *writer.WritersType, ctx *ctx.Context) *Router {
return &Router{
HTTP: httpConfig,
Writers: writers,
Ctx: ctx,
TargetCache: tc,
BusiGroupCache: bg,
IdentSet: idents,
}
}

Expand Down
5 changes: 5 additions & 0 deletions pushgw/router/router_datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (r *Router) datadogSeries(c *gin.Context) {
succ int
fail int
msg = "received"
ids = make(map[string]struct{})
)

for i := 0; i < cnt; i++ {
Expand All @@ -245,6 +246,9 @@ func (r *Router) datadogSeries(c *gin.Context) {
}

if ident != "" {
// register host
ids[ident] = struct{}{}

// fill tags
target, has := r.TargetCache.Get(ident)
if has {
Expand All @@ -269,6 +273,7 @@ func (r *Router) datadogSeries(c *gin.Context) {

if succ > 0 {
CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ))
r.IdentSet.MSet(ids)
}

c.JSON(200, gin.H{
Expand Down
Loading

0 comments on commit e2b5739

Please sign in to comment.