Skip to content

Commit

Permalink
Zxid no longer needed by watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
samuel committed May 30, 2013
1 parent 00fe8c3 commit 0188e81
Showing 1 changed file with 38 additions and 47 deletions.
85 changes: 38 additions & 47 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,10 @@ var (
watcherTypeChild = watcherType(3)
)

type watcher struct {
ch chan Event
zxid int64
}

type watchers struct {
dataWatchers []*watcher
existWatchers []*watcher
childWatchers []*watcher
dataWatchers []chan Event
existWatchers []chan Event
childWatchers []chan Event
}

type Conn struct {
Expand Down Expand Up @@ -257,14 +252,14 @@ func (c *Conn) invalidateWatches(err error) {
if len(c.watchers) >= 0 {
for path, wat := range c.watchers {
ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err}
for _, w := range wat.dataWatchers {
w.ch <- ev
for _, ch := range wat.dataWatchers {
ch <- ev
}
for _, w := range wat.childWatchers {
w.ch <- ev
for _, ch := range wat.childWatchers {
ch <- ev
}
for _, w := range wat.existWatchers {
w.ch <- ev
for _, ch := range wat.existWatchers {
ch <- ev
}
}
c.watchers = make(map[string]*watchers, 0)
Expand Down Expand Up @@ -364,6 +359,7 @@ func (c *Conn) authenticate() error {
if r.SessionId == 0 {
c.sessionId = 0
c.passwd = emptyPassword
c.lastZxid = 0
c.setState(StateExpired)
return ErrSessionExpired
}
Expand Down Expand Up @@ -441,21 +437,6 @@ func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan bool) error {
panic("not reached")
}

func signalWatchers(watchers []*watcher, zxid int64, ev Event) []*watcher {
n := len(watchers)
for i := 0; i < n; {
w := watchers[i]
if w.zxid > 0 && w.zxid <= zxid {
watchers[i] = watchers[n-1]
n--
w.ch <- ev
} else {
i++
}
}
return watchers[:n]
}

func (c *Conn) recvLoop(conn net.Conn) error {
buf := make([]byte, bufferSize)
for {
Expand Down Expand Up @@ -501,15 +482,26 @@ func (c *Conn) recvLoop(conn net.Conn) error {
}
c.watchersLock.Lock()
if wat := c.watchers[res.Path]; wat != nil {
zxid := c.lastZxid
switch res.Type {
case EventNodeCreated:
wat.existWatchers = signalWatchers(wat.existWatchers, zxid, ev)
for _, ch := range wat.existWatchers {
ch <- ev
}
wat.existWatchers = wat.existWatchers[:0]
case EventNodeDeleted, EventNodeDataChanged:
wat.existWatchers = signalWatchers(wat.existWatchers, zxid, ev)
wat.dataWatchers = signalWatchers(wat.dataWatchers, zxid, ev)
for _, ch := range wat.existWatchers {
ch <- ev
}
wat.existWatchers = wat.existWatchers[:0]
for _, ch := range wat.dataWatchers {
ch <- ev
}
wat.dataWatchers = wat.dataWatchers[:0]
case EventNodeChildrenChanged:
wat.childWatchers = signalWatchers(wat.childWatchers, zxid, ev)
for _, ch := range wat.childWatchers {
ch <- ev
}
wat.childWatchers = wat.childWatchers[:0]
}
if len(wat.childWatchers)+len(wat.dataWatchers)+len(wat.existWatchers) == 0 {
delete(c.watchers, res.Path)
Expand Down Expand Up @@ -557,30 +549,29 @@ func (c *Conn) nextXid() int32 {
return atomic.AddInt32(&c.xid, 1)
}

func (c *Conn) addWatcher(path string, watcherType watcherType, zxid int64) <-chan Event {
func (c *Conn) addWatcher(path string, watcherType watcherType) <-chan Event {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()

ch := make(chan Event, 1)
wat := c.watchers[path]
if wat == nil {
wat = &watchers{
dataWatchers: make([]*watcher, 0),
existWatchers: make([]*watcher, 0),
childWatchers: make([]*watcher, 0),
dataWatchers: make([]chan Event, 0),
existWatchers: make([]chan Event, 0),
childWatchers: make([]chan Event, 0),
}
c.watchers[path] = wat
}
w := &watcher{ch, zxid}
switch watcherType {
case watcherTypeChild:
wat.childWatchers = append(wat.childWatchers, w)
wat.childWatchers = append(wat.childWatchers, ch)
case watcherTypeData:
wat.dataWatchers = append(wat.dataWatchers, w)
wat.dataWatchers = append(wat.dataWatchers, ch)
case watcherTypeExist:
wat.existWatchers = append(wat.existWatchers, w)
wat.existWatchers = append(wat.existWatchers, ch)
}
return w.ch
return ch
}

func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
Expand Down Expand Up @@ -617,7 +608,7 @@ func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watcherTypeChild, res.Zxid)
ech = c.addWatcher(path, watcherTypeChild)
}
})
if err != nil {
Expand All @@ -637,7 +628,7 @@ func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
res := &getDataResponse{}
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watcherTypeData, res.Zxid)
ech = c.addWatcher(path, watcherTypeData)
}
})
if err != nil {
Expand Down Expand Up @@ -723,9 +714,9 @@ func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
res := &existsResponse{}
_, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watcherTypeData, res.Zxid)
ech = c.addWatcher(path, watcherTypeData)
} else if err == ErrNoNode {
ech = c.addWatcher(path, watcherTypeExist, res.Zxid)
ech = c.addWatcher(path, watcherTypeExist)
}
})
exists := true
Expand Down

0 comments on commit 0188e81

Please sign in to comment.