Skip to content

Commit

Permalink
[FLINK-23877][connector/pulsar] Change the config setter in pulsar so…
Browse files Browse the repository at this point in the history
…urce builder. Drop Properties. Remove useless config options.
  • Loading branch information
syhily authored and AHeise committed Aug 26, 2021
1 parent 069e437 commit 0fc9008
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,6 @@ private PulsarOptions() {
.withDescription(
"Whether to use TCP no-delay flag on the connection to disable Nagle algorithm.");

public static final ConfigOption<Boolean> PULSAR_USE_TLS =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "useTls")
.booleanType()
.defaultValue(false)
.withDescription("Whether to use TLS encryption on the connection.");

public static final ConfigOption<String> PULSAR_TLS_TRUST_CERTS_FILE_PATH =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "tlsTrustCertsFilePath")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
Expand All @@ -40,10 +40,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;

import static java.lang.Boolean.FALSE;
Expand All @@ -52,11 +53,8 @@
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS;
import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations;
import static org.apache.flink.util.InstantiationUtil.isSerializable;
Expand Down Expand Up @@ -134,8 +132,7 @@ public final class PulsarSourceBuilder<OUT> {
* @return this PulsarSourceBuilder.
*/
public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl) {
setConfiguration(PULSAR_ADMIN_URL, adminUrl);
return this;
return setConfig(PULSAR_ADMIN_URL, adminUrl);
}

/**
Expand All @@ -145,8 +142,7 @@ public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl) {
* @return this PulsarSourceBuilder.
*/
public PulsarSourceBuilder<OUT> setServiceUrl(String serviceUrl) {
setConfiguration(PULSAR_SERVICE_URL, serviceUrl);
return this;
return setConfig(PULSAR_SERVICE_URL, serviceUrl);
}

