Skip to content

Commit

Permalink
KAFKA-13793: Add validators for configs that lack validators (apache#…
Browse files Browse the repository at this point in the history
…12010)


Reviewers: Mickael Maison <[email protected]>, Luke Chen <[email protected]>, Chris Egerton <[email protected]>, Christo Lolov <[email protected]>, Divij Vaidya <[email protected]>
  • Loading branch information
RivenSun2 authored May 9, 2022
1 parent 1278e38 commit df507e5
Show file tree
Hide file tree
Showing 22 changed files with 246 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.kafka.clients;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -203,4 +205,15 @@ public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractCon
}
return rval;
}

public static void postValidateSaslMechanismConfig(AbstractConfig config) {
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
if (clientSaslMechanism == null || clientSaslMechanism.isEmpty()) {
throw new ConfigException(SaslConfigs.SASL_MECHANISM, null, "When the " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG +
" configuration enables SASL, mechanism must be non-null and non-empty string.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -212,6 +214,7 @@ public class AdminClientConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
Expand All @@ -220,6 +223,7 @@ public class AdminClientConfig extends AbstractConfig {

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.postValidateSaslMechanismConfig(this);
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;

Expand Down Expand Up @@ -351,6 +353,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(GROUP_INSTANCE_ID_CONFIG,
Type.STRING,
null,
new ConfigDef.NonEmptyString(),
Importance.MEDIUM,
GROUP_INSTANCE_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Expand Down Expand Up @@ -572,6 +575,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
Expand All @@ -580,6 +584,7 @@ public class ConsumerConfig extends AbstractConfig {

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.postValidateSaslMechanismConfig(this);
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
maybeOverrideClientId(refinedConfigs);
return refinedConfigs;
Expand All @@ -602,11 +607,16 @@ private void maybeOverrideClientId(Map<String, Object> configs) {
protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
Map<String, Object> newConfigs = new HashMap<>(configs);
if (keyDeserializer != null)
newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
if (valueDeserializer != null)
newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
return newConfigs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -448,6 +449,7 @@ public class ProducerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SECURITY_PROVIDERS_CONFIG,
Expand Down Expand Up @@ -477,6 +479,7 @@ public class ProducerConfig extends AbstractConfig {

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.postValidateSaslMechanismConfig(this);
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
postProcessAndValidateIdempotenceConfigs(refinedConfigs);
maybeOverrideClientId(refinedConfigs);
Expand Down Expand Up @@ -559,11 +562,16 @@ private static String parseAcks(String acksString) {
static Map<String, Object> appendSerializerToConfig(Map<String, Object> configs,
Serializer<?> keySerializer,
Serializer<?> valueSerializer) {
// validate serializer configuration, if the passed serializer instance is null, the user must explicitly set a valid serializer configuration value
Map<String, Object> newConfigs = new HashMap<>(configs);
if (keySerializer != null)
newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
else if (newConfigs.get(KEY_SERIALIZER_CLASS_CONFIG) == null)
throw new ConfigException(KEY_SERIALIZER_CLASS_CONFIG, null, "must be non-null.");
if (valueSerializer != null)
newConfigs.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
else if (newConfigs.get(VALUE_SERIALIZER_CLASS_CONFIG) == null)
throw new ConfigException(VALUE_SERIALIZER_CLASS_CONFIG, null, "must be non-null.");
return newConfigs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public enum SslClientAuth {
NONE;

public static final List<SslClientAuth> VALUES =
Collections.unmodifiableList(Arrays.asList(SslClientAuth.values()));
Collections.unmodifiableList(Arrays.asList(SslClientAuth.values()));

public static SslClientAuth forConfig(String key) {
if (key == null) {
Expand All @@ -45,4 +45,9 @@ public static SslClientAuth forConfig(String key) {
}
return null;
}

@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CommonClientConfigsTest {
private static class TestConfig extends AbstractConfig {
Expand All @@ -44,11 +51,23 @@ private static class TestConfig extends AbstractConfig {
1000L,
atLeast(0L),
ConfigDef.Importance.LOW,
"");
"")
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SaslConfigs.SASL_MECHANISM,
ConfigDef.Type.STRING,
SaslConfigs.DEFAULT_SASL_MECHANISM,
ConfigDef.Importance.MEDIUM,
SaslConfigs.SASL_MECHANISM_DOC);
}

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.postValidateSaslMechanismConfig(this);
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}

Expand Down Expand Up @@ -82,4 +101,17 @@ public void testExponentialBackoffDefaults() {
assertEquals(Long.valueOf(123L),
reconnectBackoffSetConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG));
}

@Test
public void testInvalidSaslMechanism() {
Map<String, Object> configs = new HashMap<>();
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
configs.put(SaslConfigs.SASL_MECHANISM, null);
ConfigException ce = assertThrows(ConfigException.class, () -> new TestConfig(configs));
assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));

configs.put(SaslConfigs.SASL_MECHANISM, "");
ce = assertThrows(ConfigException.class, () -> new TestConfig(configs));
assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
Expand All @@ -30,6 +32,8 @@

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class ConsumerConfigTest {
Expand Down Expand Up @@ -98,6 +102,19 @@ public void testAppendDeserializerToConfig() {
assertEquals(newConfigs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), valueDeserializerClass);
}

@Test
public void testAppendDeserializerToConfigWithException() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, null);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
assertThrows(ConfigException.class, () -> ConsumerConfig.appendDeserializerToConfig(configs, null, valueDeserializer));

configs.clear();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, null);
assertThrows(ConfigException.class, () -> ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, null));
}

@Test
public void ensureDefaultThrowOnUnsupportedStableFlagToFalse() {
assertFalse(new ConsumerConfig(properties).getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
Expand All @@ -108,4 +125,24 @@ public void testDefaultPartitionAssignor() {
assertEquals(Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
new ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
}

@Test
public void testInvalidGroupInstanceId() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "");
ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
}

@Test
public void testInvalidSecurityProtocol() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ProducerConfigTest {

Expand Down Expand Up @@ -62,6 +64,19 @@ public void testAppendSerializerToConfig() {
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
}

@Test
public void testAppendSerializerToConfigWithException() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, null);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
assertThrows(ConfigException.class, () -> ProducerConfig.appendSerializerToConfig(configs, null, valueSerializer));

configs.clear();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, null);
assertThrows(ConfigException.class, () -> ProducerConfig.appendSerializerToConfig(configs, keySerializer, null));
}

@Test
public void testInvalidCompressionType() {
Map<String, Object> configs = new HashMap<>();
Expand All @@ -70,4 +85,14 @@ public void testInvalidCompressionType() {
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "abc");
assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
}

@Test
public void testInvalidSecurityProtocol() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.HashMap;

import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

/** Configuration required for MirrorClient to talk to a given target cluster.
* <p>
* Generally, these properties come from an mm2.properties configuration file
Expand Down Expand Up @@ -99,6 +103,7 @@ private Map<String, Object> clientConfig(String prefix) {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
Expand All @@ -125,6 +130,7 @@ private Map<String, Object> clientConfig(String prefix) {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
Expand Down
Loading

0 comments on commit df507e5

Please sign in to comment.