Skip to content

Commit

Permalink
Merge pull request #129 from imsodin/unwatch
Browse files Browse the repository at this point in the history
Some fixes regarding unwatching and rewatching a path
  • Loading branch information
rjeczalik authored Oct 4, 2017
2 parents 88a54d9 + 9a90a47 commit 1aa3b9d
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ install:
script:
- "(go version | grep -q 1.4) || go tool vet -all ."
- go install $GOFLAGS ./...
- go test -v -race $GOFLAGS ./...
- go test -v -timeout 60s -race $GOFLAGS ./...
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ install:
build_script:
- go tool vet -all .
- go build ./...
- go test -v -race ./...
- go test -v -timeout 60s -race ./...

test: off

Expand Down
4 changes: 4 additions & 0 deletions testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func newWatcherTest(t *testing.T, tree string) *W {
if err != nil {
t.Fatalf(`tmptree("", %q)=%v`, tree, err)
}
root, _, err = cleanpath(root)
if err != nil {
t.Fatalf(`cleanpath(%q)=%v`, root, err)
}
Sync()
return &W{
t: t,
Expand Down
11 changes: 2 additions & 9 deletions watcher_fen.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,7 @@ type fen struct {

// watched is a data structure representing watched file/directory.
type watched struct {
// p is a path to watched file/directory
p string
// fi provides information about watched file/dir
fi os.FileInfo
// eDir represents events watched directly
eDir Event
// eNonDir represents events watched indirectly
eNonDir Event
trgWatched
}

// Stop implements trigger.
Expand All @@ -55,7 +48,7 @@ func (f *fen) Close() (err error) {

// NewWatched implements trigger.
func (*fen) NewWatched(p string, fi os.FileInfo) (*watched, error) {
return &watched{p: p, fi: fi}, nil
return &watched{trgWatched{p: p, fi: fi}}, nil
}

// Record implements trigger.
Expand Down
8 changes: 0 additions & 8 deletions watcher_fsevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"sync/atomic"
)

// TODO(rjeczalik): get rid of calls to canonical, it's tree responsibility

const (
failure = uint32(FSEventsMustScanSubDirs | FSEventsUserDropped | FSEventsKernelDropped)
filter = uint32(FSEventsCreated | FSEventsRemoved | FSEventsRenamed |
Expand Down Expand Up @@ -189,9 +187,6 @@ func newWatcher(c chan<- EventInfo) watcher {
}

func (fse *fsevents) watch(path string, event Event, isrec int32) (err error) {
if path, err = canonical(path); err != nil {
return err
}
if _, ok := fse.watches[path]; ok {
return errAlreadyWatched
}
Expand All @@ -211,9 +206,6 @@ func (fse *fsevents) watch(path string, event Event, isrec int32) (err error) {
}

func (fse *fsevents) unwatch(path string) (err error) {
if path, err = canonical(path); err != nil {
return
}
w, ok := fse.watches[path]
if !ok {
return errNotWatched
Expand Down
12 changes: 10 additions & 2 deletions watcher_inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (i *inotify) Unwatch(path string) (err error) {
return errors.New("notify: path " + path + " is already watched")
}
fd := atomic.LoadInt32(&i.fd)
if _, err = unix.InotifyRmWatch(int(fd), uint32(iwd)); err != nil {
if err = removeInotifyWatch(fd, iwd); err != nil {
return
}
i.Lock()
Expand All @@ -378,7 +378,7 @@ func (i *inotify) Close() (err error) {
return nil
}
for iwd := range i.m {
if _, e := unix.InotifyRmWatch(int(i.fd), uint32(iwd)); e != nil && err == nil {
if e := removeInotifyWatch(i.fd, iwd); e != nil && err == nil {
err = e
}
delete(i.m, iwd)
Expand All @@ -395,3 +395,11 @@ func (i *inotify) Close() (err error) {
}
return
}

// if path was removed, notify already removed the watch and returns EINVAL error
func removeInotifyWatch(fd int32, iwd int32) (err error) {
if _, err = unix.InotifyRmWatch(int(fd), uint32(iwd)); err != nil && err != unix.EINVAL {
return
}
return nil
}
14 changes: 5 additions & 9 deletions watcher_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,9 @@ type kq struct {

// watched is a data structure representing watched file/directory.
type watched struct {
// p is a path to watched file/directory.
p string
trgWatched
// fd is a file descriptor for watched file/directory.
fd int
// fi provides information about watched file/dir.
fi os.FileInfo
// eDir represents events watched directly.
eDir Event
// eNonDir represents events watched indirectly.
eNonDir Event
}

// Stop implements trigger.
Expand All @@ -66,7 +59,10 @@ func (*kq) NewWatched(p string, fi os.FileInfo) (*watched, error) {
if err != nil {
return nil, err
}
return &watched{fd: fd, p: p, fi: fi}, nil
return &watched{
trgWatched: trgWatched{p: p, fi: fi},
fd: fd,
}, nil
}

// Record implements trigger.
Expand Down
50 changes: 29 additions & 21 deletions watcher_readdcw.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,16 +284,18 @@ func (r *readdcw) watch(path string, event Event, recursive bool) (err error) {
return
}
r.Lock()
defer r.Unlock()
if wd, ok = r.m[path]; ok {
r.Unlock()
dbgprint("watch: exists already")
return
}
if wd, err = newWatched(r.cph, uint32(event), recursive, path); err != nil {
r.Unlock()
return
}
r.m[path] = wd
r.Unlock()
dbgprint("watch: new watch added")
} else {
dbgprint("watch: exists already")
}
return nil
}
Expand Down Expand Up @@ -337,33 +339,32 @@ func (r *readdcw) loop() {
continue
}
overEx := (*overlappedEx)(unsafe.Pointer(overlapped))
if n == 0 {
r.loopstate(overEx)
} else {
if n != 0 {
r.loopevent(n, overEx)
if err = overEx.parent.readDirChanges(); err != nil {
// TODO: error handling
}
}
r.loopstate(overEx)
}
}

// TODO(pknap) : doc
func (r *readdcw) loopstate(overEx *overlappedEx) {
filter := atomic.LoadUint32(&overEx.parent.parent.filter)
r.Lock()
defer r.Unlock()
filter := overEx.parent.parent.filter
if filter&onlyMachineStates == 0 {
return
}
if overEx.parent.parent.count--; overEx.parent.parent.count == 0 {
switch filter & onlyMachineStates {
case stateRewatch:
r.Lock()
dbgprint("loopstate rewatch")
overEx.parent.parent.recreate(r.cph)
r.Unlock()
case stateUnwatch:
r.Lock()
dbgprint("loopstate unwatch")
delete(r.m, syscall.UTF16ToString(overEx.parent.pathw))
r.Unlock()
case stateCPClose:
default:
panic(`notify: windows loopstate logic error`)
Expand Down Expand Up @@ -450,8 +451,8 @@ func (r *readdcw) rewatch(path string, oldevent, newevent uint32, recursive bool
}
var wd *watched
r.Lock()
if wd, err = r.nonStateWatched(path); err != nil {
r.Unlock()
defer r.Unlock()
if wd, err = r.nonStateWatchedLocked(path); err != nil {
return
}
if wd.filter&(onlyNotifyChanges|onlyNGlobalEvents) != oldevent {
Expand All @@ -462,21 +463,19 @@ func (r *readdcw) rewatch(path string, oldevent, newevent uint32, recursive bool
if err = wd.closeHandle(); err != nil {
wd.filter = oldevent
wd.recursive = recursive
r.Unlock()
return
}
r.Unlock()
return
}

// TODO : pknap
func (r *readdcw) nonStateWatched(path string) (wd *watched, err error) {
func (r *readdcw) nonStateWatchedLocked(path string) (wd *watched, err error) {
wd, ok := r.m[path]
if !ok || wd == nil {
err = errors.New(`notify: ` + path + ` path is unwatched`)
return
}
if filter := atomic.LoadUint32(&wd.filter); filter&onlyMachineStates != 0 {
if wd.filter&onlyMachineStates != 0 {
err = errors.New(`notify: another re/unwatching operation in progress`)
return
}
Expand All @@ -497,17 +496,26 @@ func (r *readdcw) RecursiveUnwatch(path string) error {
func (r *readdcw) unwatch(path string) (err error) {
var wd *watched
r.Lock()
if wd, err = r.nonStateWatched(path); err != nil {
r.Unlock()
defer r.Unlock()
if wd, err = r.nonStateWatchedLocked(path); err != nil {
return
}
wd.filter |= stateUnwatch
if err = wd.closeHandle(); err != nil {
wd.filter &^= stateUnwatch
r.Unlock()
return
}
r.Unlock()
if _, attrErr := syscall.GetFileAttributes(&wd.pathw[0]); attrErr != nil {
for _, g := range wd.digrip {
if g != nil {
dbgprint("unwatch: posting")
if err = syscall.PostQueuedCompletionStatus(r.cph, 0, 0, (*syscall.Overlapped)(unsafe.Pointer(g.ovlapped))); err != nil {
wd.filter &^= stateUnwatch
return
}
}
}
}
return
}

Expand Down
56 changes: 55 additions & 1 deletion watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

package notify

import "testing"
import (
"os"
"testing"
"time"
)

// NOTE Set DEBUG env var for extra debugging info.

Expand All @@ -30,3 +34,53 @@ func TestWatcher(t *testing.T) {

w.ExpectAny(cases[:])
}

// Simulates the scenario, where outside of the programs control the base dir
// is removed. This is detected and the watch removed. Then the directory is
// restored and a new watch set up.
func TestStopPathNotExists(t *testing.T) {
w := NewWatcherTest(t, "testdata/vfs.txt")
defer w.Close()

if err := os.RemoveAll(w.root); err != nil {
panic(err)
}
Sync()

// Don't check the returned error, as the public function (notify.Stop)
// does not return a potential error. As long as everything later on
// works as inteded, that's fine
time.Sleep(time.Duration(100) * time.Millisecond)
w.Watcher.Unwatch(w.root)
time.Sleep(time.Duration(100) * time.Millisecond)

if err := os.Mkdir(w.root, 0777); err != nil {
panic(err)
}
Sync()
w.Watch("", All)

drainall(w.C)
cases := [...]WCase{
create(w, "file"),
create(w, "dir/"),
}
w.ExpectAny(cases[:])
}

func TestWatcherUnwatch(t *testing.T) {
w := NewWatcherTest(t, "testdata/vfs.txt")
defer w.Close()

remove(w, "src/github.com/ppknap/link/test/test_circular_calls.cpp").Action()
w.Unwatch("")

w.Watch("", All)

drainall(w.C)
cases := [...]WCase{
create(w, "file"),
create(w, "dir/"),
}
w.ExpectAny(cases[:])
}
21 changes: 19 additions & 2 deletions watcher_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package notify

import (
"fmt"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -56,6 +57,19 @@ type trigger interface {
IsStop(n interface{}, err error) bool
}

// trgWatched is a the base data structure representing watched file/directory.
// The platform specific full data structure (watched) must embed this type.
type trgWatched struct {
// p is a path to watched file/directory.
p string
// fi provides information about watched file/dir.
fi os.FileInfo
// eDir represents events watched directly.
eDir Event
// eNonDir represents events watched indirectly.
eNonDir Event
}

// encode Event to native representation. Implementation is to be provided by
// platform specific implementation.
var encode func(Event, bool) int64
Expand Down Expand Up @@ -117,6 +131,9 @@ func (t *trg) Close() (err error) {
dbgprintf("trg: closing native watch failed: %q\n", e)
err = nonil(err, e)
}
if remaining := len(t.pthLkp); remaining != 0 {
err = nonil(err, fmt.Errorf("Not all watches were removed: len(t.pthLkp) == %v", len(t.pthLkp)))
}
t.Unlock()
return
}
Expand Down Expand Up @@ -175,7 +192,7 @@ func decode(o int64, w Event) (e Event) {
func (t *trg) watch(p string, e Event, fi os.FileInfo) error {
if err := t.singlewatch(p, e, dir, fi); err != nil {
if err != errAlreadyWatched {
return nil
return err
}
}
if fi.IsDir() {
Expand Down Expand Up @@ -361,7 +378,7 @@ func (t *trg) singleunwatch(p string, direct mode) error {
}
if w.eNonDir|w.eDir != 0 {
mod := dir
if w.eNonDir == 0 {
if w.eNonDir != 0 {
mod = ndir
}
if err := t.singlewatch(p, w.eNonDir|w.eDir, mod,
Expand Down

0 comments on commit 1aa3b9d

Please sign in to comment.