Skip to content

Commit

Permalink
Merge pull request #688 from Unknwon/PDR-7006-clean-submeta
Browse files Browse the repository at this point in the history
reader/{tailx,dirx}: clean expired submeta after sync
  • Loading branch information
wonderflow authored Aug 6, 2018
2 parents bd078bf + a804251 commit cdfcc8a
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 63 deletions.
48 changes: 26 additions & 22 deletions cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"path/filepath"
"time"

"github.com/qiniu/log"

"github.com/qiniu/logkit/conf"
"github.com/qiniu/logkit/reader"
. "github.com/qiniu/logkit/utils/models"

"github.com/qiniu/log"
)

type Cleaner struct {
Expand All @@ -34,47 +34,52 @@ const (
KeyCleanInterval = "delete_interval"
KeyReserveFileNumber = "reserve_file_number"
KeyReserveFileSize = "reserve_file_size"
clean_name = "cleaner_name"
cleanerName = "cleaner_name"

default_delete_interval = 300 //5分钟
default_reserve_file_number = 10 //默认保存十个文件
default_reserve_file_size = 2048 //单位MB,默认删除保存2G
MB = 1024 * 1024
defaultDeleteInterval = 300 //5分钟
defaultReserveFileNumber = 10 //默认保存十个文件
defaultReserveFileSize = 2048 //单位MB,默认删除保存2G
MB = 1024 * 1024
// 如果两项任意一项达到要求,就执行删除;如果两项任意一项有值设置,但是另一项为0,就认为另一项不做限制
)

// 删除文件时遍历全部
// 删除时生成filedeleted文件
func NewCleaner(conf conf.MapConf, meta *reader.Meta, cleanChan chan<- CleanSignal, logdir string) (c *Cleaner, err error) {
func NewCleaner(conf conf.MapConf, meta *reader.Meta, cleanChan chan<- CleanSignal, logdir string) (*Cleaner, error) {
enable, _ := conf.GetBoolOr(KeyCleanEnable, false)
if !enable {
return
return nil, nil
}
mode := meta.GetMode()
if mode != reader.ModeDir && mode != reader.ModeFile && mode != reader.ModeCloudTrail && mode != reader.ModeTailx {
log.Errorf("cleaner only support reader mode dir|file|clocktrail|tailx, now mode is %v, cleaner disabled", meta.GetMode())
return
if mode != reader.ModeDir &&
mode != reader.ModeFile &&
mode != reader.ModeCloudTrail &&
mode != reader.ModeTailx &&
mode != reader.ModeDirx {
log.Errorf("Cleaner only supports reader mode dir|file|cloudtrail|tailx|dirx, current mode is %v, cleaner disabled", meta.GetMode())
return nil, nil
}
interval, _ := conf.GetIntOr(KeyCleanInterval, 0) //单位,秒
if interval <= 0 {
interval = default_delete_interval
interval = defaultDeleteInterval
}
name, _ := conf.GetStringOr(clean_name, "unknow")
name, _ := conf.GetStringOr(cleanerName, "unknown")
reserveNumber, _ := conf.GetInt64Or(KeyReserveFileNumber, 0)
reserveSize, _ := conf.GetInt64Or(KeyReserveFileSize, 0)
if reserveNumber <= 0 && reserveSize <= 0 {
reserveNumber = default_reserve_file_number
reserveSize = default_reserve_file_size
reserveNumber = defaultReserveFileNumber
reserveSize = defaultReserveFileSize
}
reserveSize = reserveSize * MB
if mode != reader.ModeTailx {
if mode != reader.ModeTailx && mode != reader.ModeDirx {
var err error
logdir, _, err = GetRealPath(logdir)
if err != nil {
log.Errorf("GetRealPath for %v error %v", logdir, err)
return
log.Errorf("Failed to get real path of %q: %v", logdir, err)
return nil, err
}
}
c = &Cleaner{
return &Cleaner{
cleanTicker: time.NewTicker(time.Duration(interval) * time.Second).C,
reserveNumber: reserveNumber,
reserveSize: reserveSize,
Expand All @@ -83,8 +88,7 @@ func NewCleaner(conf conf.MapConf, meta *reader.Meta, cleanChan chan<- CleanSign
cleanChan: cleanChan,
name: name,
logdir: logdir,
}
return
}, nil
}

func (c *Cleaner) Run() {
Expand Down
29 changes: 11 additions & 18 deletions mgr/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1808,19 +1808,16 @@ func TestTailxCleaner(t *testing.T) {
log.Fatalf("TestTailxCleaner error mkdir %v %v", dir, err)
}
defer os.RemoveAll(dir)
defer os.RemoveAll(metaDir)

dira := filepath.Join(dir, "a")
os.MkdirAll(dira, DefaultDirPerm)
logPatha := filepath.Join(dira, "a.log")
err = ioutil.WriteFile(logPatha, []byte("a\n"), 0666)
assert.NoError(t, err)
assert.NoError(t, ioutil.WriteFile(logPatha, []byte("a\n"), 0666))

dirb := filepath.Join(dir, "b")
os.MkdirAll(dirb, DefaultDirPerm)
logPathb := filepath.Join(dirb, "b.log")
err = ioutil.WriteFile(logPathb, []byte("b\n"), 0666)
assert.NoError(t, err)
assert.NoError(t, ioutil.WriteFile(logPathb, []byte("b\n"), 0666))

readfile := filepath.Join(dir, "*", "*.log")
config := `
Expand Down Expand Up @@ -1855,8 +1852,7 @@ func TestTailxCleaner(t *testing.T) {
}`

rc := RunnerConfig{}
err = jsoniter.Unmarshal([]byte(config), &rc)
assert.NoError(t, err)
assert.NoError(t, jsoniter.Unmarshal([]byte(config), &rc))
cleanChan := make(chan cleaner.CleanSignal)
rr, err := NewLogExportRunner(rc, cleanChan, reader.NewRegistry(), parser.NewRegistry(), sender.NewRegistry())
assert.NoError(t, err)
Expand All @@ -1866,34 +1862,31 @@ func TestTailxCleaner(t *testing.T) {
time.Sleep(2 * time.Second)

logPatha1 := filepath.Join(dira, "a.log.1")
err = os.Rename(logPatha, logPatha1)
assert.NoError(t, err)
assert.NoError(t, os.Rename(logPatha, logPatha1))

err = ioutil.WriteFile(logPatha, []byte("bbbb\n"), 0666)
assert.NoError(t, err)
assert.NoError(t, ioutil.WriteFile(logPatha, []byte("bbbb\n"), 0666))

time.Sleep(5 * time.Second)

logPatha2 := filepath.Join(dira, "a.log.2")
err = os.Rename(logPatha, logPatha2)
assert.NoError(t, err)
assert.NoError(t, os.Rename(logPatha, logPatha2))

err = ioutil.WriteFile(logPatha, []byte("cccc\n"), 0666)
assert.NoError(t, err)
assert.NoError(t, ioutil.WriteFile(logPatha, []byte("cccc\n"), 0666))

time.Sleep(2 * time.Second)

assert.NotNil(t, rr.Cleaner())
var ret, dft int

var ret, dft int
DONE:
for {
select {
case sig := <-cleanChan:
ret++
assert.Equal(t, "a.log.1", sig.Filename)
err = os.Remove(filepath.Join(sig.Logdir, sig.Filename))
assert.NoError(t, err)
assert.NoError(t, os.Remove(filepath.Join(sig.Logdir, sig.Filename)))
assert.Equal(t, reader.ModeTailx, sig.ReadMode)
break DONE
default:
dft++
}
Expand Down
6 changes: 5 additions & 1 deletion reader/dirx/dir_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ func HasDirExpired(dir string, expire time.Duration) bool {
var latestModTime time.Time
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Errorf("Failed to walk directory file[%v]: %v", path, err)
log.Errorf("Failed to get directory entry[%v] info: %v", path, err)
return nil
} else if dir == path {
return nil
} else if info.IsDir() {
return filepath.SkipDir // 过滤子目录
}

if info.ModTime().After(latestModTime) {
Expand Down
33 changes: 33 additions & 0 deletions reader/dirx/dirx.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Reader struct {
// 以下为传入参数
logPathPattern string
statInterval time.Duration
expire time.Duration
submetaExpire time.Duration
maxOpenFiles int
ignoreHidden bool
skipFirstLine bool
Expand Down Expand Up @@ -83,6 +85,14 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
return nil, err
}

submetaExpireDur, _ := conf.GetStringOr(reader.KeySubmetaExpire, "720h")
submetaExpire, err := time.ParseDuration(submetaExpireDur)
if err != nil {
return nil, err
} else if submetaExpire < expire {
return nil, fmt.Errorf("%q valus is less than %q", reader.KeySubmetaExpire, reader.KeyExpire)
}

maxOpenFiles, _ := conf.GetIntOr(reader.KeyMaxOpenFiles, 256)
ignoreHidden, _ := conf.GetBoolOr(reader.KeyIgnoreHiddenFile, true) // 默认不读取隐藏文件
skipFirstLine, _ := conf.GetBoolOr(reader.KeySkipFileFirstLine, false)
Expand Down Expand Up @@ -128,6 +138,8 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
dirReaders: newDirReaders(meta, expire, cachedLines),
logPathPattern: strings.TrimSuffix(logPathPattern, "/"),
statInterval: statInterval,
expire: expire,
submetaExpire: submetaExpire,
maxOpenFiles: maxOpenFiles,
ignoreHidden: ignoreHidden,
skipFirstLine: skipFirstLine,
Expand Down Expand Up @@ -291,6 +303,23 @@ func (r *Reader) Start() error {
}
}
}()

if r.expire.Nanoseconds() > 0 {
go func() {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
r.meta.CheckExpiredSubMetas(r.submetaExpire)

select {
case <-r.stopChan:
return
case <-ticker.C:
}
}
}()
}

log.Infof("Runner[%v] %q daemon has started", r.meta.RunnerName, r.Name())
return nil
}
Expand Down Expand Up @@ -341,6 +370,10 @@ func (r *Reader) SyncMeta() {
log.Errorf("Runner[%v] write meta[%v] failed: %v", r.meta.RunnerName, string(data), err)
return
}

if r.expire.Nanoseconds() > 0 {
r.meta.CleanExpiredSubMetas(r.submetaExpire)
}
}

func (r *Reader) Close() error {
Expand Down
26 changes: 22 additions & 4 deletions reader/dirx/dirx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func multiReaderOneLineTest(t *testing.T) {
assert.NoError(t, dr.Start())
t.Log("Reader has started")

assert.Equal(t, 5*time.Second, dr.expire)
assert.Equal(t, 720*time.Hour, dr.submetaExpire)

maxNum := 0
emptyNum := 0
for {
Expand All @@ -121,7 +124,7 @@ func multiReaderOneLineTest(t *testing.T) {
// 确保上个 reader 已过期,新的 reader 已经探测到并创建成功
createDirWithName(dir2)
createFileWithContent(dir2file1, "hahaha\nhahaha\nhahaha\n")
time.Sleep(10 * time.Second)
time.Sleep(6 * time.Second)
assert.Equal(t, 1, dr.dirReaders.Num(), "Number of readers")
assert.Equal(t, "TestMultiReaderOneLine/logs/xyz", dr.dirReaders.getReaders()[0].originalPath)

Expand Down Expand Up @@ -190,6 +193,9 @@ func multiReaderMultiLineTest(t *testing.T) {
assert.NoError(t, dr.Start())
t.Log("Reader has started")

assert.Equal(t, 5*time.Second, dr.expire)
assert.Equal(t, 720*time.Hour, dr.submetaExpire)

maxNum := 0
emptyNum := 0
for {
Expand All @@ -213,7 +219,7 @@ func multiReaderMultiLineTest(t *testing.T) {
// 确保上个 reader 已过期,新的 reader 已经探测到并创建成功
createDirWithName(dir2)
createFileWithContent(dir2file1, "abc\nx\nabc\nx\nabc\nx\n")
time.Sleep(10 * time.Second)
time.Sleep(6 * time.Second)
assert.Equal(t, 1, dr.dirReaders.Num(), "Number of readers")
assert.Equal(t, "TestMultiReaderMultiLine/logs/xyz", dr.dirReaders.getReaders()[0].originalPath)

Expand Down Expand Up @@ -290,6 +296,9 @@ func multiReaderSyncMetaOneLineTest(t *testing.T) {
assert.NoError(t, dr.Start())
t.Log("Reader has started")

assert.Equal(t, 10*time.Second, dr.expire)
assert.Equal(t, 720*time.Hour, dr.submetaExpire)

maxNum := 0
emptyNum := 0
for {
Expand Down Expand Up @@ -341,7 +350,7 @@ func multiReaderSyncMetaOneLineTest(t *testing.T) {
// 确保上个 reader 已过期,新的 reader 已经探测到并创建成功
createDirWithName(dir2)
createFileWithContent(dir2file1, "ab1\nab2\nab3\n")
time.Sleep(15 * time.Second)
time.Sleep(11 * time.Second)
assert.Equal(t, 1, dr.dirReaders.Num(), "Number of readers")
assert.Equal(t, "TestMultiReaderSyncMetaOneLine/logs/xyz", dr.dirReaders.getReaders()[0].originalPath)

Expand Down Expand Up @@ -417,6 +426,9 @@ func multiReaderSyncMetaMutilineTest(t *testing.T) {
assert.NoError(t, dr.Start())
t.Log("Reader has started")

assert.Equal(t, 10*time.Second, dr.expire)
assert.Equal(t, 720*time.Hour, dr.submetaExpire)

maxNum := 0
emptyNum := 0
for {
Expand Down Expand Up @@ -469,7 +481,7 @@ func multiReaderSyncMetaMutilineTest(t *testing.T) {
// 确保上个 reader 已过期,新的 reader 已经探测到并创建成功
createDirWithName(dir2)
createFileWithContent(dir2file1, "abc\nx\nabc\ny\nabc\nz\n")
time.Sleep(15 * time.Second)
time.Sleep(11 * time.Second)
assert.Equal(t, 1, dr.dirReaders.Num(), "Number of readers")

t.Log("Reader has started to read two")
Expand Down Expand Up @@ -541,6 +553,9 @@ func TestMultiReaderReset(t *testing.T) {
assert.NoError(t, dr.Start())
t.Log("Reader has started")

assert.Equal(t, 0*time.Second, dr.expire)
assert.Equal(t, 720*time.Hour, dr.submetaExpire)

maxNum := 0
emptyNum := 0
for {
Expand Down Expand Up @@ -638,6 +653,9 @@ func TestReaderErrBegin(t *testing.T) {
assert.NoError(t, dr.Start())
t.Log("Reader has started")

assert.Equal(t, 0*time.Second, dr.expire)
assert.Equal(t, 720*time.Hour, dr.submetaExpire)

maxNum := 0
for {
_, err = dr.ReadLine()
Expand Down
Loading

0 comments on commit cdfcc8a

Please sign in to comment.