Commit

Fix the issue where dir reads files repeatedly
PapaPiya committed Apr 1, 2021
1 parent d60f1b5 commit 93c2c43
Showing 8 changed files with 66 additions and 114 deletions.
4 changes: 2 additions & 2 deletions reader/bufreader/bufreader.go
@@ -391,7 +391,7 @@ func (b *BufReader) readBytes(delim []byte) ([]byte, error) {
}
nolog = true
lineLen = 0
full = make([][]byte, 0, len(full)) // reallocate the slice
full = make([][]byte, 0, len(full))   // reallocate the slice
continue
}
buf := make([]byte, fragLen)
@@ -646,7 +646,7 @@ func NewFileDirReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Reade
newfileNewLine, _ := conf.GetBoolOr(KeyNewFileNewLine, false)
skipFirstLine, _ := conf.GetBoolOr(KeySkipFileFirstLine, false)
readSameInode, _ := conf.GetBoolOr(KeyReadSameInode, false)
fr, err := seqfile.NewSeqFile(meta, logpath, ignoreHidden, newfileNewLine, ignoreFileSuffix, validFilesRegex, whence, nil, inodeSensitive)
fr, err := seqfile.NewSeqFile(meta, logpath, ignoreHidden, newfileNewLine, ignoreFileSuffix, validFilesRegex, whence, inodeSensitive)
if err != nil {
return
}
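For orientation, here is a minimal sketch of a call site against the updated NewSeqFile signature, which no longer takes an expireMap argument. The import paths, the empty Meta literal, and all configuration values below are illustrative assumptions, not code from this commit.

package main

import (
	"log"

	"github.com/qiniu/logkit/reader"         // assumed import path
	"github.com/qiniu/logkit/reader/seqfile" // assumed import path
)

func main() {
	meta := &reader.Meta{} // placeholder; a real Meta comes from the runner's meta configuration
	fr, err := seqfile.NewSeqFile(
		meta,
		"/var/log/app",  // logpath (placeholder)
		true,            // ignoreHidden
		false,           // newfileNewLine
		[]string{".gz"}, // ignoreFileSuffix (placeholder)
		"*",             // validFilesRegex (placeholder)
		"oldest",        // whence (placeholder value)
		true,            // inodeSensitive: key recorded offsets by inode, not only by file name
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = fr // in real use, fr is wrapped by a BufReader
}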
2 changes: 1 addition & 1 deletion reader/cloudtrail/cloudtrail.go
@@ -85,7 +85,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
validFilePattern, _ := conf.GetStringOr(KeyValidFilePattern, "*")
bufSize, _ := conf.GetIntOr(KeyBufSize, bufreader.DefaultBufSize)
skipFirstLine, _ := conf.GetBoolOr(KeySkipFileFirstLine, false)
sf, err := seqfile.NewSeqFile(meta, opts.directory, true, true, ignoredSuffixes, validFilePattern, WhenceOldest, nil, true)
sf, err := seqfile.NewSeqFile(meta, opts.directory, true, true, ignoredSuffixes, validFilePattern, WhenceOldest, true)
if err != nil {
return nil, err
}
4 changes: 1 addition & 3 deletions reader/dirx/dir_reader.go
@@ -296,8 +296,6 @@ type newReaderOptions struct {
Whence string
BufferSize int

expireMap map[string]int64

MsgChan chan<- message
ErrChan chan<- error

@@ -330,7 +328,7 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool, maxLi
return nil, fmt.Errorf("new extract reader: %v", err)
}
} else {
fr, err := seqfile.NewSeqFile(subMeta, opts.LogPath, opts.IgnoreHidden, opts.NewFileNewLine, opts.IgnoreFileSuffixes, opts.ValidFilesRegex, opts.Whence, opts.expireMap, true)
fr, err := seqfile.NewSeqFile(subMeta, opts.LogPath, opts.IgnoreHidden, opts.NewFileNewLine, opts.IgnoreFileSuffixes, opts.ValidFilesRegex, opts.Whence, true)
if err != nil {
return nil, fmt.Errorf("new sequence file: %v", err)
}
1 change: 0 additions & 1 deletion reader/dirx/dirx.go
@@ -303,7 +303,6 @@ func (r *Reader) statLogPath() {
MsgChan: r.msgChan,
ErrChan: r.errChan,
ReadSameInode: r.readSameInode,
expireMap: r.expireMap,
}, r.notFirstTime, r.maxLineLen)
if err != nil {
if err == ErrAlreadyExist {
12 changes: 7 additions & 5 deletions reader/dirx/dirx_test.go
@@ -21,7 +21,7 @@ import (
)

func createFileWithContent(filepathn, lines string) {
file, err := os.OpenFile(filepathn, os.O_CREATE|os.O_WRONLY, DefaultFilePerm)
file, err := os.OpenFile(filepathn, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, DefaultFilePerm)
if err != nil {
log.Error(err)
return
@@ -641,7 +641,6 @@ func multiReaderNewestOffsetTest(t *testing.T) {

createDirWithName(dir1)
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\nabc127\n")
time.Sleep(15 * time.Second)
expectResults := map[string]int{
"abc\nx\n": 1,
"abc\ny\n": 1,
@@ -728,6 +727,10 @@
}

func multiReaderSameInodeTest(t *testing.T) {

}

func TestMultiReaderSameInodeTest(t *testing.T) {
dirname := "multiReaderSameInodeTest"
dir1 := filepath.Join(dirname, "logs/abc")
dir2 := filepath.Join(dirname, "logs/xyz")
@@ -745,8 +748,7 @@ func multiReaderSameInodeTest(t *testing.T) {
"abc124\n": 3,
"abc125\n": 3,
"abc126\n": 3,
"abc127\n": 3,
"abc128\n": 1,
"abc127\n": 2,
"abc\nx\n": 1,
"abc\ny\n": 1,
"abc\nz\n": 1,
@@ -823,7 +825,7 @@ func multiReaderSameInodeTest(t *testing.T) {
}
}
t.Log("Reader has finished reading two")
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\nabc127\nabc128\n")
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\n")
time.Sleep(5 * time.Second)
assert.Equal(t, 2, dr.dirReaders.Num(), "Number of readers")

20 changes: 20 additions & 0 deletions reader/meta.go
@@ -1,6 +1,7 @@
package reader

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
@@ -515,6 +516,25 @@ func (m *Meta) AppendDoneFileInode(path string, inode uint64, offset int64) (err
return
}

func (m *Meta) SyncDoneFileInode(inodeOffset map[string]int64) (err error) {
f, err := os.OpenFile(m.DoneFile(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, DefaultFilePerm)
if err != nil {
return
}
defer f.Close()

var data bytes.Buffer
for inodeFile, offset := range inodeOffset {
str := strings.Split(inodeFile, "_")
if len(str) != 2 {
continue
}
data.WriteString(fmt.Sprintf("%s\t%v\t%v\t%s\n", str[0], str[1], offset, time.Now().Format(time.RFC3339Nano)))
}
_, err = fmt.Fprintf(f, data.String())
return
}

func (m *Meta) GetDoneFileContent() ([]string, error) {
return m.getDoneFileContent()
}
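Below is a minimal sketch of driving the new Meta.SyncDoneFileInode from a caller: unlike AppendDoneFileInode, it rewrites the whole done file from an in-memory map in one shot. The helper name, the import path, and the "<name>_<inode>" key shape (inferred from the strings.Split above) are assumptions for illustration.

package example

import (
	"fmt"

	"github.com/qiniu/logkit/reader" // assumed import path
)

// syncOffsets rewrites the done file from an in-memory offset map. Keys are
// assumed to look like "<name>_<inode>", the shape SyncDoneFileInode splits
// on; entries that do not split into exactly two parts are skipped by it.
func syncOffsets(meta *reader.Meta, offsets map[string]int64) error {
	if err := meta.SyncDoneFileInode(offsets); err != nil {
		return fmt.Errorf("sync done file: %v", err)
	}
	return nil
}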
60 changes: 20 additions & 40 deletions reader/seqfile/seqfile.go
@@ -49,14 +49,12 @@ type SeqFile struct {
SkipFileFirstLine bool // skip the first line of a new file; commonly used for CSV files with a title row whose format differs from the actual data
hasSkiped bool

inodeOffset map[string]int64 // records whether a filename_inode has already been read
inodeSensitive bool // whether inode information is used as the key for inodeDone and expireMap
inodeOffset map[string]int64 // records whether a filename_inode has already been read
inodeSensitive bool // whether inode information is used as the key for inodeDone and inodeOffset

lastSyncPath string
lastSyncOffset int64

expireMap map[string]int64

ReadSameInode bool // whether a filename_inode that has already been read should be read again
}

@@ -106,7 +104,7 @@ func getStartFile(path, whence string, meta *reader.Meta, sf *SeqFile) (f *os.Fi
return
}

func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine bool, suffixes []string, validFileRegex, whence string, expireMap map[string]int64, inodeSensitive bool) (sf *SeqFile, err error) {
func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine bool, suffixes []string, validFileRegex, whence string, inodeSensitive bool) (sf *SeqFile, err error) {
sf = &SeqFile{
ignoreFileSuffix: suffixes,
ignoreHidden: ignoreHidden,
@@ -115,7 +113,6 @@ func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine boo
newFileAsNewLine: newFileNewLine,
meta: meta,
inodeOffset: make(map[string]int64),
expireMap: expireMap,
inodeSensitive: inodeSensitive,
}
// The original for loop was replaced with a single attempt: if startup fails, report the error to the user directly rather than waiting and retrying.
@@ -465,7 +462,7 @@ func (sf *SeqFile) getNextFileCondition() (condition func(os.FileInfo) bool, err
key = filepath.Base(f.Name())
}
offset, ok := sf.inodeOffset[key]
return !ok || (sf.ReadSameInode && offset != -1 && f.Size() != offset)
return !ok || (sf.ReadSameInode && offset != -1 && f.Size() != offset)
}

condition = reader.AndCondition(reader.AndCondition(newerThanCurrFile, sf.getIgnoreCondition()), isNewFile)
@@ -582,7 +579,7 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
log.Warnf("Runner[%v] os.Open %s: %v", sf.meta.RunnerName, fname, err)
return err
}
sf.f = f

// close the old reader before opening the new one
if sf.ratereader != nil {
sf.ratereader.Close()
@@ -593,6 +590,7 @@
sf.ratereader = f
}
sf.offset = sf.getOffset(f, 0, true)
sf.f = f
sf.inode, err = utilsos.GetIdentifyIDByPath(sf.currFile)
if err != nil {
return err
@@ -610,7 +608,7 @@
sf.inodeOffset[key] = doneFileOffset
tryTime := 0
for {
err := sf.meta.AppendDoneFileInode(doneFile, doneFileInode, doneFileOffset)
err := sf.meta.SyncDoneFileInode(sf.inodeOffset)
if err != nil {
if tryTime > 3 {
log.Errorf("Runner[%v] cannot write done file %s, err:%v, ignore this noefi", sf.meta.RunnerName, doneFile, err)
@@ -692,59 +690,41 @@ type LineSkipper interface {
}

func (sf *SeqFile) getOffset(f *os.File, offset int64, seek bool) int64 {
if len(sf.expireMap) == 0 || offset != 0 || f == nil {
if len(sf.inodeOffset) == 0 || offset != 0 || f == nil {
return offset
}

if sf.meta.IsExist() {
deleteNotExist(filepath.Dir(f.Name()), sf.expireMap, sf.inodeSensitive)
fileName := f.Name()
fileInfo, err := f.Stat()
if err != nil {
log.Errorf("Runner[%s] NewSeqFile get file %s info error %v, ignore...", sf.meta.RunnerName, fileName, err)
return offset
}

fileName := f.Name()
inode, err := utilsos.GetIdentifyIDByPath(fileName)
if err != nil {
log.Errorf("Runner[%s] NewSeqFile get file %s inode error %v, ignore...", sf.meta.RunnerName, fileName, err)
return offset
}
inodeStr := strconv.FormatUint(inode, 10)
var key string
if sf.inodeSensitive {
offset = sf.expireMap[inodeStr+"_"+fileName]
key = reader.JoinFileInode(fileName, strconv.FormatUint(inode, 10))
} else {
offset = sf.expireMap[fileName]
key = filepath.Base(fileName)
}
offset = sf.inodeOffset[key]
if fileInfo.Size() < offset {
offset = 0
}
if seek {
_, err = f.Seek(sf.offset, io.SeekStart)
_, err = f.Seek(offset, io.SeekStart)
if err != nil {
log.Errorf("Runner[%s] file: %s seek offset: %d failed: %v", sf.meta.RunnerName, f.Name(), sf.offset, err)
}
}
return offset
}

func deleteNotExist(dir string, expireMap map[string]int64, inodeSensitive bool) {
if dir == "" {
return
}
var arr []string
for inodeFile := range expireMap {
if inodeSensitive {
arr = strings.SplitN(inodeFile, "_", 2)
if len(arr) < 2 {
continue
}
if filepath.Dir(arr[1]) != dir {
continue
}
} else {
if filepath.Dir(inodeFile) != dir {
continue
}
}
delete(expireMap, inodeFile)
}
}

var (
_ LineSkipper = new(SeqFile)
_ reader.NewSourceRecorder = new(SeqFile)
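As a standalone restatement of the file-selection predicate used in getNextFileCondition above (hypothetical names, not code from this repository): a file is read when it has no recorded offset yet, or when re-reading the same inode is enabled, its entry is not the -1 marker, and its current size no longer equals the recorded offset.

package example

import "os"

// shouldRead mirrors the condition above: pick the file if its key has never
// been recorded, or if ReadSameInode is on, the entry is not marked -1, and
// the file size differs from the offset recorded for it.
func shouldRead(fi os.FileInfo, recorded map[string]int64, key string, readSameInode bool) bool {
	offset, ok := recorded[key]
	return !ok || (readSameInode && offset != -1 && fi.Size() != offset)
}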
