Skip to content

Commit

Permalink
Implement Close
Browse files Browse the repository at this point in the history
  • Loading branch information
samuel committed Dec 13, 2012
1 parent 14e9108 commit f2d0c37
Showing 1 changed file with 42 additions and 13 deletions.
55 changes: 42 additions & 13 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Conn struct {
conn net.Conn
state State
eventChan chan Event
shouldQuit chan bool
pingInterval time.Duration
recvTimeout time.Duration
connectTimeout time.Duration
Expand Down Expand Up @@ -78,6 +79,7 @@ func Connect(servers []string, recvTimeout time.Duration) (*Conn, <-chan Event,
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan bool),
recvTimeout: recvTimeout,
pingInterval: 10 * time.Second,
connectTimeout: 1 * time.Second,
Expand All @@ -92,11 +94,8 @@ func Connect(servers []string, recvTimeout time.Duration) (*Conn, <-chan Event,
}

func (c *Conn) Close() {
// TODO

// if c.conn != nil {
// c.conn.Close()
// }
close(c.shouldQuit)
c.disconnect()
}

func (c *Conn) connect() {
Expand All @@ -120,25 +119,43 @@ func (c *Conn) connect() {
}
}

func (c *Conn) disconnect() {
c.request(-1, nil, nil)
}

func (c *Conn) loop() {
for {
c.connect()
err := c.authenticate()
if err == nil {
closeChan := make(chan bool)
go c.sendLoop(c.conn, closeChan)
err = c.recvLoop(c.conn)
if err == nil {
panic("zk: recvLoop should never return nil error")
}
close(closeChan)
sendDone := make(chan bool, 1)
go func() {
c.sendLoop(c.conn, closeChan)
c.conn.Close()
close(sendDone)
}()

recvDone := make(chan bool, 1)
go func() {
err = c.recvLoop(c.conn)
if err == nil {
panic("zk: recvLoop should never return nil error")
}
close(closeChan)
<-sendDone // wait for send loop to exit
close(recvDone)
}()

<-recvDone
}
c.conn.Close()

c.state = StateDisconnected
c.eventChan <- Event{EventSession, c.state, ""}

log.Println(err)
if !strings.Contains(err.Error(), "use of closed network connection") {
log.Println(err)
}

c.requestsLock.Lock()
// Error out any pending requests
Expand All @@ -147,6 +164,12 @@ func (c *Conn) loop() {
}
c.requests = make(map[int32]*request)
c.requestsLock.Unlock()

select {
case <-c.shouldQuit:
return
default:
}
}
}

Expand Down Expand Up @@ -223,6 +246,12 @@ func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan bool) error {
for {
select {
case req := <-c.sendChan:
if req.opcode < 0 {
// Asked to quit
req.recvChan <- nil
return nil
}

header := &requestHeader{req.xid, req.opcode}
n, err := encodePacket(buf[4:], header)
if err != nil {
Expand Down

0 comments on commit f2d0c37

Please sign in to comment.