Skip to content

Commit

Permalink
[FLINK-29433][Connector/Pulsar] Support Auth through the builder patt…
Browse files Browse the repository at this point in the history
…ern in Pulsar connector. (apache#21071)
  • Loading branch information
syhily authored Oct 18, 2022
1 parent 2bab5ad commit 25c4297
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
Expand Down Expand Up @@ -243,6 +247,35 @@ public PulsarSinkBuilder<IN> delaySendingMessage(MessageDelayer<IN> messageDelay
return this;
}

/**
* Configure the authentication provider to use in the Pulsar client instance.
*
* @param authPluginClassName name of the Authentication-Plugin you want to use
* @param authParamsString string which represents parameters for the Authentication-Plugin,
* e.g., "key1:val1,key2:val2"
* @return this PulsarSinkBuilder.
*/
public PulsarSinkBuilder<IN> setAuthentication(
String authPluginClassName, String authParamsString) {
configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString);
return this;
}

/**
* Configure the authentication provider to use in the Pulsar client instance.
*
* @param authPluginClassName name of the Authentication-Plugin you want to use
* @param authParams map which represents parameters for the Authentication-Plugin
* @return this PulsarSinkBuilder.
*/
public PulsarSinkBuilder<IN> setAuthentication(
String authPluginClassName, Map<String, String> authParams) {
configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams);
return this;
}

/**
* Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found
* in {@link PulsarSinkOptions} and {@link PulsarOptions}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,15 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import static java.lang.Boolean.FALSE;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
Expand Down Expand Up @@ -369,6 +373,35 @@ public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
return self;
}

/**
* Configure the authentication provider to use in the Pulsar client instance.
*
* @param authPluginClassName name of the Authentication-Plugin you want to use
* @param authParamsString string which represents parameters for the Authentication-Plugin,
* e.g., "key1:val1,key2:val2"
* @return this PulsarSourceBuilder.
*/
public PulsarSourceBuilder<OUT> setAuthentication(
String authPluginClassName, String authParamsString) {
configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString);
return this;
}

/**
* Configure the authentication provider to use in the Pulsar client instance.
*
* @param authPluginClassName name of the Authentication-Plugin you want to use
* @param authParams map which represents parameters for the Authentication-Plugin
* @return this PulsarSourceBuilder.
*/
public PulsarSourceBuilder<OUT> setAuthentication(
String authPluginClassName, Map<String, String> authParams) {
configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams);
return this;
}

/**
* Set an arbitrary property for the PulsarSource and Pulsar Consumer. The valid keys can be
* found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
Expand Down

0 comments on commit 25c4297

Please sign in to comment.