Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some fixes regarding unwatching and rewatching a path #129

Merged
merged 14 commits into from
Oct 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ install:
script:
- "(go version | grep -q 1.4) || go tool vet -all ."
- go install $GOFLAGS ./...
- go test -v -race $GOFLAGS ./...
- go test -v -timeout 60s -race $GOFLAGS ./...
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ install:
build_script:
- go tool vet -all .
- go build ./...
- go test -v -race ./...
- go test -v -timeout 60s -race ./...

test: off

Expand Down
4 changes: 4 additions & 0 deletions testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func newWatcherTest(t *testing.T, tree string) *W {
if err != nil {
t.Fatalf(`tmptree("", %q)=%v`, tree, err)
}
root, _, err = cleanpath(root)
if err != nil {
t.Fatalf(`cleanpath(%q)=%v`, root, err)
}
Sync()
return &W{
t: t,
Expand Down
11 changes: 2 additions & 9 deletions watcher_fen.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,7 @@ type fen struct {

// watched is a data structure representing watched file/directory.
type watched struct {
// p is a path to watched file/directory
p string
// fi provides information about watched file/dir
fi os.FileInfo
// eDir represents events watched directly
eDir Event
// eNonDir represents events watched indirectly
eNonDir Event
trgWatched
}

// Stop implements trigger.
Expand All @@ -55,7 +48,7 @@ func (f *fen) Close() (err error) {

// NewWatched implements trigger.
func (*fen) NewWatched(p string, fi os.FileInfo) (*watched, error) {
return &watched{p: p, fi: fi}, nil
return &watched{trgWatched{p: p, fi: fi}}, nil
}

// Record implements trigger.
Expand Down
8 changes: 0 additions & 8 deletions watcher_fsevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"sync/atomic"
)

// TODO(rjeczalik): get rid of calls to canonical, it's tree responsibility

const (
failure = uint32(FSEventsMustScanSubDirs | FSEventsUserDropped | FSEventsKernelDropped)
filter = uint32(FSEventsCreated | FSEventsRemoved | FSEventsRenamed |
Expand Down Expand Up @@ -189,9 +187,6 @@ func newWatcher(c chan<- EventInfo) watcher {
}

func (fse *fsevents) watch(path string, event Event, isrec int32) (err error) {
if path, err = canonical(path); err != nil {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can remove this yet, until the functionality is moved to tree.

Morever I think it'd need to stay here because of fse.watches - if we put a watch on a dir and a symlink to it, fse.watches must not hold two watches but only one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it already in the tree:
https://github.com/rjeczalik/notify/blob/master/tree_recursive.go#L164

Cleanpath subsequently calls canonical on the path and no other watcher implementation uses canonical.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, but fse.watches maintains its own path -> watches mapping, if we get rid of canonical, we may have duplicated watches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't understand it: The paths in fse.watches that were previously passed through canonical are those given as arguments to the exported (Recursive-)Re-/Watch functions, which are called from the tree, where the paths were already canonicalized in the tree - how else can duplicated paths enter fse.watches?

Other watchers maintains such a map of paths to structs as well, but don't canonicalize paths (e.g. readdcw in r.m, trigger in pthLkp).

Also the new tests fail when canonical is reintroduced, as it won't remove a non-existing path, because canonical stats the path and thus fails.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which are called from the tree, where the paths were already canonicalized in the tree

You're right, I take back my comment about duplicates - canonical is not needed here.

Also the new tests fail when canonical is reintroduced, as it won't remove a non-existing path, because canonical stats the path and thus fails.

Good point.

return err
}
if _, ok := fse.watches[path]; ok {
return errAlreadyWatched
}
Expand All @@ -211,9 +206,6 @@ func (fse *fsevents) watch(path string, event Event, isrec int32) (err error) {
}

func (fse *fsevents) unwatch(path string) (err error) {
if path, err = canonical(path); err != nil {
return
}
w, ok := fse.watches[path]
if !ok {
return errNotWatched
Expand Down
12 changes: 10 additions & 2 deletions watcher_inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (i *inotify) Unwatch(path string) (err error) {
return errors.New("notify: path " + path + " is already watched")
}
fd := atomic.LoadInt32(&i.fd)
if _, err = unix.InotifyRmWatch(int(fd), uint32(iwd)); err != nil {
if err = removeInotifyWatch(fd, iwd); err != nil {
return
}
i.Lock()
Expand All @@ -378,7 +378,7 @@ func (i *inotify) Close() (err error) {
return nil
}
for iwd := range i.m {
if _, e := unix.InotifyRmWatch(int(i.fd), uint32(iwd)); e != nil && err == nil {
if e := removeInotifyWatch(i.fd, iwd); e != nil && err == nil {
err = e
}
delete(i.m, iwd)
Expand All @@ -395,3 +395,11 @@ func (i *inotify) Close() (err error) {
}
return
}

// if path was removed, notify already removed the watch and returns EINVAL error
func removeInotifyWatch(fd int32, iwd int32) (err error) {
if _, err = unix.InotifyRmWatch(int(fd), uint32(iwd)); err != nil && err != unix.EINVAL {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return
}
return nil
}
14 changes: 5 additions & 9 deletions watcher_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,9 @@ type kq struct {

// watched is a data structure representing watched file/directory.
type watched struct {
// p is a path to watched file/directory.
p string
trgWatched
// fd is a file descriptor for watched file/directory.
fd int
// fi provides information about watched file/dir.
fi os.FileInfo
// eDir represents events watched directly.
eDir Event
// eNonDir represents events watched indirectly.
eNonDir Event
}

// Stop implements trigger.
Expand All @@ -66,7 +59,10 @@ func (*kq) NewWatched(p string, fi os.FileInfo) (*watched, error) {
if err != nil {
return nil, err
}
return &watched{fd: fd, p: p, fi: fi}, nil
return &watched{
trgWatched: trgWatched{p: p, fi: fi},
fd: fd,
}, nil
}

// Record implements trigger.
Expand Down
50 changes: 29 additions & 21 deletions watcher_readdcw.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,16 +284,18 @@ func (r *readdcw) watch(path string, event Event, recursive bool) (err error) {
return
}
r.Lock()
defer r.Unlock()
if wd, ok = r.m[path]; ok {
r.Unlock()
dbgprint("watch: exists already")
return
}
if wd, err = newWatched(r.cph, uint32(event), recursive, path); err != nil {
r.Unlock()
return
}
r.m[path] = wd
r.Unlock()
dbgprint("watch: new watch added")
} else {
dbgprint("watch: exists already")
}
return nil
}
Expand Down Expand Up @@ -337,33 +339,32 @@ func (r *readdcw) loop() {
continue
}
overEx := (*overlappedEx)(unsafe.Pointer(overlapped))
if n == 0 {
r.loopstate(overEx)
} else {
if n != 0 {
r.loopevent(n, overEx)
if err = overEx.parent.readDirChanges(); err != nil {
// TODO: error handling
}
}
r.loopstate(overEx)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/cc @ppknap

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

}
}

// TODO(pknap) : doc
func (r *readdcw) loopstate(overEx *overlappedEx) {
filter := atomic.LoadUint32(&overEx.parent.parent.filter)
r.Lock()
defer r.Unlock()
filter := overEx.parent.parent.filter
if filter&onlyMachineStates == 0 {
return
}
if overEx.parent.parent.count--; overEx.parent.parent.count == 0 {
switch filter & onlyMachineStates {
case stateRewatch:
r.Lock()
dbgprint("loopstate rewatch")
overEx.parent.parent.recreate(r.cph)
r.Unlock()
case stateUnwatch:
r.Lock()
dbgprint("loopstate unwatch")
delete(r.m, syscall.UTF16ToString(overEx.parent.pathw))
r.Unlock()
case stateCPClose:
default:
panic(`notify: windows loopstate logic error`)
Expand Down Expand Up @@ -450,8 +451,8 @@ func (r *readdcw) rewatch(path string, oldevent, newevent uint32, recursive bool
}
var wd *watched
r.Lock()
if wd, err = r.nonStateWatched(path); err != nil {
r.Unlock()
defer r.Unlock()
if wd, err = r.nonStateWatchedLocked(path); err != nil {
return
}
if wd.filter&(onlyNotifyChanges|onlyNGlobalEvents) != oldevent {
Expand All @@ -462,21 +463,19 @@ func (r *readdcw) rewatch(path string, oldevent, newevent uint32, recursive bool
if err = wd.closeHandle(); err != nil {
wd.filter = oldevent
wd.recursive = recursive
r.Unlock()
return
}
r.Unlock()
return
}

// TODO : pknap
func (r *readdcw) nonStateWatched(path string) (wd *watched, err error) {
func (r *readdcw) nonStateWatchedLocked(path string) (wd *watched, err error) {
wd, ok := r.m[path]
if !ok || wd == nil {
err = errors.New(`notify: ` + path + ` path is unwatched`)
return
}
if filter := atomic.LoadUint32(&wd.filter); filter&onlyMachineStates != 0 {
if wd.filter&onlyMachineStates != 0 {
err = errors.New(`notify: another re/unwatching operation in progress`)
return
}
Expand All @@ -497,17 +496,26 @@ func (r *readdcw) RecursiveUnwatch(path string) error {
func (r *readdcw) unwatch(path string) (err error) {
var wd *watched
r.Lock()
if wd, err = r.nonStateWatched(path); err != nil {
r.Unlock()
defer r.Unlock()
if wd, err = r.nonStateWatchedLocked(path); err != nil {
return
}
wd.filter |= stateUnwatch
if err = wd.closeHandle(); err != nil {
wd.filter &^= stateUnwatch
r.Unlock()
return
}
r.Unlock()
if _, attrErr := syscall.GetFileAttributes(&wd.pathw[0]); attrErr != nil {
for _, g := range wd.digrip {
if g != nil {
dbgprint("unwatch: posting")
if err = syscall.PostQueuedCompletionStatus(r.cph, 0, 0, (*syscall.Overlapped)(unsafe.Pointer(g.ovlapped))); err != nil {
wd.filter &^= stateUnwatch
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/cc @ppknap

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return
}
}
}
}
return
}

Expand Down
56 changes: 55 additions & 1 deletion watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

package notify

import "testing"
import (
"os"
"testing"
"time"
)

// NOTE Set DEBUG env var for extra debugging info.

Expand All @@ -30,3 +34,53 @@ func TestWatcher(t *testing.T) {

w.ExpectAny(cases[:])
}

// Simulates the scenario, where outside of the programs control the base dir
// is removed. This is detected and the watch removed. Then the directory is
// restored and a new watch set up.
func TestStopPathNotExists(t *testing.T) {
w := NewWatcherTest(t, "testdata/vfs.txt")
defer w.Close()

if err := os.RemoveAll(w.root); err != nil {
panic(err)
}
Sync()

// Don't check the returned error, as the public function (notify.Stop)
// does not return a potential error. As long as everything later on
// works as inteded, that's fine
time.Sleep(time.Duration(100) * time.Millisecond)
w.Watcher.Unwatch(w.root)
time.Sleep(time.Duration(100) * time.Millisecond)

if err := os.Mkdir(w.root, 0777); err != nil {
panic(err)
}
Sync()
w.Watch("", All)

drainall(w.C)
cases := [...]WCase{
create(w, "file"),
create(w, "dir/"),
}
w.ExpectAny(cases[:])
}

func TestWatcherUnwatch(t *testing.T) {
w := NewWatcherTest(t, "testdata/vfs.txt")
defer w.Close()

remove(w, "src/github.com/ppknap/link/test/test_circular_calls.cpp").Action()
w.Unwatch("")

w.Watch("", All)

drainall(w.C)
cases := [...]WCase{
create(w, "file"),
create(w, "dir/"),
}
w.ExpectAny(cases[:])
}
21 changes: 19 additions & 2 deletions watcher_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package notify

import (
"fmt"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -56,6 +57,19 @@ type trigger interface {
IsStop(n interface{}, err error) bool
}

// trgWatched is a the base data structure representing watched file/directory.
// The platform specific full data structure (watched) must embed this type.
type trgWatched struct {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Pawel's review is not needed here since this struct is just a "base" for watched type from Kqueue's and FEN impls.

// p is a path to watched file/directory.
p string
// fi provides information about watched file/dir.
fi os.FileInfo
// eDir represents events watched directly.
eDir Event
// eNonDir represents events watched indirectly.
eNonDir Event
}

// encode Event to native representation. Implementation is to be provided by
// platform specific implementation.
var encode func(Event, bool) int64
Expand Down Expand Up @@ -117,6 +131,9 @@ func (t *trg) Close() (err error) {
dbgprintf("trg: closing native watch failed: %q\n", e)
err = nonil(err, e)
}
if remaining := len(t.pthLkp); remaining != 0 {
err = nonil(err, fmt.Errorf("Not all watches were removed: len(t.pthLkp) == %v", len(t.pthLkp)))
}
t.Unlock()
return
}
Expand Down Expand Up @@ -175,7 +192,7 @@ func decode(o int64, w Event) (e Event) {
func (t *trg) watch(p string, e Event, fi os.FileInfo) error {
if err := t.singlewatch(p, e, dir, fi); err != nil {
if err != errAlreadyWatched {
return nil
return err
}
}
if fi.IsDir() {
Expand Down Expand Up @@ -361,7 +378,7 @@ func (t *trg) singleunwatch(p string, direct mode) error {
}
if w.eNonDir|w.eDir != 0 {
mod := dir
if w.eNonDir == 0 {
if w.eNonDir != 0 {
mod = ndir
}
if err := t.singlewatch(p, w.eNonDir|w.eDir, mod,
Expand Down