Skip to content

Commit

Permalink
Merge pull request redis#594 from go-redis/feature/cluster-pubsub
Browse files Browse the repository at this point in the history
Add PubSub support to Cluster client
  • Loading branch information
vmihailenco authored Jul 11, 2017
2 parents 564772f + 3ddda73 commit da63fe7
Show file tree
Hide file tree
Showing 11 changed files with 519 additions and 243 deletions.
319 changes: 254 additions & 65 deletions cluster.go

Large diffs are not rendered by default.

271 changes: 163 additions & 108 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ func startCluster(scenario *clusterScenario) error {
scenario.nodeIds[pos] = info[:40]
}

// Meet cluster nodes
// Meet cluster nodes.
for _, client := range scenario.clients {
err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
if err != nil {
return err
}
}

// Bootstrap masters
// Bootstrap masters.
slots := []int{0, 5000, 10000, 16384}
for pos, master := range scenario.masters() {
err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
Expand All @@ -92,7 +92,7 @@ func startCluster(scenario *clusterScenario) error {
}
}

// Bootstrap slaves
// Bootstrap slaves.
for idx, slave := range scenario.slaves() {
masterId := scenario.nodeIds[idx]

Expand All @@ -115,7 +115,7 @@ func startCluster(scenario *clusterScenario) error {
}
}

// Wait until all nodes have consistent info
// Wait until all nodes have consistent info.
for _, client := range scenario.clients {
err := eventually(func() error {
res, err := client.ClusterSlots().Result()
Expand Down Expand Up @@ -189,62 +189,6 @@ var _ = Describe("ClusterClient", func() {
var client *redis.ClusterClient

assertClusterClient := func() {
It("should CLUSTER SLOTS", func() {
res, err := client.ClusterSlots().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(3))

wanted := []redis.ClusterSlot{
{0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}},
{5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}},
{10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}},
}
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
})

It("should CLUSTER NODES", func() {
res, err := client.ClusterNodes().Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(res)).To(BeNumerically(">", 400))
})

It("should CLUSTER INFO", func() {
res, err := client.ClusterInfo().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
})

It("should CLUSTER KEYSLOT", func() {
hashSlot, err := client.ClusterKeySlot("somekey").Result()
Expect(err).NotTo(HaveOccurred())
Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
})

It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})

It("should CLUSTER COUNTKEYSINSLOT", func() {
n, err := client.ClusterCountKeysInSlot(10).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})

It("should CLUSTER SAVECONFIG", func() {
res, err := client.ClusterSaveConfig().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
})

It("should CLUSTER SLAVES", func() {
nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
Expect(nodesList).Should(HaveLen(1))
})

It("should GET/SET/DEL", func() {
val, err := client.Get("A").Result()
Expect(err).To(Equal(redis.Nil))
Expand All @@ -254,55 +198,24 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("OK"))

val, err = client.Get("A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("VALUE"))
Eventually(func() string {
return client.Get("A").Val()
}).Should(Equal("VALUE"))

cnt, err := client.Del("A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(cnt).To(Equal(int64(1)))
})

It("returns pool stats", func() {
Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
})

It("removes idle connections", func() {
stats := client.PoolStats()
Expect(stats.TotalConns).NotTo(BeZero())
Expect(stats.FreeConns).NotTo(BeZero())

time.Sleep(2 * time.Second)

stats = client.PoolStats()
Expect(stats.TotalConns).To(BeZero())
Expect(stats.FreeConns).To(BeZero())
})

It("follows redirects", func() {
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())

slot := hashtag.Slot("A")
Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))

val, err := client.Get("A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("VALUE"))
})

It("returns an error when there are no attempts left", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.clusterClient(opt)

slot := hashtag.Slot("A")
Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
client.SwapSlotNodes(slot)

err := client.Get("A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))

Expect(client.Close()).NotTo(HaveOccurred())
Eventually(func() string {
return client.Get("A").Val()
}).Should(Equal("VALUE"))
})

It("distributes keys", func() {
Expand All @@ -311,9 +224,14 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
}

wanted := []string{"keys=31", "keys=29", "keys=40"}
for i, master := range cluster.masters() {
Expect(master.Info().Val()).To(ContainSubstring(wanted[i]))
for _, master := range cluster.masters() {
Eventually(func() string {
return master.Info("keyspace").Val()
}, 5*time.Second).Should(Or(
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
))
}
})

Expand All @@ -330,9 +248,14 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
}

