Skip to content

Commit

Permalink
Fix ReceiveMessage to work without any subscriptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Feb 8, 2017
1 parent ba0b485 commit ce4fd8b
Show file tree
Hide file tree
Showing 18 changed files with 164 additions and 129 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ testdata/redis:
wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@

testdata/redis/src/redis-server: testdata/redis
sed -i 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile
cd $< && make all
2 changes: 1 addition & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
}
}

_, err := cn.NetConn.Write(cn.Wb.Bytes())
_, err := cn.Write(cn.Wb.Bytes())
return err
}

Expand Down
62 changes: 42 additions & 20 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,78 @@ package pool

import (
"net"
"sync/atomic"
"time"

"gopkg.in/redis.v5/internal/proto"
)

const defaultBufSize = 4096

var noDeadline = time.Time{}

type Conn struct {
NetConn net.Conn
Rd *proto.Reader
Wb *proto.WriteBuffer
netConn net.Conn

Rd *proto.Reader
Wb *proto.WriteBuffer

Inited bool
UsedAt time.Time
usedAt atomic.Value
}

func NewConn(netConn net.Conn) *Conn {
buf := make([]byte, 4096)
cn := &Conn{
NetConn: netConn,
Wb: proto.NewWriteBuffer(),

UsedAt: time.Now(),
netConn: netConn,
Wb: proto.NewWriteBuffer(buf),
}
cn.Rd = proto.NewReader(cn.NetConn)
cn.Rd = proto.NewReader(cn.netConn, buf)
cn.SetUsedAt(time.Now())
return cn
}

func (cn *Conn) UsedAt() time.Time {
return cn.usedAt.Load().(time.Time)
}

func (cn *Conn) SetUsedAt(tm time.Time) {
cn.usedAt.Store(tm)
}

func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn
cn.Rd.Reset(netConn)
}

func (cn *Conn) IsStale(timeout time.Duration) bool {
return timeout > 0 && time.Since(cn.UsedAt) > timeout
return timeout > 0 && time.Since(cn.UsedAt()) > timeout
}

func (cn *Conn) SetReadTimeout(timeout time.Duration) error {
cn.UsedAt = time.Now()
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
return cn.NetConn.SetReadDeadline(cn.UsedAt.Add(timeout))
return cn.netConn.SetReadDeadline(now.Add(timeout))
}
return cn.NetConn.SetReadDeadline(noDeadline)

return cn.netConn.SetReadDeadline(noDeadline)
}

func (cn *Conn) SetWriteTimeout(timeout time.Duration) error {
cn.UsedAt = time.Now()
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
return cn.NetConn.SetWriteDeadline(cn.UsedAt.Add(timeout))
return cn.netConn.SetWriteDeadline(now.Add(timeout))
}
return cn.NetConn.SetWriteDeadline(noDeadline)
return cn.netConn.SetWriteDeadline(noDeadline)
}

func (cn *Conn) Write(b []byte) (int, error) {
return cn.netConn.Write(b)
}

func (cn *Conn) RemoteAddr() net.Addr {
return cn.netConn.RemoteAddr()
}

func (cn *Conn) Close() error {
return cn.NetConn.Close()
return cn.netConn.Close()
}
7 changes: 3 additions & 4 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Pooler interface {
FreeLen() int
Stats() *Stats
Close() error
Closed() bool
}

