Skip to content

Commit

Permalink
(wip) runtime: Dispatching events back to user channels
Browse files Browse the repository at this point in the history
  • Loading branch information
rjeczalik committed Oct 18, 2014
1 parent 235fc01 commit 597770c
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 43 deletions.
5 changes: 0 additions & 5 deletions runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ func (sub Subscriber) Unsubscribe(c chan<- EventInfo) (diff EventDiff) {
return
}

// Total TODO
func (sub Subscriber) Total() Event {
return sub[nil]
}

// Interface TODO
type Interface interface {
// Watcher provides a minimum functionality required, that must be implemented
Expand Down
32 changes: 23 additions & 9 deletions runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
)

func TestRuntime_DirectorySimple(t *testing.T) {
ch := test.Channels(3)
cases := [...]test.CallCase{{
scope, ch := test.R(t), test.Channels(3)
calls := [...]test.CallCase{{
Call: test.Call{
F: test.Watch,
C: ch(0),
C: ch[0],
P: "/github.com/rjeczalik/fakerpc/",
E: Create | Delete | Move,
},
Expand All @@ -25,23 +25,23 @@ func TestRuntime_DirectorySimple(t *testing.T) {
}, {
Call: test.Call{
F: test.Watch,
C: ch(1),
C: ch[1],
P: "/github.com/rjeczalik/fakerpc/",
E: Delete | Move,
},
Record: nil,
}, {
Call: test.Call{
F: test.Watch,
C: ch(2),
C: ch[2],
P: "/github.com/rjeczalik/fakerpc/",
E: Move,
},
Record: nil,
}, {
Call: test.Call{
F: test.Watch,
C: ch(0),
C: ch[0],
P: "/github.com/rjeczalik/fs/",
E: Create | Delete,
},
Expand All @@ -54,15 +54,15 @@ func TestRuntime_DirectorySimple(t *testing.T) {
}, {
Call: test.Call{
F: test.Watch,
C: ch(0),
C: ch[0],
P: "/github.com/rjeczalik/fs/",
E: Create,
},
Record: nil,
}, {
Call: test.Call{
F: test.Stop,
C: ch(0),
C: ch[0],
},
Record: test.Record{
test.Watcher: {{
Expand All @@ -87,5 +87,19 @@ func TestRuntime_DirectorySimple(t *testing.T) {
}},
},
}}
test.ExpectCalls(t, cases[:])
events := [...]test.EventCase{{
Event: test.Event{
P: "/github.com/rjeczalik/fakerpc/.fakerpc.go.swp",
E: Delete,
},
Receiver: test.Chans{ch[1]},
}, {
Event: test.Event{
P: "/github.com/rjeczalik/fakerpc/.travis.yml",
E: Move,
},
Receiver: test.Chans{ch[1], ch[2]},
}}
scope.ExpectCalls(calls[:])
scope.ExpectEvents(events[:])
}
135 changes: 106 additions & 29 deletions test/r.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

"github.com/rjeczalik/notify"
)
Expand All @@ -14,7 +15,7 @@ type FuncType string
const (
Watch = FuncType("Watch")
Unwatch = FuncType("Unwatch")
Dispatch = FuncType("Dispatch")
Dispatch = FuncType("Dispatch")
Rewatch = FuncType("Rewatch")
RecursiveWatch = FuncType("RecursiveWatch")
RecursiveUnwatch = FuncType("RecursiveUnwatch")
Expand All @@ -32,7 +33,7 @@ const (

// All TODO
//
// NOTE(rjeczalik): For use with Record type only.
// NOTE(rjeczalik): For use only as a key of Record.
const All RuntimeType = Watcher | Rewatcher | Recursive

// Strings implements fmt.Stringer interface.
Expand All @@ -45,19 +46,17 @@ func (typ RuntimeType) String() string {
case Recursive:
return "RuntimeRecursive"
}

return "<invalid runtime type>"
}

// Channels is a utility function which creates pool of chan <- notify.EventInfo.
func Channels(n int) func(int) chan<- notify.EventInfo {
// Channels is a utility function which creates slice of opened notify.EventInfo
// channels.
func Channels(n int) []chan notify.EventInfo {
ch := make([]chan notify.EventInfo, n)
for i := range ch {
ch[i] = make(chan notify.EventInfo)
}
return func(i int) chan<- notify.EventInfo {
return ch[i]
}
return ch
}

// Call represents single call to notify.Watcher issued by the notify.Runtime
Expand All @@ -66,12 +65,24 @@ func Channels(n int) func(int) chan<- notify.EventInfo {
// TODO(rjeczalik): Merge/embed notify.EventInfo here?
type Call struct {
F FuncType
C chan<- notify.EventInfo
C chan notify.EventInfo
P string
E notify.Event // regular Event argument and old Event from a Rewatch call
N notify.Event // new Event argument from a Rewatch call
}

// Event TODO
type Event struct {
P string
E notify.Event
}

// Event implements notify.EventInfo interface.
func (e Event) Event() notify.Event { return e.E }
func (e Event) Name() string { return e.P }
func (e Event) IsDir() bool { return isDir(e.P) }
func (e Event) Sys() interface{} { return nil }

// Record TODO
type Record map[RuntimeType][]Call

Expand All @@ -83,10 +94,24 @@ type CallCase struct {
Record Record // intermediate calls recorded during above call
}

// Chans TODO
type Chans []<-chan notify.EventInfo

// EventCase TODO
type EventCase struct {
Event Event
Receiver Chans
}

type runtime struct {
Spy
runtime *notify.Runtime
spy Spy
n int
ch chan<- notify.EventInfo
}

func (rt *runtime) Dispatch(ch chan<- notify.EventInfo, _ <-chan struct{}) {
rt.ch = ch
}

func (rt *runtime) invoke(call Call) error {
Expand All @@ -113,13 +138,15 @@ func R(t *testing.T) *r {
r := &r{
t: t,
r: map[RuntimeType]*runtime{
Watcher: &runtime{n: 1},
Rewatcher: &runtime{n: 1},
Recursive: &runtime{n: 1},
Watcher: &runtime{},
Rewatcher: &runtime{},
Recursive: &runtime{},
},
}
for typ, rt := range r.r {
rt.runtime = notify.NewRuntimeWatcher(SpyWatcher(typ, &rt.spy), FS)
// TODO(rjeczalik): Copy FS to allow for modying tree via Create and
// Delete events.
rt.runtime = notify.NewRuntimeWatcher(SpyWatcher(typ, rt), FS)
}
return r
}
Expand All @@ -137,7 +164,7 @@ func (r *r) ExpectCalls(cases []CallCase) {
cas.Call.F, err, i, typ)
}
// Skip if expected and got records were empty.
got := rt.spy[rt.n:]
got := rt.Spy[rt.n:]
if len(cas.Record) == 0 && len(got) == 0 {
continue
}
Expand All @@ -156,38 +183,93 @@ func (r *r) ExpectCalls(cases []CallCase) {
r.t.Errorf("want recorded=%+v; got %+v (i=%d, typ=%v)",
record, got, i, typ)
}
rt.n = len(rt.spy)
rt.n = len(rt.Spy)
}
}
}

// ExpectCalls translates cases' keys into (*Runtime).Watch calls, records calls
// Runtime makes to a Watcher and compares them with the expected list.
func ExpectCalls(t *testing.T, cases []CallCase) {
R(t).ExpectCalls(cases)
var timeout = 50 * time.Millisecond

func str(ei notify.EventInfo) string {
if ei == nil {
return "<nil>"
}
return fmt.Sprintf("{Name()=%q, Event()=%v, IsDir()=%v}", ei.Name(), ei.Event(),
ei.IsDir())
}

func equal(want, got notify.EventInfo) error {
if (want == nil && got != nil) || (want != nil && got == nil) {
return fmt.Errorf("want EventInfo=%s; got %s", str(want), str(got))
}
if want.Name() != got.Name() || want.Event() != got.Event() || want.IsDir() != got.IsDir() {
return fmt.Errorf("want EventInfo=%s; got %s", str(want), str(got))
}
return nil
}

// ExpectEvents TODO
func (r *r) ExpectEvents(cases []EventCase) {
for i, cas := range cases {
for typ, rt := range r.r {
n := len(cas.Receiver)
done := make(chan struct{})
ev := make(map[<-chan notify.EventInfo]notify.EventInfo)
go func() {
cases := make([]reflect.SelectCase, n)
for i := range cases {
cases[i].Chan = reflect.ValueOf(cas.Receiver[i])
cases[i].Dir = reflect.SelectRecv
}
for n := len(cases); n > 0; n = len(cases) {
j, v, ok := reflect.Select(cases)
if !ok {
r.t.Fatalf("notify/test: unexpected chan close (i=%d, "+
"typ=%v, j=%d", i, typ, j)
}
ch := cases[j].Chan.Interface().(<-chan notify.EventInfo)
ev[ch] = v.Interface().(notify.EventInfo)
cases[j], cases = cases[n-1], cases[:n-1]
}
close(done)
}()
rt.ch <- cas.Event
select {
case <-done:
case <-time.After(timeout):
r.t.Fatalf("ExpectEvents has timed out after %v (i=%d, typ=%v)",
timeout, i, typ)
}
for _, got := range ev {
if err := equal(cas.Event, got); err != nil {
r.t.Errorf("want err=nil; got %v (i=%d, typ=%v)", err, i, typ)
}
}
}
}
}

// Spy is a mock for notify.Watcher interface, which records every call.
type Spy []Call

// SpyWatcher TODO
func SpyWatcher(typ RuntimeType, spy *Spy) notify.Watcher {
func SpyWatcher(typ RuntimeType, rt *runtime) notify.Watcher {
switch typ {
case Watcher:
return struct {
notify.Watcher
}{spy}
}{rt}
case Rewatcher:
return struct {
notify.Watcher
notify.Rewatcher
}{spy, spy}
}{rt, rt}
case Recursive:
return struct {
notify.Watcher
notify.Rewatcher
notify.RecursiveWatcher
}{spy, spy, spy}
}{rt, rt, rt}
}
panic(fmt.Sprintf("notify/test: unsupported runtime type: %d (%s)", typ, typ.String()))
}
Expand All @@ -204,11 +286,6 @@ func (s *Spy) Unwatch(p string) (err error) {
return
}

// Dispatch implements notify.Watcher interface.
func (s *Spy) Dispatch(chan<- notify.EventInfo, <-chan struct{}) {
*s = append(*s, Call{F: Dispatch})
}

// Rewatch implements notify.Rewatcher interface.
func (s *Spy) Rewatch(p string, old, new notify.Event) (err error) {
*s = append(*s, Call{F: Rewatch, P: p, E: old, N: new})
Expand Down

0 comments on commit 597770c

Please sign in to comment.