forked from ccfos/nightingale
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpush.go
143 lines (118 loc) · 3.66 KB
/
push.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package trans
import (
"bytes"
"fmt"
"sort"
"sync"
"time"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/backend"
"github.com/didi/nightingale/v5/cache"
"github.com/didi/nightingale/v5/models"
"github.com/didi/nightingale/v5/naming"
"github.com/didi/nightingale/v5/vos"
)
func Push(points []*vos.MetricPoint) error {
if points == nil {
return fmt.Errorf("param(points) is nil")
}
count := len(points)
if count == 0 {
return fmt.Errorf("param(points) is empty")
}
var reterr error
// 把ident->alias做成map,放内存里,后续要周期性与DB中的数据对比,更新resource表
aliasMapper := make(map[string]interface{})
now := time.Now().Unix()
validPoints := make([]*vos.MetricPoint, 0, count)
for i := 0; i < count; i++ {
logger.Debugf("recv %+v", points[i])
// 如果tags中发现有__ident__和__alias__就提到外层,这个逻辑是为了应对snmp之类的场景
if val, has := points[i].TagsMap["__ident__"]; has {
points[i].Ident = val
// 如果后面没有发现__alias__,那alias就给改成空
points[i].Alias = ""
delete(points[i].TagsMap, "__ident__")
}
if val, has := points[i].TagsMap["__alias__"]; has {
points[i].Alias = val
delete(points[i].TagsMap, "__alias__")
}
if err := points[i].Tidy(now); err != nil {
// 如果有部分point校验失败,没关系,把error返回即可,正常的可以继续往下走
logger.Warningf("point %+v is invalid, err:%v ", points[i], err)
reterr = err
} else {
if points[i].Ident != "" {
// 把当前时间也带上,处理的时候只处理最近的数据,避免alias发生变化且数据分散在多个server造成的alias不一致的问题
aliasMapper[points[i].Ident] = &models.AliasTime{Alias: points[i].Alias, Time: now}
}
// 将resource的tag追加到曲线的tag中,根据tagsmap生成tagslst,排序,生成primarykey
enrich(points[i])
validPoints = append(validPoints, points[i])
}
}
models.AliasMapper.MSet(aliasMapper)
// 路由数据,做转发的逻辑可以做成异步,这个过程如果有错,都是系统内部错误,不需要暴露给client侧
go DispatchPoints(validPoints)
return reterr
}
func DispatchPoints(points []*vos.MetricPoint) {
// send to push endpoints
pushEndpoints, err := backend.GetPushEndpoints()
if err != nil {
logger.Errorf("could not find pushendpoint:%v", err)
} else {
for _, pushendpoint := range pushEndpoints {
go pushendpoint.Push2Queue(points)
}
}
// send to judge queue
for i := range points {
node, err := naming.HashRing.GetNode(points[i].PK)
if err != nil {
logger.Errorf("could not find node:%v", err)
continue
}
q, exists := queues.Get(node)
if !exists {
logger.Errorf("could not find queue by %s", node)
continue
}
q.PushFront(points[i])
}
}
var bufferPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
func enrich(point *vos.MetricPoint) {
// 把res的tags附到point上
resAndTags, exists := cache.ResTags.Get(point.Ident)
if exists {
for k, v := range resAndTags.Tags {
point.TagsMap[k] = v
}
}
var tagsLst []string
// 根据tagsmap生成tagslst,sort
count := len(point.TagsMap)
if count == 0 {
tagsLst = []string{}
} else {
lst := make([]string, 0, count)
for k, v := range point.TagsMap {
lst = append(lst, k+"="+v)
}
sort.Strings(lst)
tagsLst = lst
}
// ident metric tagslst 生成 pk
ret := bufferPool.Get().(*bytes.Buffer)
ret.Reset()
defer bufferPool.Put(ret)
ret.WriteString(point.Ident)
ret.WriteString(point.Metric)
for i := 0; i < len(tagsLst); i++ {
ret.WriteString(tagsLst[i])
}
point.PK = str.MD5(ret.String())
}