/**
Expand All @@ -156,8 +152,7 @@ public PulsarSourceBuilder<OUT> setServiceUrl(String serviceUrl) {
* @return this PulsarSourceBuilder.
*/
public PulsarSourceBuilder<OUT> setSubscriptionName(String subscriptionName) {
setConfiguration(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
return this;
return setConfig(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
}

/**
Expand Down Expand Up @@ -208,7 +203,6 @@ public PulsarSourceBuilder<OUT> setTopics(String... topics) {
*/
public PulsarSourceBuilder<OUT> setTopics(List<String> topics) {
ensureSubscriberIsNull("topics");
configuration.set(PULSAR_TOPIC_NAMES, topics);
this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics);
return this;
}
Expand Down Expand Up @@ -253,14 +247,19 @@ public PulsarSourceBuilder<OUT> setTopicPattern(
* with {@link #setTopics} or {@link #setTopicPattern} in this builder.
*
* @param topicsPattern the pattern of the topic name to consume from.
* @param regexSubscriptionMode The topic filter for regex subscription.
* @param regexSubscriptionMode When subscribing to a topic using a regular expression, you can
* pick a certain type of topics.
* <ul>
* <li>PersistentOnly: only subscribe to persistent topics.
* <li>NonPersistentOnly: only subscribe to non-persistent topics.
* <li>AllTopics: subscribe to both persistent and non-persistent topics.
* </ul>
*
* @return this PulsarSourceBuilder.
*/
public PulsarSourceBuilder<OUT> setTopicPattern(
Pattern topicsPattern, RegexSubscriptionMode regexSubscriptionMode) {
ensureSubscriberIsNull("topic pattern");
configuration.set(PULSAR_TOPICS_PATTERN, topicsPattern.toString());
configuration.set(PULSAR_REGEX_SUBSCRIPTION_MODE, regexSubscriptionMode);
this.subscriber =
PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, regexSubscriptionMode);
return this;
Expand All @@ -284,7 +283,7 @@ public PulsarSourceBuilder<OUT> setRangeGenerator(RangeGenerator rangeGenerator)
LOG.warn("No subscription type provided, set it to Key_Shared.");
setSubscriptionType(SubscriptionType.Key_Shared);
}
this.rangeGenerator = rangeGenerator;
this.rangeGenerator = checkNotNull(rangeGenerator);
return this;
}

Expand Down Expand Up @@ -360,64 +359,52 @@ public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(

/**
* Set an arbitrary property for the PulsarSource and PulsarConsumer. The valid keys can be
* found in {@link PulsarSourceOptions}.
* found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
*
* @param key the key of the property.
* @param value the value of the property.
* @return this PulsarSourceBuilder.
*/
public PulsarSourceBuilder<OUT> setProperty(String key, String value) {
checkNotNull(key);
checkNotNull(value);
if (configuration.containsKey(key)) {
ConfigOption<String> rawOption = ConfigOptions.key(key).stringType().noDefaultValue();
LOG.warn(
"Config option {} already has a value {}, override to new value {}",
key,
configuration.getString(rawOption),
value);
}
configuration.setString(key, value);
return this;
}

/**
* Set an arbitrary property for the PulsarSource and PulsarConsumer. The valid keys can be
* found in {@link PulsarSourceOptions}.
* <p>Make sure the option could be set only once or with same value.
*
* @param key the key of the property.
* @param value the value of the property.
* @return this PulsarSourceBuilder.
*/
public <T> PulsarSourceBuilder<OUT> setProperty(ConfigOption<T> key, T value) {
public <T> PulsarSourceBuilder<OUT> setConfig(ConfigOption<T> key, T value) {
checkNotNull(key);
checkNotNull(value);
if (configuration.contains(key)) {
T oldValue = configuration.get(key);
LOG.warn(
"Config option {} already has a value {}, override to new value {}",
key,
oldValue,
value);
checkArgument(
Objects.equals(oldValue, value),
"This option %s has been already set to value %s.",
key.key(),
oldValue);
} else {
configuration.set(key, value);
}
configuration.set(key, value);
return this;
}

/**
* Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be found
* in {@link PulsarSourceOptions}.
* in {@link PulsarSourceOptions} and {@link PulsarOptions}.
*
* @param properties the properties to set for the PulsarSource.
* @param config the config to set for the PulsarSource.
* @return this PulsarSourceBuilder.
*/
public PulsarSourceBuilder<OUT> setProperties(Properties properties) {
for (String name : properties.stringPropertyNames()) {
String value = properties.getProperty(name);
if (value != null) {
setProperty(name, value);
public PulsarSourceBuilder<OUT> setConfig(Configuration config) {
Map<String, String> existedConfigs = configuration.toMap();
List<String> duplicatedKeys = new ArrayList<>();
for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
String key = entry.getKey();
String value2 = existedConfigs.get(key);
if (value2 != null && !value2.equals(entry.getValue())) {
duplicatedKeys.add(key);
}
}
checkArgument(
duplicatedKeys.isEmpty(),
"Invalid configuration, these keys %s are already exist with different config value.",
duplicatedKeys);
configuration.addAll(config);
return this;
}

Expand All @@ -431,31 +418,10 @@ public PulsarSource<OUT> build() {
// Check builder configuration.
checkConfigurations(configuration);

SubscriptionType subscriptionType = configuration.get(PULSAR_SUBSCRIPTION_TYPE);

// Ensure the topic subscriber for pulsar.
if (subscriber == null) {
if (configuration.contains(PULSAR_TOPIC_NAMES)) {
List<String> topics = configuration.get(PULSAR_TOPIC_NAMES);
this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics);
} else if (configuration.contains(PULSAR_TOPICS_PATTERN)) {
String topicPatternStr = configuration.get(PULSAR_TOPICS_PATTERN);
Pattern topicPattern = Pattern.compile(topicPatternStr);
RegexSubscriptionMode regexSubscriptionMode =
configuration.get(PULSAR_REGEX_SUBSCRIPTION_MODE);

this.subscriber =
PulsarSubscriber.getTopicPatternSubscriber(
topicPattern, regexSubscriptionMode);
} else {
throw new IllegalArgumentException(
"No "
+ PULSAR_TOPIC_NAMES.key()
+ " or "
+ PULSAR_TOPICS_PATTERN.key()
+ " was provided.");
}
}
checkNotNull(subscriber, "No topic names or topic pattern are provided.");

SubscriptionType subscriptionType = configuration.get(PULSAR_SUBSCRIPTION_TYPE);
if (subscriptionType == SubscriptionType.Key_Shared) {
if (rangeGenerator == null) {
LOG.warn(
Expand Down Expand Up @@ -523,20 +489,6 @@ public PulsarSource<OUT> build() {

// ------------- private helpers --------------

/** Make sure this option is set only once or with same value. */
private <T> void setConfiguration(ConfigOption<T> option, T value) {
if (configuration.contains(option)) {
T oldValue = configuration.get(option);
checkArgument(
Objects.equals(oldValue, value),
"This option %s has been set to value %s.",
option.key(),
oldValue);
} else {
configuration.set(option, value);
}
}

/** Helper method for java compiler recognize the generic type. */
@SuppressWarnings("unchecked")
private <T extends OUT> PulsarSourceBuilder<T> specialized() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@
import org.apache.flink.connector.pulsar.source.config.CursorVerification;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand All @@ -42,8 +40,8 @@

/**
* Configurations for PulsarSource. All the options list here could be configured in {@link
* PulsarSourceBuilder#setProperty(ConfigOption, Object)}. The {@link PulsarOptions} is also
* required for pulsar source.
* PulsarSourceBuilder#setConfig(ConfigOption, Object)}. The {@link PulsarOptions} is also required
* for pulsar source.
*
* @see PulsarOptions
*/
Expand Down Expand Up @@ -161,19 +159,6 @@ private PulsarSourceOptions() {
//
///////////////////////////////////////////////////////////////////////////////

public static final ConfigOption<List<String>> PULSAR_TOPIC_NAMES =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "topicNames")
.stringType()
.asList()
.defaultValues()
.withDescription("Topic name.");

public static final ConfigOption<String> PULSAR_TOPICS_PATTERN =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "topicsPattern")
.stringType()
.noDefaultValue()
.withDescription("Topic pattern.");

public static final ConfigOption<String> PULSAR_SUBSCRIPTION_NAME =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionName")
.stringType()
Expand Down Expand Up @@ -414,35 +399,6 @@ private PulsarSourceOptions() {
.withDescription(
"Initial position at which to set cursor when subscribing to a topic at first time.");

public static final ConfigOption<Integer> PULSAR_PATTERN_AUTO_DISCOVERY_PERIOD =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "patternAutoDiscoveryPeriod")
.intType()
.defaultValue(60)
.withDescription(
Description.builder()
.text(
"Topic auto discovery period when using a pattern for topic's consumer.")
.linebreak()
.text("The default and minimum value is 1 minute.")
.build());

public static final ConfigOption<RegexSubscriptionMode> PULSAR_REGEX_SUBSCRIPTION_MODE =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "regexSubscriptionMode")
.enumType(RegexSubscriptionMode.class)
.defaultValue(RegexSubscriptionMode.PersistentOnly)
.withDescription(
Description.builder()
.text(
"When subscribing to a topic using a regular expression, you can pick a certain type of topics.")
.list(
text(
"PersistentOnly: only subscribe to persistent topics."),
text(
"NonPersistentOnly: only subscribe to non-persistent topics."),
text(
"AllTopics: subscribe to both persistent and non-persistent topics."))
.build());

// The config set for DeadLetterPolicy

/**
Expand Down

0 comments on commit 0fc9008

Please sign in to comment.