Skip to content

Commit

Permalink
feat: support for 'create' event from the inotify watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
neelayu committed Dec 9, 2022
1 parent 19b97bf commit 97db2a1
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 59 deletions.
11 changes: 10 additions & 1 deletion watch/filechanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ type FileChanges struct {
Modified chan bool // Channel to get notified of modifications
Truncated chan bool // Channel to get notified of truncations
Deleted chan bool // Channel to get notified of deletions/renames
Created chan bool // Channel to get notified of creations
}

func NewFileChanges() *FileChanges {
return &FileChanges{
make(chan bool, 1), make(chan bool, 1), make(chan bool, 1)}
make(chan bool, 1),
make(chan bool, 1),
make(chan bool, 1),
make(chan bool, 1),
}
}

func (fc *FileChanges) NotifyModified() {
Expand All @@ -23,6 +28,10 @@ func (fc *FileChanges) NotifyDeleted() {
sendOnlyIfEmpty(fc.Deleted)
}

func (fc *FileChanges) NotifyCreated() {
sendOnlyIfEmpty(fc.Created)
}

// sendOnlyIfEmpty sends on a bool channel only if the channel has no
// backlog to be read by other goroutines. This concurrency pattern
// can be used to notify other goroutines if and only if they are
Expand Down
2 changes: 2 additions & 0 deletions watch/inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange
} else {
changes.NotifyModified()
}
case evt.Op&fsnotify.Create == fsnotify.Create:
changes.NotifyCreated()
}
}
}()
Expand Down
25 changes: 19 additions & 6 deletions watch/inotify_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"sync"
"syscall"

"github.com/influxdata/tail/util"

"gopkg.in/fsnotify.v1"

"github.com/influxdata/tail/util"
)

