Skip to content

Commit

Permalink
Allow to config Sasl configs in Kafka sink. (apache#11422)
Browse files Browse the repository at this point in the history
### Motivation
Allow pulsar io to push messages to sasl kafka cluster.

### Modifications

Add several kafka sasl configs, make them configable.


### Documentation
I think the pulsar-io can automatically generate the doc for added fields. So we don't need add any docs.
  • Loading branch information
Shoothzj authored Jul 27, 2021
1 parent 38bba54 commit 3afbef2
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
Expand Down Expand Up @@ -94,6 +97,27 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
}

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers());
if (StringUtils.isNotEmpty(kafkaSinkConfig.getSecurityProtocol())) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSinkConfig.getSecurityProtocol());
}
if (StringUtils.isNotEmpty(kafkaSinkConfig.getSaslMechanism())) {
props.put(SaslConfigs.SASL_MECHANISM, kafkaSinkConfig.getSaslMechanism());
}
if (StringUtils.isNotEmpty(kafkaSinkConfig.getSaslJaasConfig())) {
props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSinkConfig.getSaslJaasConfig());
}
if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslEnabledProtocols())) {
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, kafkaSinkConfig.getSslEnabledProtocols());
}
if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslEndpointIdentificationAlgorithm())) {
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, kafkaSinkConfig.getSslEndpointIdentificationAlgorithm());
}
if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslTruststoreLocation())) {
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaSinkConfig.getSslTruststoreLocation());
}
if (StringUtils.isNotEmpty(kafkaSinkConfig.getSslTruststorePassword())) {
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaSinkConfig.getSslTruststorePassword());
}
props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getBatchSize()));
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getMaxRequestSize()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,51 +37,94 @@ public class KafkaSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;

@FieldDoc(
required = true,
defaultValue = "",
help =
"A comma-separated list of host and port pairs that are the addresses of "
+ "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
required = true,
defaultValue = "",
help =
"A comma-separated list of host and port pairs that are the addresses of "
+ "the Kafka brokers that a Kafka client connects to initially bootstrap itself")
private String bootstrapServers;

@FieldDoc(
required = false,
defaultValue = "",
help = "Protocol used to communicate with kafka brokers.")
private String securityProtocol;

@FieldDoc(
required = false,
defaultValue = "",
help = "SASL mechanism used for kafka client connections.")
private String saslMechanism;

@FieldDoc(
required = false,
defaultValue = "",
help = "JAAS login context parameters for SASL connections in the format used by JAAS configuration files.")
private String saslJaasConfig;

@FieldDoc(
required = false,
defaultValue = "",
help = "The list of protocols enabled for SSL connections.")
private String sslEnabledProtocols;

@FieldDoc(
required = false,
defaultValue = "",
help = "The endpoint identification algorithm to validate server hostname using server certificate.")
private String sslEndpointIdentificationAlgorithm;

@FieldDoc(
required = false,
defaultValue = "",
help = "The location of the trust store file.")
private String sslTruststoreLocation;

@FieldDoc(
required = false,
defaultValue = "",
help = "The password for the trust store file.")
private String sslTruststorePassword;

@FieldDoc(
required = true,
defaultValue = "",
help =
"The number of acknowledgments the producer requires the leader to have received "
+ "before considering a request complete. This controls the durability of records that are sent.")
required = true,
defaultValue = "",
help =
"The number of acknowledgments the producer requires the leader to have received "
+ "before considering a request complete. This controls the durability of records that are sent.")
private String acks;
@FieldDoc(
defaultValue = "16384L",
help =
"The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
defaultValue = "16384L",
help =
"The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
private long batchSize = 16384L;
@FieldDoc(
defaultValue = "1048576L",
help =
"The maximum size of a Kafka request in bytes.")
defaultValue = "1048576L",
help =
"The maximum size of a Kafka request in bytes.")
private long maxRequestSize = 1048576L;
@FieldDoc(
required = true,
defaultValue = "",
help =
"The Kafka topic that is used for Pulsar moving messages to.")
required = true,
defaultValue = "",
help =
"The Kafka topic that is used for Pulsar moving messages to.")
private String topic;
@FieldDoc(
defaultValue = "org.apache.kafka.common.serialization.StringSerializer",
help =
"The serializer class for Kafka producer to serialize keys.")
defaultValue = "org.apache.kafka.common.serialization.StringSerializer",
help =
"The serializer class for Kafka producer to serialize keys.")
private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
@FieldDoc(
defaultValue = "org.apache.kafka.common.serialization.ByteArraySerializer",
help =
"The serializer class for Kafka producer to serialize values. You typically shouldn't care this. "
+ "Since the serializer will be set by a specific implementation of `KafkaAbstractSink`.")
defaultValue = "org.apache.kafka.common.serialization.ByteArraySerializer",
help =
"The serializer class for Kafka producer to serialize values. You typically shouldn't care this. "
+ "Since the serializer will be set by a specific implementation of `KafkaAbstractSink`.")
private String valueSerializerClass = "org.apache.kafka.common.serialization.ByteArraySerializer";
@FieldDoc(
defaultValue = "",
help =
"The producer config properties to be passed to Producer. Note that other properties specified "
+ "in the connector config file take precedence over this config.")
defaultValue = "",
help =
"The producer config properties to be passed to Producer. Note that other properties specified "
+ "in the connector config file take precedence over this config.")
private Map<String, Object> producerConfigProperties;

public static KafkaSinkConfig load(String yamlFile) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
Expand Down Expand Up @@ -221,6 +222,24 @@ public final void loadFromYamlFileTest() throws IOException {
assertEquals("1", props.getProperty(ProducerConfig.ACKS_CONFIG));
}

@Test
public final void loadFromSaslYamlFileTest() throws IOException {
File yamlFile = getFile("kafkaSinkConfigSasl.yaml");
KafkaSinkConfig config = KafkaSinkConfig.load(yamlFile.getAbsolutePath());
assertNotNull(config);
assertEquals(config.getBootstrapServers(), "localhost:6667");
assertEquals(config.getTopic(), "test");
assertEquals(config.getAcks(), "1");
assertEquals(config.getBatchSize(), 16384L);
assertEquals(config.getMaxRequestSize(), 1048576L);
assertEquals(config.getSecurityProtocol(), SecurityProtocol.SASL_PLAINTEXT.name);
assertEquals(config.getSaslMechanism(), "PLAIN");
assertEquals(config.getSaslJaasConfig(), "org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"alice\" \npassword=\"pwd\";");
assertEquals(config.getSslEndpointIdentificationAlgorithm(), "");
assertEquals(config.getSslTruststoreLocation(), "/etc/cert.pem");
assertEquals(config.getSslTruststorePassword(), "cert_pwd");
}

private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());
Expand Down
29 changes: 29 additions & 0 deletions pulsar-io/kafka/src/test/resources/kafkaSinkConfigSasl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

"bootstrapServers": "localhost:6667"
"topic": "test"
"acks": "1"
"securityProtocol": "SASL_PLAINTEXT"
"saslMechanism" : "PLAIN"
"saslJaasConfig" : "org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"alice\" \npassword=\"pwd\";"
"sslEnabledProtocols" : "TLSv1.2"
"sslEndpointIdentificationAlgorithm" : ""
"sslTruststoreLocation" : "/etc/cert.pem"
"sslTruststorePassword" : "cert_pwd"

0 comments on commit 3afbef2

Please sign in to comment.