Skip to content

Commit

Permalink
allow setup/teardown of different broker configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
David Ormsbee committed Jan 2, 2012
1 parent 481aee3 commit b9d477b
Showing 1 changed file with 99 additions and 85 deletions.
184 changes: 99 additions & 85 deletions brod/test/test_zk.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
4. Kafka will use the JMX ports from 10000 and up, depending on the number of
instances. This is not something we can set in the config file -- we have
to pass it in the form of an environment var JMX_PORT.
5. You have env installed (i.e. you're running on a UNIX machine)
6. You have a /tmp directory
5. You have a /tmp directory
What you need to run the tests:
Note that the test order matters. Bringing ZooKeeper and the Kafka brokers up
takes around 6 seconds or so, so we should try to avoid it. In general, the
tests should be self contained and try to return things to the state they were
if possible.
takes a few seconds each time, so we try to avoid it when possible. The tests
are run in the order that they appear in the module, and at any time, we can
change the Kafka server topology by prefixing our test with:
@with_setup(setup_servers(num_brokers, partitions_per_broker))
def test_005_something():
...
We'll probably have to allow the world to be reset eventually.
"""
import logging
import os
Expand All @@ -32,6 +36,7 @@
import time
from collections import namedtuple
from datetime import datetime
from functools import partial
from itertools import chain
from subprocess import Popen
from unittest import TestCase
Expand All @@ -56,70 +61,85 @@
NUM_PARTITIONS = 5 # How many partitions per topic per broker
TOTAL_NUM_PARTITIONS = NUM_BROKERS * NUM_PARTITIONS

log = logging.getLogger("brod.test_zk")
log = logging.getLogger("brod")

######################### Module Level Vars for Runs ###########################
# These get reset every time that teardown/setup() is run. If your test requires
# a clean state, it should use the @with_setup() decorator. Otherwise, we leave
# the instances up to make tests run faster.

kafka_configs = None
kafka_processes = None
run_dir = None
zk_config = None
zk_process = None

def setup():
global kafka_configs, kafka_processes, run_dir, zk_config, zk_process

# For those tests that ask for new server instances -- kill the old one.
if run_exists():
teardown()

timestamp = datetime.now().strftime('%Y-%m-%d-%H_%M_%s_%f')
run_dir = os.path.join("/tmp", "brod_zk_test", timestamp)
os.makedirs(run_dir)
log.info("ZooKeeper and Kafka data in {0}".format(run_dir))

# Set up configuration and data directories for ZK and Kafka
zk_config = setup_zookeeper()
kafka_configs = setup_kafka(NUM_BROKERS, NUM_PARTITIONS)

# Start ZooKeeper...
log.info("Starting ZooKeeper with config {0}".format(zk_config))
zk_process = Popen(["kafka-run-class.sh",
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
zk_config.config_file],
stdout=open(run_dir + "/zookeeper.log", "wb"),
stderr=open(run_dir + "/zookeeper_error.log", "wb"),
shell=False,
preexec_fn=os.setsid)
# Give ZK a little time to finish starting up before we start spawning
# Kafka instances to connect to it.
time.sleep(2)

# Start Kafka. We use kafka-run-class.sh instead of
# kafka-server-start.sh because the latter sets the JMX_PORT to 9999
# and we want to set it differently for each Kafka instance
kafka_processes = []
for kafka_config in kafka_configs:
env = os.environ.copy()
env["JMX_PORT"] = str(kafka_config.jmx_port)
log.info("Starting Kafka with config {0}".format(kafka_config))
run_log = "kafka_{0}.log".format(kafka_config.broker_id)
run_errs = "kafka_error_{0}.log".format(kafka_config.broker_id)
process = Popen(["kafka-run-class.sh",
"kafka.Kafka",
kafka_config.config_file],
stdout=open("{0}/{1}".format(run_dir, run_log), "wb"),
stderr=open("{0}/{1}".format(run_dir, run_errs), "wb"),
shell=False,
preexec_fn=os.setsid,
env=env)
kafka_processes.append(process)
class RunConfig(object):
kafka_configs = None
kafka_processes = None
run_dir = None
zk_config = None
zk_process = None

@classmethod
def clear(cls):
cls.kafka_configs = cls.kafka_processes = cls.run_dir = cls.zk_config \
= cls.zk_process = None
@classmethod
def is_running(cls):
return any([cls.kafka_configs, cls.kafka_processes, cls.run_dir,
cls.zk_config, cls.zk_process])


def setup_servers(num_brokers, partitions_per_broker):
def run_setup():
# For those tests that ask for new server instances -- kill the old one.
if RunConfig.is_running():
teardown()

timestamp = datetime.now().strftime('%Y-%m-%d-%H_%M_%s_%f')
RunConfig.run_dir = os.path.join("/tmp", "brod_zk_test", timestamp)
os.makedirs(RunConfig.run_dir)
log.info("SETUP: {0} brokers, {1} partitions per partition."
.format(num_brokers, partitions_per_broker))
log.info("SETUP: ZooKeeper and Kafka data in {0}".format(RunConfig.run_dir))

# Set up configuration and data directories for ZK and Kafka
RunConfig.zk_config = setup_zookeeper()
RunConfig.kafka_configs = setup_kafka(num_brokers, partitions_per_broker)

# Start ZooKeeper...
log.info("SETUP: Starting ZooKeeper with config {0}"
.format(RunConfig.zk_config))
RunConfig.zk_process = Popen(["kafka-run-class.sh",
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
RunConfig.zk_config.config_file],
stdout=open(RunConfig.run_dir + "/zookeeper.log", "wb"),
stderr=open(RunConfig.run_dir + "/zookeeper_error.log", "wb"),
shell=False,
preexec_fn=os.setsid)
# Give ZK a little time to finish starting up before we start spawning
# Kafka instances to connect to it.
time.sleep(2)

# Start Kafka. We use kafka-run-class.sh instead of
# kafka-server-start.sh because the latter sets the JMX_PORT to 9999
# and we want to set it differently for each Kafka instance
RunConfig.kafka_processes = []
for kafka_config in RunConfig.kafka_configs:
env = os.environ.copy()
env["JMX_PORT"] = str(kafka_config.jmx_port)
log.info("SETUP: Starting Kafka with config {0}".format(kafka_config))
run_log = "kafka_{0}.log".format(kafka_config.broker_id)
run_errs = "kafka_error_{0}.log".format(kafka_config.broker_id)
process = Popen(["kafka-run-class.sh",
"kafka.Kafka",
kafka_config.config_file],
stdout=open("{0}/{1}".format(RunConfig.run_dir, run_log), "wb"),
stderr=open("{0}/{1}".format(RunConfig.run_dir, run_errs), "wb"),
shell=False,
preexec_fn=os.setsid,
env=env)
RunConfig.kafka_processes.append(process)

# Now give the Kafka instances a little time to spin up...
time.sleep(2)

# Now give the Kafka instances a little time to spin up...
time.sleep(2)
return run_setup

def setup_zookeeper():
# Create all the directories we need...
Expand Down Expand Up @@ -153,23 +173,24 @@ def setup_kafka(num_instances, num_partitions):

return configs

def tearDown():
def teardown():
# Have to kill Kafka before ZooKeeper, or Kafka will get very distraught
# You can't kill the processes with Popen.terminate() because what we
# call is just a shell script that spawns off a Java process. But since
# we did that bit with preexec_fn=os.setsid when we created them, we can
# kill the entire process group with os.killpg
if not run_exists():
if not RunConfig.is_running():
return

for process in kafka_processes:
log.info("Terminating Kafka process {0}".format(process))
for process in RunConfig.kafka_processes:
log.info("TEARDOWN: Terminating Kafka process {0}".format(process))
os.killpg(process.pid, signal.SIGTERM)

log.info("Terminating ZooKeeper process {0}".format(zk_process))
os.killpg(zk_process.pid, signal.SIGTERM)
log.info("TEARDOWN: Terminating ZooKeeper process {0}"
.format(RunConfig.zk_process))
os.killpg(RunConfig.zk_process.pid, signal.SIGTERM)
time.sleep(1)
reset_run_vars()
RunConfig.clear()

def write_config(template_name, finished_location, format_obj):
with open(template(template_name)) as template_file:
Expand All @@ -179,7 +200,7 @@ def write_config(template_name, finished_location, format_obj):
finished_file.write(config_text)

def create_run_dirs(*dirs):
paths = [os.path.join(run_dir, d) for d in dirs]
paths = [os.path.join(RunConfig.run_dir, d) for d in dirs]
for path in paths:
os.makedirs(path)
return paths
Expand All @@ -189,13 +210,6 @@ def template(config):
script_dir = os.path.dirname(os.path.realpath(__file__))
return os.path.join(script_dir, "server_config", config)

def reset_run_vars():
global kafka_configs, kafka_processes, run_dir, zk_config, zk_process
kafka_configs = kafka_processes = run_dir = zk_config = zk_process = None

def run_exists():
return any([kafka_configs, kafka_processes, run_dir, zk_config, zk_process])

####

def print_zk_snapshot():
Expand All @@ -204,9 +218,11 @@ def print_zk_snapshot():
print zk.export_tree(ephemeral=True)

################################ TESTS BEGIN ###################################

@with_setup(setup_servers(NUM_BROKERS, NUM_PARTITIONS))
def test_001_consumer_rebalancing():
"""Consumer rebalancing, with auto rebalancing."""
for kafka_config in kafka_configs:
for kafka_config in RunConfig.kafka_configs:
k = Kafka("localhost", kafka_config.port)
for topic in ["t1", "t2", "t3"]:
k.produce(topic, ["bootstrap"], 0)
Expand All @@ -229,8 +245,6 @@ def test_001_consumer_rebalancing():

c3 = ZKConsumer(ZK_CONNECT_STR, "group_001", "t1")
assert_equals(len(c3.broker_partitions), (TOTAL_NUM_PARTITIONS) / 3)
# c1.rebalance()
# c2.rebalance()

time.sleep(1)
assert_equals(sum(len(c.broker_partitions) for c in [c1, c2, c3]),
Expand All @@ -241,17 +255,17 @@ def test_001_consumer_rebalancing():
TOTAL_NUM_PARTITIONS,
"There should be no overlaps")

def test_003_consumers():
def test_002_consumers():
"""Multi-broker/partition fetches"""
c1 = ZKConsumer(ZK_CONNECT_STR, "group_003", "topic_003")
c1 = ZKConsumer(ZK_CONNECT_STR, "group_002", "topic_002")

result = c1.fetch()
assert_equals(len(result), 0, "This shouldn't error, but it should be empty")

for kafka_config in kafka_configs:
for kafka_config in RunConfig.kafka_configs:
k = Kafka("localhost", kafka_config.port)
for partition in range(NUM_PARTITIONS):
k.produce("topic_003", ["hello"], partition)
k.produce("topic_002", ["hello"], partition)
time.sleep(2)

# This should grab "hello" from every partition and every topic
Expand Down

0 comments on commit b9d477b

Please sign in to comment.