type InotifyTracker struct {
Expand All @@ -31,8 +31,8 @@ type watchInfo struct {
fname string
}

func (this *watchInfo) isCreate() bool {
return this.op == fsnotify.Create
func (winfo *watchInfo) isCreate() bool {
return winfo.op == fsnotify.Create
}

var (
Expand Down Expand Up @@ -199,8 +199,21 @@ func (shared *InotifyTracker) sendEvent(event fsnotify.Event) {
name := filepath.Clean(event.Name)

shared.mux.Lock()
ch := shared.chans[name]
done := shared.done[name]

// since the watcher could be defined on a directory, we check if the directory is present in the channels map
dir := filepath.Dir(name)
var ch chan fsnotify.Event
var done chan bool
var ok bool

if ch, ok = shared.chans[dir]; ok {
// watcher on directory present, only need to initialize "done"
done = shared.done[dir]
} else {
ch = shared.chans[name]
done = shared.done[name]
}

shared.mux.Unlock()

if ch != nil && done != nil {
Expand Down
3 changes: 2 additions & 1 deletion watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ type FileWatcher interface {
BlockUntilExists(*tomb.Tomb) error

// ChangeEvents reports on changes to a file, be it modification,
// deletion, renames or truncations. Returned FileChanges group of
// deletion, creation, renames or truncations. Returned FileChanges group of
// channels will be closed, thus become unusable, after a deletion
// or truncation event.
// In order to properly report truncations, ChangeEvents requires
// the caller to pass their current offset in the file.
// File creations are reported when the watcher is initialized on the parent directory.
ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error)
}
145 changes: 94 additions & 51 deletions watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package watch
import (
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
Expand All @@ -16,64 +15,105 @@ import (
)

func TestWatchNotify(t *testing.T) {
tmpDir := t.TempDir()
testCases := []struct {
name string
poll bool
name string
poll bool
toWatch func() []string
}{
{"Test watch inotify", false},
{"Test watch poll", true},
{
name: "Test watch inotify with directory",
poll: false,
toWatch: func() []string {
dirPath := filepath.Join(tmpDir, "testDir")
err := os.Mkdir(dirPath, 0755)
if err != nil {
t.Fatal(err)
}
xyzFile := filepath.Join(tmpDir, "xyz")
f, err := os.Create(xyzFile)
if err != nil {
t.Fatal(err)
}
f.Close()
return []string{dirPath, xyzFile}
},
},
{
name: "Test watch poll",
poll: true,
toWatch: func() []string {
abcFile := filepath.Join(tmpDir, "abc")
f, err := os.Create(abcFile)
if err != nil {
t.Fatal(err)
}
f.Close()
xyzFile := filepath.Join(tmpDir, "xyz")
f, err = os.Create(xyzFile)
if err != nil {
t.Fatal(err)
}
f.Close()
return []string{abcFile, xyzFile}
},
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "watch-")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
filePath := filepath.Join(tmpDir, "a")
// create file
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0777)
if err != nil {
t.Fatal(err)
}
err = file.Close()
if err != nil {
t.Fatal(err)
}
filesToWatch := test.toWatch()

var wg sync.WaitGroup
var werr error
changes := 0
chanClose := make(chan struct{})
wg.Add(1)
go func() {
changes, werr = watchFile(filePath, test.poll, chanClose)
wg.Done()
}()

writeToFile(t, filePath, "hello", true)
<-time.After(time.Second)
writeToFile(t, filePath, "world", true)
<-time.After(time.Second)
writeToFile(t, filePath, "end", false)
<-time.After(time.Second)
//err = os.Remove(filePath)
//if err != nil {
// t.Fatal(err)
//}
rmFile(t, filePath)
chanClose <- struct{}{}
wg.Wait()
close(chanClose)
// each file path test is done synchronously, but watcher works async
for _, filePath := range filesToWatch {
changes := 0
var werr error
var wg sync.WaitGroup
chanClose := make(chan struct{})
wg.Add(1)
go func(filePath string) {
changes, werr = watchFile(filePath, test.poll, chanClose)
wg.Done()
}(filePath)
wait := make(chan bool)

if werr != nil {
t.Fatal(werr)
}
// ideally, there should be 4 changes (2xmodified,1xtruncaed and 1xdeleted)
// but, notifications from fsnotify are usually 2 (2xmodify) and 3x from poll (2xmodify, 1xtruncated)
if changes < 1 || changes > 4 {
t.Errorf("Invalid changes count: %d\n", changes)
// check if file is a directory, if yes, create a file
if fi, err := os.Stat(filePath); err == nil && fi.IsDir() {
time.AfterFunc(time.Second, func() {
f, err := os.Create(filepath.Join(filePath, "a"))
if err != nil {
t.Fatal(err)
}
f.Close()
filePath, _ = filepath.Abs(f.Name())
wait <- true
})
<-wait
}
writeToFile(t, filePath, "hello", true)
<-time.After(time.Second)
writeToFile(t, filePath, "world", true)
<-time.After(time.Second)
writeToFile(t, filePath, "end", false)
<-time.After(time.Second)
//err = os.Remove(filePath)
//if err != nil {
// t.Fatal(err)
//}
rmFile(t, filePath)
chanClose <- struct{}{}
close(chanClose)
close(wait)
wg.Wait()
if werr != nil {
t.Fatal(werr)
}
// ideally, there should be 4 changes (2xmodified,1xtruncaed and 1xdeleted)
// but, notifications from fsnotify are usually 2 (2xmodify) and 3x from poll (2xmodify, 1xtruncated)
if changes < 1 || changes > 4 {
t.Errorf("Invalid changes count: %d\n", changes)
}
}

})
}
}
Expand Down Expand Up @@ -151,6 +191,9 @@ func watchFile(path string, poll bool, close <-chan struct{}) (int, error) {
case <-changes.Truncated:
fmt.Println("Truncated")
changesCount++
case <-changes.Created:
fmt.Println("Created")
changesCount++
case <-mytomb.Dying():
return -1, errors.New("dying")
case <-close:
Expand Down

0 comments on commit 97db2a1

Please sign in to comment.