wanted := []string{"keys=31", "keys=29", "keys=40"}
for i, master := range cluster.masters() {
Expect(master.Info().Val()).To(ContainSubstring(wanted[i]))
for _, master := range cluster.masters() {
Eventually(func() string {
return master.Info("keyspace").Val()
}, 5*time.Second).Should(Or(
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
))
}
})

Expand Down Expand Up @@ -447,7 +370,7 @@ var _ = Describe("ClusterClient", func() {
})
}

Describe("Pipeline", func() {
Describe("with Pipeline", func() {
BeforeEach(func() {
pipe = client.Pipeline().(*redis.Pipeline)
})
Expand All @@ -459,7 +382,7 @@ var _ = Describe("ClusterClient", func() {
assertPipeline()
})

Describe("TxPipeline", func() {
Describe("with TxPipeline", func() {
BeforeEach(func() {
pipe = client.TxPipeline().(*redis.Pipeline)
})
Expand All @@ -472,6 +395,76 @@ var _ = Describe("ClusterClient", func() {
})
})

It("supports PubSub", func() {
pubsub := client.Subscribe("mychannel")
defer pubsub.Close()

Eventually(func() error {
_, err := client.Publish("mychannel", "hello").Result()
if err != nil {
return err
}

msg, err := pubsub.ReceiveTimeout(time.Second)
if err != nil {
return err
}

_, ok := msg.(*redis.Message)
if !ok {
return fmt.Errorf("got %T, wanted *redis.Message", msg)
}

return nil
}, 30*time.Second).ShouldNot(HaveOccurred())
})
}

Describe("ClusterClient", func() {
BeforeEach(func() {
opt = redisClusterOptions()
client = cluster.clusterClient(opt)

_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
})

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("returns pool stats", func() {
Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
})

It("removes idle connections", func() {
stats := client.PoolStats()
Expect(stats.TotalConns).NotTo(BeZero())
Expect(stats.FreeConns).NotTo(BeZero())

time.Sleep(2 * time.Second)

stats = client.PoolStats()
Expect(stats.TotalConns).To(BeZero())
Expect(stats.FreeConns).To(BeZero())
})

It("returns an error when there are no attempts left", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.clusterClient(opt)

slot := hashtag.Slot("A")
client.SwapSlotNodes(slot)

err := client.Get("A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))

Expect(client.Close()).NotTo(HaveOccurred())
})

It("calls fn for every master node", func() {
for i := 0; i < 10; i++ {
Expect(client.Set(strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
Expand All @@ -488,16 +481,78 @@ var _ = Describe("ClusterClient", func() {
Expect(keys).To(HaveLen(0))
}
})
}

Describe("default ClusterClient", func() {
It("should CLUSTER SLOTS", func() {
res, err := client.ClusterSlots().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(3))

wanted := []redis.ClusterSlot{
{0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}},
{5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}},
{10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}},
}
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
})

It("should CLUSTER NODES", func() {
res, err := client.ClusterNodes().Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(res)).To(BeNumerically(">", 400))
})

It("should CLUSTER INFO", func() {
res, err := client.ClusterInfo().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
})

It("should CLUSTER KEYSLOT", func() {
hashSlot, err := client.ClusterKeySlot("somekey").Result()
Expect(err).NotTo(HaveOccurred())
Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
})

It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})

It("should CLUSTER COUNTKEYSINSLOT", func() {
n, err := client.ClusterCountKeysInSlot(10).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})

It("should CLUSTER SAVECONFIG", func() {
res, err := client.ClusterSaveConfig().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
})

It("should CLUSTER SLAVES", func() {
nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
Expect(nodesList).Should(HaveLen(1))
})

assertClusterClient()
})

Describe("ClusterClient failover", func() {
BeforeEach(func() {
opt = redisClusterOptions()
client = cluster.clusterClient(opt)

_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})

_ = client.ForEachSlave(func(slave *redis.Client) error {
return slave.ClusterFailover().Err()
})
})

AfterEach(func() {
Expand Down
7 changes: 4 additions & 3 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ func (c *ClusterClient) SlotAddrs(slot int) []string {
}

// SwapSlot swaps a slot's master/slave address for testing MOVED redirects.
func (c *ClusterClient) SwapSlotNodes(slot int) []string {
func (c *ClusterClient) SwapSlotNodes(slot int) {
nodes := c.state().slots[slot]
nodes[0], nodes[1] = nodes[1], nodes[0]
return c.SlotAddrs(slot)
if len(nodes) == 2 {
nodes[0], nodes[1] = nodes[1], nodes[0]
}
}
Loading

0 comments on commit da63fe7

Please sign in to comment.