Skip to content

Commit

Permalink
[hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski authored and StefanRRichter committed Aug 8, 2017
1 parent b7f96f7 commit e11a591
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString;
private String brokerConnectionString = "";
private Properties standardProps;
private Properties additionalServerProperties;
private boolean secureMode = false;
private Config config;
// 6 seconds is default. Seems to be too small for travis. 30 seconds
private int zkTimeout = 30000;

Expand All @@ -96,7 +95,7 @@ public Properties getStandardProperties() {
@Override
public Properties getSecureProperties() {
Properties prop = new Properties();
if (secureMode) {
if (config.isSecureMode()) {
prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka");
Expand Down Expand Up @@ -215,26 +214,24 @@ public boolean isSecureRunSupported() {
}

@Override
public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
public void prepare(Config config) {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
if (secureMode) {
if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
numKafkaServers = 1;
config.setKafkaServersNumber(1);
zkTimeout = zkTimeout * 15;
}
this.config = config;

this.additionalServerProperties = additionalServerProperties;
this.secureMode = secureMode;
File tempDir = new File(System.getProperty("java.io.tmpdir"));

tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());

tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());

tmpKafkaDirs = new ArrayList<>(numKafkaServers);
for (int i = 0; i < numKafkaServers; i++) {
tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
Expand All @@ -249,12 +246,12 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);

LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(numKafkaServers);
brokers = new ArrayList<>(config.getKafkaServersNumber());

for (int i = 0; i < numKafkaServers; i++) {
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));

if (secureMode) {
if (config.isSecureMode()) {
brokerConnectionString += hostAndPortToUrlString(
KafkaTestEnvironment.KAFKA_HOST,
brokers.get(i).socketServer().boundPort(
Expand Down Expand Up @@ -347,7 +344,7 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio
final long deadline = System.nanoTime() + 30_000_000_000L;
do {
try {
if (secureMode) {
if (config.isSecureMode()) {
//increase wait time since in Travis ZK timeout occurs frequently
int wait = zkTimeout / 100;
LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
Expand Down Expand Up @@ -407,8 +404,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
// for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
if (additionalServerProperties != null) {
kafkaProperties.putAll(additionalServerProperties);
if (config.getKafkaServerProperties() != null) {
kafkaProperties.putAll(config.getKafkaServerProperties());
}

final int numTries = 5;
Expand All @@ -418,7 +415,7 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
kafkaProperties.put("port", Integer.toString(kafkaPort));

//to support secure kafka cluster
if (secureMode) {
if (config.isSecureMode()) {
LOG.info("Adding Kafka secure configurations");
kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString;
private String brokerConnectionString = "";
private Properties standardProps;
private Properties additionalServerProperties;

private Config config;

public String getBrokerConnectionString() {
return brokerConnectionString;
Expand Down Expand Up @@ -206,8 +207,8 @@ public boolean isSecureRunSupported() {
}

@Override
public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
this.additionalServerProperties = additionalServerProperties;
public void prepare(Config config) {
this.config = config;
File tempDir = new File(System.getProperty("java.io.tmpdir"));

tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
Expand All @@ -224,8 +225,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
fail("cannot create kafka temp dir: " + e.getMessage());
}

tmpKafkaDirs = new ArrayList<>(numKafkaServers);
for (int i = 0; i < numKafkaServers; i++) {
tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
Expand All @@ -240,9 +241,9 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
zookeeperConnectionString = zookeeper.getConnectString();

LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(numKafkaServers);
brokers = new ArrayList<>(config.getKafkaServersNumber());

for (int i = 0; i < numKafkaServers; i++) {
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
SocketServer socketServer = brokers.get(i).socketServer();

Expand Down Expand Up @@ -391,8 +392,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
// for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
if (additionalServerProperties != null) {
kafkaProperties.putAll(additionalServerProperties);
if (config.getKafkaServerProperties() != null) {
kafkaProperties.putAll(config.getKafkaServerProperties());
}

final int numTries = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString;
private String brokerConnectionString = "";
private Properties standardProps;
private Properties additionalServerProperties;
private boolean secureMode = false;
// 6 seconds is default. Seems to be too small for travis. 30 seconds
private String zkTimeout = "30000";

private Config config;

public String getBrokerConnectionString() {
return brokerConnectionString;
}
Expand Down Expand Up @@ -200,27 +200,24 @@ public boolean isSecureRunSupported() {
}

@Override
public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {

public void prepare(Config config) {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
if (secureMode) {
if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
numKafkaServers = 1;
config.setKafkaServersNumber(1);
zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
}
this.config = config;

this.additionalServerProperties = additionalServerProperties;
this.secureMode = secureMode;
File tempDir = new File(System.getProperty("java.io.tmpdir"));

tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());

tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());

tmpKafkaDirs = new ArrayList<>(numKafkaServers);
for (int i = 0; i < numKafkaServers; i++) {
tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
Expand All @@ -236,13 +233,13 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);

LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(numKafkaServers);
brokers = new ArrayList<>(config.getKafkaServersNumber());

for (int i = 0; i < numKafkaServers; i++) {
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));

SocketServer socketServer = brokers.get(i).socketServer();
if (secureMode) {
if (this.config.isSecureMode()) {
brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
} else {
brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
Expand Down Expand Up @@ -335,7 +332,7 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio
final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
do {
try {
if (secureMode) {
if (config.isSecureMode()) {
//increase wait time since in Travis ZK timeout occurs frequently
int wait = Integer.parseInt(zkTimeout) / 100;
LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
Expand Down Expand Up @@ -400,8 +397,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
// for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
if (additionalServerProperties != null) {
kafkaProperties.putAll(additionalServerProperties);
if (config.getKafkaServerProperties() != null) {
kafkaProperties.putAll(config.getKafkaServerProperties());
}

final int numTries = 5;
Expand All @@ -411,7 +408,7 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
kafkaProperties.put("port", Integer.toString(kafkaPort));

//to support secure kafka cluster
if (secureMode) {
if (config.isSecureMode()) {
LOG.info("Adding Kafka secure configurations");
kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
Expand Down Expand Up @@ -442,7 +439,7 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except

public Properties getSecureProperties() {
Properties prop = new Properties();
if (secureMode) {
if (config.isSecureMode()) {
prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static void prepare() throws IOException, ClassNotFoundException {
specificProperties.setProperty("log.retention.minutes", "0");
specificProperties.setProperty("log.retention.ms", "250");
specificProperties.setProperty("log.retention.check.interval.ms", "100");
kafkaServer.prepare(1, specificProperties, false);
kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));

standardProps = kafkaServer.getStandardProperties();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected static void startClusters(boolean secureMode) throws ClassNotFoundExce

LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());

kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode));

standardProps = kafkaServer.getStandardProperties();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,56 @@
* Abstract class providing a Kafka test environment.
*/
public abstract class KafkaTestEnvironment {
/**
* Configuration class for {@link KafkaTestEnvironment}.
*/
public static class Config {
private int kafkaServersNumber = 1;
private Properties kafkaServerProperties = null;
private boolean secureMode = false;

/**
* Please use {@link KafkaTestEnvironment#createConfig()} method.
*/
private Config() {
}

public int getKafkaServersNumber() {
return kafkaServersNumber;
}

public Config setKafkaServersNumber(int kafkaServersNumber) {
this.kafkaServersNumber = kafkaServersNumber;
return this;
}

public Properties getKafkaServerProperties() {
return kafkaServerProperties;
}

public Config setKafkaServerProperties(Properties kafkaServerProperties) {
this.kafkaServerProperties = kafkaServerProperties;
return this;
}

public boolean isSecureMode() {
return secureMode;
}

public Config setSecureMode(boolean secureMode) {
this.secureMode = secureMode;
return this;
}
}

protected static final String KAFKA_HOST = "localhost";

public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);

public void prepare(int numberOfKafkaServers, boolean secureMode) {
this.prepare(numberOfKafkaServers, null, secureMode);
public static Config createConfig() {
return new Config();
}

public abstract void prepare(Config config);

public abstract void shutdown();

public abstract void deleteTestTopic(String topic);
Expand Down

0 comments on commit e11a591

Please sign in to comment.