Skip to content

Commit

Permalink
[Issue 4283][flink] construct auth when building pulsar source (apach…
Browse files Browse the repository at this point in the history
…e#4284)

Fixes apache#4283

### Motivation

pulsar flink connector now uses ClientConfigData to instantiate pulsar client. Pulsar client composes Authentication which can not be serialized. In an environment with Auth there is no way to set auth in pulsar client.

### Modifications

Keep auth params away from persistence and serialization. Construct auth when building pulsar source
  • Loading branch information
shiv4289 authored and sijie committed May 18, 2019
1 parent ce685dc commit 353ca73
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
*/
package org.apache.pulsar.client.impl.conf;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

import com.fasterxml.jackson.annotation.JsonIgnore;

import lombok.Data;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* This is a simple holder of the client configuration values.
Expand All @@ -44,10 +43,15 @@ public class ClientConfigurationData implements Serializable, Cloneable {

private String serviceUrl;
@JsonIgnore
private ServiceUrlProvider serviceUrlProvider;
private transient ServiceUrlProvider serviceUrlProvider;

@JsonIgnore
private Authentication authentication = new AuthenticationDisabled();
private transient Authentication authentication = new AuthenticationDisabled();
@JsonIgnore
private transient String authPluginClassName;
@JsonIgnore
private transient Map<String, String> authParams;

private long operationTimeoutMs = 30000;
private long statsIntervalSeconds = 60;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

import java.util.function.Function;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

Expand Down Expand Up @@ -268,7 +269,7 @@ public PulsarSourceBuilder<T> pulsarAllConsumerConf(ConsumerConfigurationData co
}


public SourceFunction<T> build() {
public SourceFunction<T> build() throws PulsarClientException{
Preconditions.checkArgument(StringUtils.isNotBlank(this.clientConfigurationData.getServiceUrl()),
"a service url is required");
Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null &&
Expand All @@ -277,9 +278,27 @@ public SourceFunction<T> build() {
"At least one topic or topics pattern is required");
Preconditions.checkArgument(StringUtils.isNotBlank(this.consumerConfigurationData.getSubscriptionName()),
"a subscription name is required");

setTransientFields();

return new PulsarConsumerSource<>(this);
}

private void setTransientFields() throws PulsarClientException {
setAuth();
}

private void setAuth() throws PulsarClientException{
if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
&& this.clientConfigurationData.getAuthParams() == null || this.clientConfigurationData.getAuthParams().isEmpty())
return;

clientConfigurationData.setAuthentication(
AuthenticationFactory.create(
this.clientConfigurationData.getAuthPluginClassName(),
this.clientConfigurationData.getAuthParams()));
}

/**
* Creates a PulsarSourceBuilder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand All @@ -47,7 +48,7 @@ public void before() {
}

@Test
public void testBuild() {
public void testBuild() throws PulsarClientException {
SourceFunction sourceFunction = pulsarSourceBuilder
.serviceUrl("testServiceUrl")
.topic("testTopic")
Expand All @@ -59,7 +60,7 @@ public void testBuild() {


@Test
public void testBuildWithConfPojo() {
public void testBuildWithConfPojo() throws PulsarClientException {
ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
.topicNames(new HashSet<>(Arrays.asList("testTopic")))
Expand All @@ -74,7 +75,7 @@ public void testBuildWithConfPojo() {
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testBuildWithoutSettingRequiredProperties() {
public void testBuildWithoutSettingRequiredProperties() throws PulsarClientException {
pulsarSourceBuilder.build();
}

Expand Down Expand Up @@ -158,7 +159,7 @@ public TypeInformation<T> getProducedType() {
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testServiceUrlNullWithConfPojo() {
public void testServiceUrlNullWithConfPojo() throws PulsarClientException {
ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(null).build();
ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
.topicNames(new HashSet<String>(Arrays.asList("testServiceUrl")))
Expand All @@ -172,7 +173,7 @@ public void testServiceUrlNullWithConfPojo() {
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testServiceUrlWithBlankWithConfPojo() {
public void testServiceUrlWithBlankWithConfPojo() throws PulsarClientException {
ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(StringUtils.EMPTY).build();
ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
.topicNames(new HashSet<String>(Arrays.asList("testTopic")))
Expand All @@ -186,7 +187,7 @@ public void testServiceUrlWithBlankWithConfPojo() {
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTopicPatternWithNullWithConfPojo() {
public void testTopicPatternWithNullWithConfPojo() throws PulsarClientException {
ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
.topicsPattern(null)
Expand All @@ -200,7 +201,7 @@ public void testTopicPatternWithNullWithConfPojo() {
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testSubscriptionNameWithNullWithConfPojo() {
public void testSubscriptionNameWithNullWithConfPojo() throws PulsarClientException {
ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
.topicNames(new HashSet<String>(Arrays.asList("testTopic")))
Expand All @@ -214,7 +215,7 @@ public void testSubscriptionNameWithNullWithConfPojo() {
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testSubscriptionNameWithBlankWithConfPojo() {
public void testSubscriptionNameWithBlankWithConfPojo() throws PulsarClientException {
pulsarSourceBuilder.topic(null);
ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
Expand All @@ -229,7 +230,7 @@ public void testSubscriptionNameWithBlankWithConfPojo() {
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testSubscriptionInitialPositionWithConfPojo() {
public void testSubscriptionInitialPositionWithConfPojo() throws PulsarClientException {
pulsarSourceBuilder.topic(null);
ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
Expand Down

0 comments on commit 353ca73

Please sign in to comment.