forked from didi/falcon-log-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
300 lines (261 loc) · 6.69 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package worker
import (
"fmt"
"math"
"regexp"
"strconv"
"time"
"github.com/didi/falcon-log-agent/strategy"
"github.com/didi/falcon-log-agent/common/dlog"
"github.com/didi/falcon-log-agent/common/g"
"github.com/didi/falcon-log-agent/common/proc/metric"
"github.com/didi/falcon-log-agent/common/sample_log"
"github.com/didi/falcon-log-agent/common/scheme"
"github.com/didi/falcon-log-agent/common/utils"
)
// Worker analyses log lines of a single file.
// It takes lines from Stream (shared with the other workers of its
// WorkerGroup) and matches them against the strategies whose FilePath
// equals its own FilePath.
type Worker struct {
FilePath string // file whose strategies this worker applies
Counter int64 // NOTE(review): initialised to 0 but not read or updated in this file
LatestTms int64 // Unix-seconds timestamp of the log line currently being processed
Close chan struct{} // closed by Stop to make Work return
Stream chan string // input lines; the same channel is shared by every worker in the group
Mark string // identifies this worker in logs and self-monitoring metrics, to help trace problems
Analyzing bool // true while a line is being analysed, false while the worker is idle
}
// WorkerGroup is a group of Workers created for one file by NewWorkerGroup.
// All workers in the group consume the same input stream.
type WorkerGroup struct {
WorkerNum int // number of workers, taken from g.Conf().Worker.WorkerNum
LatestTms int64 // reserved field (not used in this file)
Workers []*Worker
TimeFormatStrategy string // NOTE(review): neither set nor read in this file
}
// GetOldestTms summarises the progress of the group's workers.
//
// If at least one worker is analysing a line with a non-negative timestamp,
// it returns the oldest such timestamp and allFree == false. A timestamp of 0
// is treated as "not set yet" and is replaced by any later busy worker's
// timestamp. Otherwise, when every worker is idle, it returns the newest
// timestamp seen across all workers (0 if none is positive) and
// allFree == true.
func (wg WorkerGroup) GetOldestTms() (tms int64, allFree bool) {
	var oldestBusy, newestSeen int64
	busy := false
	for _, w := range wg.Workers {
		ts := w.LatestTms
		if ts > newestSeen {
			newestSeen = ts
		}
		// Only workers that are mid-analysis with a valid stamp count as busy.
		if !w.Analyzing || ts < 0 {
			continue
		}
		busy = true
		if oldestBusy == 0 || ts < oldestBusy {
			oldestBusy = ts
		}
	}
	if busy {
		return oldestBusy, false
	}
	return newestSeen, true
}
// NewWorkerGroup builds, but does not start, a group of workers for filePath.
//
// filePath and stream are supplied by the caller and shared by every worker.
// Everything else is created here: one Close channel per worker and a log
// mark per worker. The worker count comes from g.Conf().Worker.WorkerNum; a
// non-positive value yields an empty group. st is currently unused.
func NewWorkerGroup(filePath string, stream chan string, st *scheme.Strategy) *WorkerGroup {
	// Read the configured size once so that the log line, every worker's mark
	// and the number of workers actually created always agree, even if the
	// configuration is swapped while we are building the group.
	workerNum := g.Conf().Worker.WorkerNum

	wg := &WorkerGroup{
		WorkerNum: workerNum,
		Workers:   make([]*Worker, 0),
	}
	dlog.Infof("new worker group, [file:%s][worker_num:%d]", filePath, workerNum)

	for i := 0; i < workerNum; i++ {
		// Analyzing (false) and Counter (0) start at their zero values.
		wg.Workers = append(wg.Workers, &Worker{
			FilePath: filePath,
			Stream:   stream,
			Close:    make(chan struct{}),
			Mark:     fmt.Sprintf("[worker][file:%s][num:%d][id:%d]", filePath, workerNum, i),
		})
	}
	return wg
}
// Start launches every worker of the group by calling its Start method.
func (wg *WorkerGroup) Start() {
	for i := range wg.Workers {
		wg.Workers[i].Start()
	}
}
// Stop asks every worker of the group to quit by calling its Stop method.
func (wg *WorkerGroup) Stop() {
	for i := range wg.Workers {
		wg.Workers[i].Stop()
	}
}
// Start runs the worker's main loop (Work) in a new goroutine and returns
// immediately.
func (w *Worker) Start() {
	go w.Work()
}
// Stop signals the worker to quit by closing w.Close, which makes Work return.
//
// Calling Stop again after the channel is closed is a no-op instead of a
// "close of closed channel" panic. Concurrent calls are not synchronised and
// can still race on the close.
func (w *Worker) Stop() {
	select {
	case <-w.Close:
		// Already closed: nothing to do.
	default:
		close(w.Close)
	}
}
// Work is the worker's main loop; Start runs it in its own goroutine.
//
// It takes lines from w.Stream and analyses each one, setting w.Analyzing
// while a line is in progress. Every 10 seconds it reports, through
// metric.MetricAnalysis, how many lines were taken since the previous report.
// It returns when w.Close is closed (see Stop) or when w.Stream is closed.
// A panic is recovered and logged as the quit reason.
func (w *Worker) Work() {
	defer func() {
		if reason := recover(); reason != nil {
			dlog.Infof("%s -- worker quit: panic reason: %v", w.Mark, reason)
		} else {
			dlog.Infof("%s -- worker quit: normally", w.Mark)
		}
	}()
	dlog.Infof("worker starting...[%s]", w.Mark)

	// Reporting happens in this same select loop rather than in a helper
	// goroutine. The counters are therefore only touched by one goroutine,
	// so there is no data race. There is also no helper that could leak, or
	// block shutdown on an unbuffered send.
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	var anaCnt, anaSwp int64

	for {
		select {
		case line, ok := <-w.Stream:
			if !ok {
				// A closed channel yields "" forever; quit instead of
				// spinning on empty lines.
				return
			}
			w.Analyzing = true
			anaCnt++
			w.analysis(line)
			w.Analyzing = false
		case <-ticker.C:
			metric.MetricAnalysis(w.FilePath, anaCnt-anaSwp)
			anaSwp = anaCnt
		case <-w.Close:
			return
		}
	}
}
// analysis runs one log line through every currently registered strategy
// that targets this worker's file and was parsed successfully (single pass
// over the global strategy list).
//
// Producer errors are sent to the sampled error log and the next strategy is
// tried. Every non-nil point is counted as a success metric and handed to the
// counter. A panic is recovered and logged.
func (w *Worker) analysis(line string) {
	defer func() {
		if err := recover(); err != nil {
			dlog.Infof("%s[analysis panic] : %v", w.Mark, err)
		}
	}()

	for _, st := range strategy.GetAll() {
		if st.FilePath != w.FilePath || !st.ParseSucc {
			continue
		}
		point, err := w.producer(line, st)
		if err != nil {
			sample_log.Error(fmt.Sprintf("%s[producer error][sid:%d] : %v", w.Mark, st.ID, err))
			continue
		}
		if point == nil {
			// The line was deliberately skipped for this strategy.
			continue
		}
		metric.MetricAnalysisSucc(w.FilePath, 1)
		toCounter(point, w.Mark)
	}
}
// logTimeSpaceReg matches runs of whitespace. It is used to normalise
// year-less syslog timestamps ("Dec  7" vs "Dec 7"). It is compiled once at
// package init instead of once per log line.
var logTimeSpaceReg = regexp.MustCompile(`\s+`)

