Skip to content

Commit

Permalink
re-added balancing test
Browse files Browse the repository at this point in the history
  • Loading branch information
David Ormsbee committed Dec 8, 2011
1 parent 1c127f4 commit 22df339
Showing 1 changed file with 34 additions and 34 deletions.
68 changes: 34 additions & 34 deletions brod/test/test_zk.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,40 +57,40 @@

class TestZK(TestCase):

# def test_001_consumers_manual_rebalancing(self):
# """Test that basic consumer rebalancing logic works..."""
# for kafka_config in self.kafka_configs:
# k = Kafka("localhost", kafka_config.port)
# for topic in ["t1", "t2", "t3"]:
# k.produce(topic, ["bootstrap"], 0)
# time.sleep(1)
#
# producer = ZKProducer(ZK_CONNECT_STR, "t1")
# self.assertEquals(len(producer.broker_partitions),
# TOTAL_NUM_PARTITIONS,
# "We should be sending to all broker_partitions.")
#
# c1 = ZKConsumer(ZK_CONNECT_STR, "group1", "t1")
# self.assertEquals(len(c1.broker_partitions),
# TOTAL_NUM_PARTITIONS,
# "Only one consumer, it should have all partitions.")
# c2 = ZKConsumer(ZK_CONNECT_STR, "group1", "t1")
# self.assertEquals(len(c2.broker_partitions),
# (TOTAL_NUM_PARTITIONS) / 2)
# c1.rebalance()
# self.assertEquals(len(set(c1.broker_partitions + c2.broker_partitions)),
# TOTAL_NUM_PARTITIONS,
# "We should have all broker partitions covered.")
#
# c3 = ZKConsumer(ZK_CONNECT_STR, "group1", "t1")
# self.assertEquals(len(c3.broker_partitions),
# (TOTAL_NUM_PARTITIONS) / 3)
# c1.rebalance()
# c2.rebalance()
# self.assertEquals(len(set(c1.broker_partitions + c2.broker_partitions +
# c3.broker_partitions)),
# TOTAL_NUM_PARTITIONS,
# "We should have all broker partitions covered with no overlap.")
def test_001_consumers_manual_rebalancing(self):
"""Test that basic consumer rebalancing logic works..."""
for kafka_config in self.kafka_configs:
k = Kafka("localhost", kafka_config.port)
for topic in ["t1", "t2", "t3"]:
k.produce(topic, ["bootstrap"], 0)
time.sleep(1)

producer = ZKProducer(ZK_CONNECT_STR, "t1")
self.assertEquals(len(producer.broker_partitions),
TOTAL_NUM_PARTITIONS,
"We should be sending to all broker_partitions.")

c1 = ZKConsumer(ZK_CONNECT_STR, "group1", "t1")
self.assertEquals(len(c1.broker_partitions),
TOTAL_NUM_PARTITIONS,
"Only one consumer, it should have all partitions.")
c2 = ZKConsumer(ZK_CONNECT_STR, "group1", "t1")
self.assertEquals(len(c2.broker_partitions),
(TOTAL_NUM_PARTITIONS) / 2)
c1.rebalance()
self.assertEquals(len(set(c1.broker_partitions + c2.broker_partitions)),
TOTAL_NUM_PARTITIONS,
"We should have all broker partitions covered.")

c3 = ZKConsumer(ZK_CONNECT_STR, "group1", "t1")
self.assertEquals(len(c3.broker_partitions),
(TOTAL_NUM_PARTITIONS) / 3)
c1.rebalance()
c2.rebalance()
self.assertEquals(len(set(c1.broker_partitions + c2.broker_partitions +
c3.broker_partitions)),
TOTAL_NUM_PARTITIONS,
"We should have all broker partitions covered with no overlap.")

def test_002_consumers(self):
c1 = ZKConsumer(ZK_CONNECT_STR, "group002", "topic002")
Expand Down

0 comments on commit 22df339

Please sign in to comment.