Skip to content

Commit

Permalink
KAFKA-5112; Update compatibility system tests to include 0.10.2.1
Browse files Browse the repository at this point in the history
Also update message format tests now that we have a third message
format.

Finally, set group.initial.rebalance.delay.ms=100.

Author: Ismael Juma <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>, Jason Gustafson <[email protected]>

Closes apache#2701 from ijuma/update-upgrade-tests-for-0.11
  • Loading branch information
ijuma committed Jun 6, 2017
1 parent 39eb31f commit b119d04
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 23 deletions.
1 change: 1 addition & 0 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "${MIRROR}kafka/0.8.2.2/kafka_2.10-
RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"

CMD service ssh start && tail -f /dev/null
2 changes: 1 addition & 1 deletion tests/docker/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ KAFKA_SRC=`dirname ${TESTS_DIR}`
KAFKA_VERSION=$(grep "version=.*" ${KAFKA_SRC}/gradle.properties | cut -f 2 -d =)
JDK_INFO="openjdk8"
KAFKA_IMAGE=${KAFKA_IMAGE:-kafkadev/kafka-image:${KAFKA_VERSION}}_${JDK_INFO}
export KAFKA_NUM_CONTAINERS=12
export KAFKA_NUM_CONTAINERS=13

chmod 600 ${SCRIPT_DIR}/ssh/id_rsa
cd ${KAFKA_SRC}
Expand Down
4 changes: 3 additions & 1 deletion tests/kafkatest/services/kafka/templates/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,6 @@ replica.lag.time.max.ms={{replica_lag}}
{% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
auto.create.topics.enable={{ auto_create_topics_enable }}
{% endif %}
offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
# Set to a low, but non-zero value to exercise this path without making tests much slower
group.initial.rebalance.delay.ms=100
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from ducktape.tests.test import Test
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, V_0_10_1_0, KafkaVersion
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_10_1_0, KafkaVersion

def get_broker_features(broker_version):
features = {}
Expand Down Expand Up @@ -98,6 +98,7 @@ def invoke_compatibility_program(self, features):
@parametrize(broker_version=str(DEV_BRANCH))
@parametrize(broker_version=str(LATEST_0_10_0))
@parametrize(broker_version=str(LATEST_0_10_1))
@parametrize(broker_version=str(LATEST_0_10_2))
def run_compatibility_test(self, broker_version):
self.zk.start()
self.kafka.set_version(KafkaVersion(broker_version))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, KafkaVersion
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, KafkaVersion

class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
"""
Expand Down Expand Up @@ -55,6 +55,7 @@ def min_cluster_size(self):
@parametrize(broker_version=str(DEV_BRANCH))
@parametrize(broker_version=str(LATEST_0_10_0))
@parametrize(broker_version=str(LATEST_0_10_1))
@parametrize(broker_version=str(LATEST_0_10_2))
def test_produce_consume(self, broker_version):
print("running producer_consumer_compat with broker_version = %s" % broker_version)
self.kafka.set_version(KafkaVersion(broker_version))
Expand Down
32 changes: 22 additions & 10 deletions tests/kafkatest/tests/client/message_format_change_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_9, LATEST_0_10, DEV_BRANCH, KafkaVersion
from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, DEV_BRANCH, KafkaVersion


class MessageFormatChangeTest(ProduceConsumeValidateTest):
Expand All @@ -49,26 +49,34 @@ def produce_and_consume(self, producer_version, consumer_version, group):
message_validator=is_int,
version=KafkaVersion(producer_version))
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
self.topic, new_consumer=False, consumer_timeout_ms=30000,
self.topic, consumer_timeout_ms=30000,
message_validator=is_int, version=KafkaVersion(consumer_version))
self.consumer.group_id = group
self.run_produce_consume_validate(lambda: wait_until(
lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
timeout_sec=120, backoff_sec=1,
err_msg="Producer did not produce all messages in reasonable amount of time"))

@cluster(num_nodes=10)
@cluster(num_nodes=12)
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH))
@parametrize(producer_version=str(LATEST_0_10), consumer_version=str(LATEST_0_10))
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
def test_compatibility(self, producer_version, consumer_version):
""" This tests performs the following checks:
The workload is a mix of 0.9.x and 0.10.x producers and consumers
that produce to and consume from a 0.10.x cluster
The workload is a mix of 0.9.x, 0.10.x and 0.11.x producers and consumers
that produce to and consume from a DEV_BRANCH cluster
1. initially the topic is using message format 0.9.0
2. change the message format version for topic to 0.10.0 on the fly.
3. change the message format version for topic back to 0.9.0 on the fly.
3. change the message format version for topic to 0.11.0 on the fly.
4. change the message format version for topic back to 0.10.0 on the fly (only if the client version is 0.11.0 or newer)
- The producers and consumers should not have any issue.
- Note that for 0.9.x consumers/producers we only do steps 1 and 2
Note regarding step number 4. Downgrading the message format version is generally unsupported as it breaks
older clients. More concretely, if we downgrade a topic from 0.11.0 to 0.10.0 after it contains messages with
version 0.11.0, we will return the 0.11.0 messages without down conversion due to an optimisation in the
handling of fetch requests. This will break any consumer that doesn't support 0.11.0. So, in practice, step 4
is similar to step 2 and it didn't seem worth it to increase the cluster size to in order to add a step 5 that
would change the message format version for the topic back to 0.9.0.0.
"""
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
"partitions": 3,
Expand All @@ -84,9 +92,13 @@ def test_compatibility(self, producer_version, consumer_version):
self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
self.produce_and_consume(producer_version, consumer_version, "group2")

self.logger.info("Third format change to 0.11.0")
self.kafka.alter_message_format(self.topic, str(LATEST_0_11))
self.produce_and_consume(producer_version, consumer_version, "group3")

if producer_version == str(DEV_BRANCH) and consumer_version == str(DEV_BRANCH):
self.logger.info("Third format change back to 0.9.0")
self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
self.produce_and_consume(producer_version, consumer_version, "group3")
self.logger.info("Fourth format change back to 0.10.0")
self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
self.produce_and_consume(producer_version, consumer_version, "group4")


Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, DEV_BRANCH, KafkaVersion

from kafkatest.version import LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, DEV_BRANCH, KafkaVersion

# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
Expand All @@ -46,12 +45,12 @@ def setUp(self):

@cluster(num_nodes=6)
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None)
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(LATEST_0_10_2), consumer_version=str(LATEST_0_10_2), compression_types=["lz4"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_0_10_1), consumer_version=str(LATEST_0_10_1), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
Expand Down
16 changes: 10 additions & 6 deletions tests/kafkatest/tests/core/upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, DEV_BRANCH, KafkaVersion
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, DEV_BRANCH, KafkaVersion

class TestUpgrade(ProduceConsumeValidateTest):

Expand Down Expand Up @@ -62,11 +62,16 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
self.kafka.start_node(node)

@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_10), compression_types=["snappy"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["lz4"])
@cluster(num_nodes=7)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
@cluster(num_nodes=6)
Expand All @@ -75,16 +80,15 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
@cluster(num_nodes=7)
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
new_consumer=True, security_protocol="PLAINTEXT"):
"""Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0, 0.10.0, 0.10.1 to the current version
"""Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2 to the current version
from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x
from_kafka_version is a Kafka version to upgrade from
If to_message_format_version is None, it means that we will upgrade to default (latest)
message format version. It is possible to upgrade to 0.10 brokers but still use message
Expand Down

0 comments on commit b119d04

Please sign in to comment.