// logTimeLocation is the zone every log timestamp is interpreted in
// (Asia/Shanghai). It is resolved once at package init, because
// time.LoadLocation loads zone data on every call.
//
// If the zone database is unavailable, it falls back to a fixed UTC+8 zone.
// Asia/Shanghai has used UTC+8 without DST since 1991. A nil *time.Location
// must not be used here, because it makes time.ParseInLocation panic for
// formats that carry no zone.
var logTimeLocation = func() *time.Location {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		return time.FixedZone("CST", 8*60*60)
	}
	return loc
}()

// producer parses one log line against one strategy.
//
// It returns:
//   - (nil, err) when no timestamp is found or it cannot be parsed;
//   - (nil, nil) when the line is deliberately skipped: the pattern regex
//     does not match, the exclude regex matches, or a tag cannot be extracted;
//   - (point, nil) otherwise.
//
// Point value: the pattern's first capture group parsed as float64. It is
// NaN when there is no pattern, no capture group, or the group is not a
// number.
//
// Side effect: w.LatestTms is set to the line's timestamp as soon as the
// timestamp is parsed, even if the line is skipped afterwards.
//
// A panic is recovered, logged, and yields (nil, nil).
func (w *Worker) producer(line string, strategy *scheme.Strategy) (*AnalysPoint, error) {
	defer func() {
		if err := recover(); err != nil {
			dlog.Errorf("%s[producer panic] : %v", w.Mark, err)
		}
	}()

	// Timestamp.
	_, timeFormat := utils.GetPatAndTimeFormat(strategy.TimeFormat)
	t := strategy.TimeReg.FindString(line)
	if t == "" {
		return nil, fmt.Errorf("cannot get timestamp:[sname:%s][sid:%d][timeFormat:%v]", strategy.Name, strategy.ID, timeFormat)
	}

	// Syslog-style stamps have no year. Prepend the current year and collapse
	// the variable padding ("Dec 7" may contain one or two spaces) to a
	// single space.
	// NOTE(review): a line from Dec 31 processed after midnight on Jan 1 is
	// dated one year in the future.
	if timeFormat == "Jan 2 15:04:05" {
		timeFormat = "2006 " + timeFormat
		t = logTimeSpaceReg.ReplaceAllString(fmt.Sprintf("%d %s", time.Now().Year(), t), " ")
	}

	// [risk] All timestamps are interpreted as UTC+8 (Asia/Shanghai).
	tms, err := time.ParseInLocation(timeFormat, t, logTimeLocation)
	if err != nil {
		return nil, err
	}
	w.LatestTms = tms.Unix()

	// User pattern: the first capture group, if any, is the value.
	value := math.NaN()
	if strategy.PatternReg != nil {
		m := strategy.PatternReg.FindStringSubmatch(line)
		if len(m) == 0 {
			// The pattern exists but does not match: skip the line. Callers
			// must check for a nil point before pushing to the counter.
			return nil, nil
		}
		vString := ""
		if len(m) > 1 {
			vString = m[1]
		}
		if v, perr := strconv.ParseFloat(vString, 64); perr == nil {
			value = v
		}
	}

	// Exclude pattern: a match means the line must be skipped.
	if strategy.ExcludeReg != nil && strategy.ExcludeReg.MatchString(line) {
		return nil, nil
	}

	// Tags: every configured tag regex must yield a first capture group.
	tags := make(map[string]string, len(strategy.Tags))
	for tagk, tagv := range strategy.Tags {
		regTag, ok := strategy.TagRegs[tagk]
		if !ok {
			dlog.Errorf("%s[get tag reg error][sid:%d][tagk:%s][tagv:%s]", w.Mark, strategy.ID, tagk, tagv)
			return nil, nil
		}
		m := regTag.FindStringSubmatch(line)
		if len(m) < 2 {
			return nil, nil
		}
		tags[tagk] = m[1]
	}

	return &AnalysPoint{
		StrategyID: strategy.ID,
		Value:      value,
		Tms:        tms.Unix(),
		Tags:       tags,
	}, nil
}
// toCounter hands a parsed point to the counter stage via PushToCount. Any
// error is logged, prefixed with the worker's mark, and otherwise ignored.
func toCounter(analyspoint *AnalysPoint, mark string) {
	err := PushToCount(analyspoint)
	if err == nil {
		return
	}
	dlog.Errorf("%s push to counter error: %v", mark, err)
}