type dialer func() (net.Conn, error)
Expand Down Expand Up @@ -132,7 +131,7 @@ func (p *ConnPool) popFree() *Conn {

// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get() (*Conn, bool, error) {
if p.Closed() {
if p.closed() {
return nil, false, ErrClosed
}

Expand Down Expand Up @@ -241,7 +240,7 @@ func (p *ConnPool) Stats() *Stats {
}
}

func (p *ConnPool) Closed() bool {
func (p *ConnPool) closed() bool {
return atomic.LoadInt32(&p._closed) == 1
}

Expand Down Expand Up @@ -318,7 +317,7 @@ func (p *ConnPool) reaper(frequency time.Duration) {
defer ticker.Stop()

for _ = range ticker.C {
if p.Closed() {
if p.closed() {
break
}
n, err := p.ReapStaleConns()
Expand Down
8 changes: 0 additions & 8 deletions internal/pool/pool_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ func NewSingleConnPool(cn *Conn) *SingleConnPool {
}
}

func (p *SingleConnPool) First() *Conn {
return p.cn
}

func (p *SingleConnPool) Get() (*Conn, bool, error) {
return p.cn, false, nil
}
Expand Down Expand Up @@ -49,7 +45,3 @@ func (p *SingleConnPool) Stats() *Stats {
func (p *SingleConnPool) Close() error {
return nil
}

func (p *SingleConnPool) Closed() bool {
return false
}
23 changes: 0 additions & 23 deletions internal/pool/pool_sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ func NewStickyConnPool(pool *ConnPool, reusable bool) *StickyConnPool {
}
}

func (p *StickyConnPool) First() *Conn {
p.mu.Lock()
cn := p.cn
p.mu.Unlock()
return cn
}

func (p *StickyConnPool) Get() (*Conn, bool, error) {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down Expand Up @@ -62,9 +55,6 @@ func (p *StickyConnPool) Put(cn *Conn) error {
if p.closed {
return ErrClosed
}
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}

Expand All @@ -81,12 +71,6 @@ func (p *StickyConnPool) Remove(cn *Conn, reason error) error {
if p.closed {
return nil
}
if p.cn == nil {
panic("p.cn == nil")
}
if cn != nil && p.cn != cn {
panic("p.cn != cn")
}
return p.removeUpstream(reason)
}

Expand Down Expand Up @@ -133,10 +117,3 @@ func (p *StickyConnPool) Close() error {
}
return err
}

func (p *StickyConnPool) Closed() bool {
p.mu.Lock()
closed := p.closed
p.mu.Unlock()
return closed
}
2 changes: 1 addition & 1 deletion internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ var _ = Describe("conns reaper", func() {
for i := 0; i < 3; i++ {
cn, _, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
cn.UsedAt = time.Now().Add(-2 * idleTimeout)
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
conns = append(conns, cn)
idleConns = append(idleConns, cn)
}
Expand Down
29 changes: 19 additions & 10 deletions internal/proto/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ type Reader struct {
buf []byte
}

func NewReader(rd io.Reader) *Reader {
func NewReader(rd io.Reader, buf []byte) *Reader {
return &Reader{
src: bufio.NewReader(rd),
buf: make([]byte, 0, bufferSize),
buf: buf,
}
}

func (r *Reader) Reset(rd io.Reader) {
r.src.Reset(rd)
}

func (p *Reader) PeekBuffered() []byte {
if n := p.src.Buffered(); n != 0 {
b, _ := p.src.Peek(n)
Expand All @@ -42,7 +46,12 @@ func (p *Reader) PeekBuffered() []byte {
}

func (p *Reader) ReadN(n int) ([]byte, error) {
return readN(p.src, p.buf, n)
b, err := readN(p.src, p.buf, n)
if err != nil {
return nil, err
}
p.buf = b
return b, nil
}

func (p *Reader) ReadLine() ([]byte, error) {
Expand Down Expand Up @@ -72,11 +81,11 @@ func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
case ErrorReply:
return nil, ParseErrorReply(line)
case StatusReply:
return parseStatusValue(line)
return parseStatusValue(line), nil
case IntReply:
return parseInt(line[1:], 10, 64)
case StringReply:
return p.readBytesValue(line)
return p.readTmpBytesValue(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
Expand Down Expand Up @@ -111,9 +120,9 @@ func (p *Reader) ReadTmpBytesReply() ([]byte, error) {
case ErrorReply:
return nil, ParseErrorReply(line)
case StringReply:
return p.readBytesValue(line)
return p.readTmpBytesValue(line)
case StatusReply:
return parseStatusValue(line)
return parseStatusValue(line), nil
default:
return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
}
Expand Down Expand Up @@ -210,7 +219,7 @@ func (p *Reader) ReadScanReply() ([]string, uint64, error) {
return keys, cursor, err
}

func (p *Reader) readBytesValue(line []byte) ([]byte, error) {
func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) {
if isNilReply(line) {
return nil, internal.Nil
}
Expand Down Expand Up @@ -297,8 +306,8 @@ func ParseErrorReply(line []byte) error {
return internal.RedisError(string(line[1:]))
}

func parseStatusValue(line []byte) ([]byte, error) {
return line[1:], nil
func parseStatusValue(line []byte) []byte {
return line[1:]
}

func parseArrayLen(line []byte) (int64, error) {
Expand Down
14 changes: 7 additions & 7 deletions internal/proto/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@ import (
"strings"
"testing"

"gopkg.in/redis.v5/internal/proto"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"gopkg.in/redis.v5/internal/proto"
)

var _ = Describe("Reader", func() {

It("should read n bytes", func() {
data, err := proto.NewReader(strings.NewReader("ABCDEFGHIJKLMNO")).ReadN(10)
data, err := proto.NewReader(strings.NewReader("ABCDEFGHIJKLMNO"), nil).ReadN(10)
Expect(err).NotTo(HaveOccurred())
Expect(len(data)).To(Equal(10))
Expect(string(data)).To(Equal("ABCDEFGHIJ"))

data, err = proto.NewReader(strings.NewReader(strings.Repeat("x", 8192))).ReadN(6000)
data, err = proto.NewReader(strings.NewReader(strings.Repeat("x", 8192)), nil).ReadN(6000)
Expect(err).NotTo(HaveOccurred())
Expect(len(data)).To(Equal(6000))
})

It("should read lines", func() {
p := proto.NewReader(strings.NewReader("$5\r\nhello\r\n"))
p := proto.NewReader(strings.NewReader("$5\r\nhello\r\n"), nil)

data, err := p.ReadLine()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -59,11 +59,11 @@ func BenchmarkReader_ParseReply_Slice(b *testing.B) {
}

func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wanterr bool) {
buf := &bytes.Buffer{}
buf := new(bytes.Buffer)
for i := 0; i < b.N; i++ {
buf.WriteString(reply)
}
p := proto.NewReader(buf)
p := proto.NewReader(buf, nil)
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (

const bufferSize = 4096

type WriteBuffer struct{ b []byte }
type WriteBuffer struct {
b []byte
}

func NewWriteBuffer() *WriteBuffer {
func NewWriteBuffer(b []byte) *WriteBuffer {
return &WriteBuffer{
b: make([]byte, 0, bufferSize),
b: b,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import (
"testing"
"time"

"gopkg.in/redis.v5/internal/proto"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"gopkg.in/redis.v5/internal/proto"
)

var _ = Describe("WriteBuffer", func() {
var buf *proto.WriteBuffer

BeforeEach(func() {
buf = proto.NewWriteBuffer()
buf = proto.NewWriteBuffer(nil)
})

It("should reset", func() {
Expand Down Expand Up @@ -53,7 +53,7 @@ var _ = Describe("WriteBuffer", func() {
})

func BenchmarkWriteBuffer_Append(b *testing.B) {
buf := proto.NewWriteBuffer()
buf := proto.NewWriteBuffer(nil)
args := []interface{}{"hello", "world", "foo", "bar"}

for i := 0; i < b.N; i++ {
Expand Down
Loading

0 comments on commit ce4fd8b

Please sign in to comment.