Skip to content

Commit

Permalink
Move logger to internal package.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Apr 9, 2016
1 parent d89a58a commit 31abb18
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 42 deletions.
9 changes: 5 additions & 4 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"
"time"

"gopkg.in/redis.v4/internal"
"gopkg.in/redis.v4/internal/hashtag"
"gopkg.in/redis.v4/internal/pool"
)
Expand Down Expand Up @@ -273,13 +274,13 @@ func (c *ClusterClient) reloadSlots() {

client, err := c.randomClient()
if err != nil {
Logger.Printf("randomClient failed: %s", err)
internal.Logf("randomClient failed: %s", err)
return
}

slots, err := client.ClusterSlots().Result()
if err != nil {
Logger.Printf("ClusterSlots failed: %s", err)
internal.Logf("ClusterSlots failed: %s", err)
return
}
c.setSlots(slots)
Expand All @@ -306,14 +307,14 @@ func (c *ClusterClient) reaper(frequency time.Duration) {
for _, client := range c.getClients() {
nn, err := client.connPool.(*pool.ConnPool).ReapStaleConns()
if err != nil {
Logger.Printf("ReapStaleConns failed: %s", err)
internal.Logf("ReapStaleConns failed: %s", err)
} else {
n += nn
}
}

s := c.PoolStats()
Logger.Printf(
internal.Logf(
"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
)
Expand Down
6 changes: 4 additions & 2 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"io"
"strconv"
"time"

"gopkg.in/redis.v4/internal"
)

func formatInt(i int64) string {
Expand Down Expand Up @@ -31,7 +33,7 @@ func usePrecise(dur time.Duration) bool {

func formatMs(dur time.Duration) string {
if dur > 0 && dur < time.Millisecond {
Logger.Printf(
internal.Logf(
"specified duration is %s, but minimal supported value is %s",
dur, time.Millisecond,
)
Expand All @@ -41,7 +43,7 @@ func formatMs(dur time.Duration) string {

func formatSec(dur time.Duration) string {
if dur > 0 && dur < time.Second {
Logger.Printf(
internal.Logf(
"specified duration is %s, but minimal supported value is %s",
dur, time.Second,
)
Expand Down
4 changes: 2 additions & 2 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ func (c *PubSub) Pool() pool.Pooler {
return c.base.connPool
}

func SetReceiveMessageTimeout(d time.Duration) {
receiveMessageTimeout = d
func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) {
return c.receiveMessage(timeout)
}
22 changes: 22 additions & 0 deletions internal/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package internal

import (
"fmt"
"io/ioutil"
"log"
)

var Debug bool

var Logger = log.New(ioutil.Discard, "redis: ", log.LstdFlags)

func Debugf(s string, args ...interface{}) {
if !Debug {
return
}
Logger.Output(2, fmt.Sprintf(s, args...))
}

func Logf(s string, args ...interface{}) {
Logger.Output(2, fmt.Sprintf(s, args...))
}
12 changes: 5 additions & 7 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ package pool
import (
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"sync"
"sync/atomic"
"time"

"gopkg.in/bsm/ratelimit.v1"
)

var Logger = log.New(ioutil.Discard, "redis: ", log.LstdFlags)
"gopkg.in/redis.v4/internal"
)

var (
ErrClosed = errors.New("redis: client is closed")
Expand Down Expand Up @@ -210,7 +208,7 @@ func (p *ConnPool) Put(cn *Conn) error {
if cn.Rd.Buffered() != 0 {
b, _ := cn.Rd.Peek(cn.Rd.Buffered())
err := fmt.Errorf("connection has unread data: %q", b)
Logger.Print(err)
internal.Logf(err.Error())
return p.Remove(cn, err)
}
p.freeConnsMu.Lock()
Expand Down Expand Up @@ -342,11 +340,11 @@ func (p *ConnPool) reaper(frequency time.Duration) {
}
n, err := p.ReapStaleConns()
if err != nil {
Logger.Printf("ReapStaleConns failed: %s", err)
internal.Logf("ReapStaleConns failed: %s", err)
continue
}
s := p.Stats()
Logger.Printf(
internal.Logf(
"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
)
Expand Down
15 changes: 9 additions & 6 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"net"
"time"

"gopkg.in/redis.v4/internal"
"gopkg.in/redis.v4/internal/pool"
)

var receiveMessageTimeout = 5 * time.Second

// Posts a message to the given channel.
func (c *Client) Publish(channel, message string) *IntCmd {
req := NewIntCmd("PUBLISH", channel, message)
Expand Down Expand Up @@ -241,9 +240,13 @@ func (c *PubSub) Receive() (interface{}, error) {
// messages. It automatically reconnects to Redis Server and resubscribes
// to channels in case of network errors.
func (c *PubSub) ReceiveMessage() (*Message, error) {
return c.receiveMessage(5 * time.Second)
}

func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
var errNum uint
for {
msgi, err := c.ReceiveTimeout(receiveMessageTimeout)
msgi, err := c.ReceiveTimeout(timeout)
if err != nil {
if !isNetworkError(err) {
return nil, err
Expand All @@ -254,7 +257,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
err := c.Ping("")
if err != nil {
Logger.Printf("PubSub.Ping failed: %s", err)
internal.Logf("PubSub.Ping failed: %s", err)
}
}
} else {
Expand Down Expand Up @@ -294,12 +297,12 @@ func (c *PubSub) resubscribe() {
}
if len(c.channels) > 0 {
if err := c.Subscribe(c.channels...); err != nil {
Logger.Printf("Subscribe failed: %s", err)
internal.Logf("Subscribe failed: %s", err)
}
}
if len(c.patterns) > 0 {
if err := c.PSubscribe(c.patterns...); err != nil {
Logger.Printf("PSubscribe failed: %s", err)
internal.Logf("PSubscribe failed: %s", err)
}
}
}
5 changes: 2 additions & 3 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ var _ = Describe("PubSub", func() {
})

It("should ReceiveMessage after timeout", func() {
timeout := time.Second
redis.SetReceiveMessageTimeout(timeout)
timeout := 100 * time.Millisecond

pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -276,7 +275,7 @@ var _ = Describe("PubSub", func() {
Expect(n).To(Equal(int64(1)))
}()

msg, err := pubsub.ReceiveMessage()
msg, err := pubsub.ReceiveMessageTimeout(timeout)
Expect(err).NotTo(HaveOccurred())
Expect(msg.Channel).To(Equal("mychannel"))
Expect(msg.Payload).To(Equal("hello"))
Expand Down
8 changes: 3 additions & 5 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ package redis // import "gopkg.in/redis.v4"

import (
"fmt"
"io/ioutil"
"log"

"gopkg.in/redis.v4/internal"
"gopkg.in/redis.v4/internal/pool"
)

// Deprecated. Use SetLogger instead.
var Logger = log.New(ioutil.Discard, "redis: ", log.LstdFlags)
var Logger *log.Logger

func SetLogger(logger *log.Logger) {
Logger = logger
pool.Logger = logger
internal.Logger = logger
}

type baseClient struct {
Expand Down
3 changes: 2 additions & 1 deletion ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"gopkg.in/redis.v4/internal"
"gopkg.in/redis.v4/internal/consistenthash"
"gopkg.in/redis.v4/internal/hashtag"
"gopkg.in/redis.v4/internal/pool"
Expand Down Expand Up @@ -204,7 +205,7 @@ func (ring *Ring) heartbeat() {
for _, shard := range ring.shards {
err := shard.Client.Ping().Err()
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
Logger.Printf("ring shard state changed: %s", shard)
internal.Logf("ring shard state changed: %s", shard)
rebalance = true
}
}
Expand Down
23 changes: 12 additions & 11 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"gopkg.in/redis.v4/internal"
"gopkg.in/redis.v4/internal/pool"
)

Expand Down Expand Up @@ -165,11 +166,11 @@ func (d *sentinelFailover) MasterAddr() (string, error) {
if d.sentinel != nil {
addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
if err != nil {
Logger.Printf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
internal.Logf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
d._resetSentinel()
} else {
addr := net.JoinHostPort(addr[0], addr[1])
Logger.Printf("sentinel: %q addr is %s", d.masterName, addr)
internal.Logf("sentinel: %q addr is %s", d.masterName, addr)
return addr, nil
}
}
Expand All @@ -188,7 +189,7 @@ func (d *sentinelFailover) MasterAddr() (string, error) {
})
masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
if err != nil {
Logger.Printf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
internal.Logf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
sentinel.Close()
continue
}
Expand All @@ -198,7 +199,7 @@ func (d *sentinelFailover) MasterAddr() (string, error) {

d.setSentinel(sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
Logger.Printf("sentinel: %q addr is %s", d.masterName, addr)
internal.Logf("sentinel: %q addr is %s", d.masterName, addr)
return addr, nil
}

Expand Down Expand Up @@ -230,7 +231,7 @@ func (d *sentinelFailover) _resetSentinel() error {
func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
sentinels, err := sentinel.Sentinels(d.masterName).Result()
if err != nil {
Logger.Printf("sentinel: Sentinels %q failed: %s", d.masterName, err)
internal.Logf("sentinel: Sentinels %q failed: %s", d.masterName, err)
return
}
for _, sentinel := range sentinels {
Expand All @@ -240,7 +241,7 @@ func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
if key == "name" {
sentinelAddr := vals[i+1].(string)
if !contains(d.sentinelAddrs, sentinelAddr) {
Logger.Printf(
internal.Logf(
"sentinel: discovered new %q sentinel: %s",
d.masterName, sentinelAddr,
)
Expand Down Expand Up @@ -268,7 +269,7 @@ func (d *sentinelFailover) closeOldConns(newMaster string) {
"sentinel: closing connection to the old master %s",
cn.RemoteAddr(),
)
Logger.Print(err)
internal.Logf(err.Error())
d.pool.Remove(cn, err)
} else {
cnsToPut = append(cnsToPut, cn)
Expand All @@ -286,15 +287,15 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) {
if pubsub == nil {
pubsub = sentinel.PubSub()
if err := pubsub.Subscribe("+switch-master"); err != nil {
Logger.Printf("sentinel: Subscribe failed: %s", err)
internal.Logf("sentinel: Subscribe failed: %s", err)
d.resetSentinel()
return
}
}

msg, err := pubsub.ReceiveMessage()
if err != nil {
Logger.Printf("sentinel: ReceiveMessage failed: %s", err)
internal.Logf("sentinel: ReceiveMessage failed: %s", err)
pubsub.Close()
d.resetSentinel()
return
Expand All @@ -304,12 +305,12 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) {
case "+switch-master":
parts := strings.Split(msg.Payload, " ")
if parts[0] != d.masterName {
Logger.Printf("sentinel: ignore new %s addr", parts[0])
internal.Logf("sentinel: ignore new %s addr", parts[0])
continue
}

addr := net.JoinHostPort(parts[3], parts[4])
Logger.Printf(
internal.Logf(
"sentinel: new %q addr is %s",
d.masterName, addr,
)
Expand Down
3 changes: 2 additions & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"

"gopkg.in/redis.v4/internal"
"gopkg.in/redis.v4/internal/pool"
)

Expand Down Expand Up @@ -58,7 +59,7 @@ func (tx *Tx) process(cmd Cmder) {
func (tx *Tx) Close() error {
tx.closed = true
if err := tx.Unwatch().Err(); err != nil {
Logger.Printf("Unwatch failed: %s", err)
internal.Logf("Unwatch failed: %s", err)
}
return tx.base.Close()
}
Expand Down

0 comments on commit 31abb18

Please sign in to comment.