-
Notifications
You must be signed in to change notification settings - Fork 256
/
Copy pathcleaner.go
242 lines (222 loc) · 6.04 KB
/
cleaner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package cleaner
import (
"path/filepath"
"runtime/debug"
"sync/atomic"
"time"
"github.com/bmatcuk/doublestar"
"github.com/qiniu/log"
"github.com/qiniu/logkit/conf"
"github.com/qiniu/logkit/reader"
"github.com/qiniu/logkit/reader/config"
. "github.com/qiniu/logkit/utils/models"
)
type Cleaner struct {
cleanTicker <-chan time.Time
reserveNumber int64 //个
reserveSize int64 //byte
meta *reader.Meta
exitChan chan struct{}
cleanChan chan<- CleanSignal
name string
logdir string
status int32
}
type CleanSignal struct {
Logdir string
Filename string
Cleaner string
ReadMode string
}
const (
KeyCleanEnable = "delete_enable"
KeyCleanInterval = "delete_interval"
KeyReserveFileNumber = "reserve_file_number"
KeyReserveFileSize = "reserve_file_size"
cleanerName = "cleaner_name"
defaultDeleteInterval = 300 //5分钟
defaultReserveFileNumber = 10 //默认保存是个文件
defaultReserveFileSize = 2048 //单位MB,默认删除保存2G
// 如果两项任意一项达到要求,就执行删除;如果两项容易一项有值设置,但是另一项为0,就认为另一项不做限制
)
// 删除文件时遍历全部
// 删除时生成filedeleted文件
func NewCleaner(conf conf.MapConf, meta *reader.Meta, cleanChan chan<- CleanSignal, logdir string) (*Cleaner, error) {
enable, _ := conf.GetBoolOr(KeyCleanEnable, false)
if !enable {
return nil, nil
}
mode := meta.GetMode()
if mode != config.ModeDir &&
mode != config.ModeFile &&
mode != config.ModeCloudTrail &&
mode != config.ModeCloudTrailV2 &&
mode != config.ModeTailx &&
mode != config.ModeDirx {
log.Errorf("Cleaner only supports reader mode dir|file|cloudtrail|tailx|dirx, current mode is %v, cleaner disabled", meta.GetMode())
return nil, nil
}
interval, _ := conf.GetIntOr(KeyCleanInterval, 0) //单位,秒
if interval <= 0 {
interval = defaultDeleteInterval
}
name, _ := conf.GetStringOr(cleanerName, "unknown")
reserveNumber, _ := conf.GetInt64Or(KeyReserveFileNumber, 0)
reserveSize, _ := conf.GetInt64Or(KeyReserveFileSize, 0)
if reserveNumber <= 0 && reserveSize <= 0 {
reserveNumber = defaultReserveFileNumber
reserveSize = defaultReserveFileSize
}
reserveSize = reserveSize * MB
if mode != config.ModeTailx && mode != config.ModeDirx {
var err error
logdir, _, err = GetRealPath(logdir)
if err != nil {
log.Errorf("Failed to get real path of %q: %v", logdir, err)
return nil, err
}
}
return &Cleaner{
cleanTicker: time.NewTicker(time.Duration(interval) * time.Second).C,
reserveNumber: reserveNumber,
reserveSize: reserveSize,
meta: meta,
exitChan: make(chan struct{}),
cleanChan: cleanChan,
name: name,
logdir: logdir,
status: config.StatusInit,
}, nil
}
func (c *Cleaner) Run() {
if !atomic.CompareAndSwapInt32(&c.status, config.StatusInit, config.StatusRunning) {
if c.hasStopped() {
log.Warnf("cleaner[%v] has stopped, run operation ignored", c.name)
} else {
log.Warnf("cleaner[%v] has already running, run operation ignored", c.name)
}
return
}
for {
select {
case <-c.exitChan:
log.Warnf("%v receive exit signal, cleaner exiting...", c.name)
return
case <-c.cleanTicker:
}
err := c.Clean()
if err != nil {
log.Error(err)
}
}
}
func (c *Cleaner) Close() {
if !atomic.CompareAndSwapInt32(&c.status, config.StatusRunning, config.StatusStopped) {
log.Warnf("cleaner[%v] is not running, close operation ignored", c.name)
return
}
c.exitChan <- struct{}{}
}
func (c *Cleaner) hasStopped() bool {
return atomic.LoadInt32(&c.status) == config.StatusStopped
}
func (c *Cleaner) Name() string {
return c.name
}
func (c *Cleaner) shouldClean(size, count int64) bool {
if c.reserveNumber > 0 && count > c.reserveNumber {
return true
}
if c.reserveSize > 0 && size > c.reserveSize {
return true
}
return false
}
func (c *Cleaner) checkBelong(path string) bool {
dir := filepath.Dir(path)
dir, _, err := GetRealPath(dir)
if err != nil {
log.Errorf("GetRealPath for %v error %v", path, err)
return false
}
switch c.meta.GetMode() {
case config.ModeTailx:
matched, err := filepath.Match(filepath.Dir(c.logdir), filepath.Dir(path))
if err != nil {
log.Errorf("Failed to check if %q belongs to %q: %v", path, c.logdir, err)
return false
}
return matched
case config.ModeDirx:
matched, err := doublestar.Match(c.logdir, filepath.Dir(path))
if err != nil {
log.Errorf("Failed to check if %q belongs to %q: %v", path, c.logdir, err)
return false
}
return matched
}
if dir != c.logdir {
return false
}
return true
}
func (c *Cleaner) Clean() (err error) {
defer func() {
if rec := recover(); rec != nil {
log.Errorf("cleaner %q was panicked and recovered from %v\nstack: %s", c.Name(), rec, debug.Stack())
}
}()
if c.hasStopped() {
log.Warnf("cleaner[%v] reader %s has stopped, skip clean operation", c.name)
return
}
var size int64 = 0
var count int64 = 0
beginClean := false
doneFiles, err := c.meta.GetDoneFiles()
if err != nil {
return err
}
checked := make(map[string]struct{})
for _, f := range doneFiles {
logFiles := GetLogFiles(f.Path)
allremoved := true
for _, logf := range logFiles {
if !c.checkBelong(logf.Path) {
continue
}
if _, ok := checked[logf.Path]; ok {
continue
}
checked[logf.Path] = struct{}{}
size += logf.Info.Size()
count++
// 一旦符合条件,更老的文件必然都要删除
if beginClean || c.shouldClean(size, count) {
beginClean = true
sig := CleanSignal{
Logdir: filepath.Dir(logf.Path),
Filename: logf.Info.Name(),
Cleaner: c.name,
ReadMode: c.meta.GetMode(),
}
log.Infof("send clean signal %v", sig)
c.cleanChan <- sig
if err = c.meta.AppendDeleteFile(logf.Path); err != nil {
log.Error(err)
}
} else {
allremoved = false
}
}
if allremoved {
if err = c.meta.DeleteDoneFile(f.Path); err != nil {
log.Error(err)
}
}
}
return nil
}
func (c *Cleaner) LogDir() string {
return c.logdir
}