Skip to content

Commit

Permalink
[Issue 4379] [Java Client] Build auth from class and params in Pulsar…
Browse files Browse the repository at this point in the history
…ClientImpl (apache#4381)

* Flink client to accept all pulsar client conf

In this patch, we provide handles for flink connecotr to accept ClientConfigurationData, ProducerConfigurationData, ConsumerConfigurationData so flink client can:
1. accept all params of client, producer and consumer
2. Keep pace with pulsar-client

* Flink client to accept all pulsar client conf

Added test cases

* Removing commented code

* flink: construct auth when building pulsarsource

* fixed failing tests

* removed unused import

* Added builder defaults for lombok builder
Set Auth from class and params (if set) in PulsarClientImpl.java

* Remove @BUilder.default from attributes where no defaults exist

* Added tests for ClientConfiguration Data builders

* cosmetic changes in code

Co-Authored-By: Sijie Guo <[email protected]>

* fixing typo

* Removed test, not true anymore

* Removed lombok builders

* fixed the failing tests

* Because the authentication field is transient, it is not serialized. On desirialization then its null and desirialization crashes with NPE
  • Loading branch information
shiv4289 authored and merlimat committed Jun 6, 2019
1 parent 1473bda commit 17d2b42
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -57,6 +58,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand Down Expand Up @@ -127,6 +129,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
this.eventLoopGroup = eventLoopGroup;
setAuth(conf);
this.conf = conf;
conf.getAuthentication().start();
this.cnxPool = cnxPool;
Expand All @@ -142,6 +145,14 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
state.set(State.Open);
}

private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
if (StringUtils.isBlank(conf.getAuthPluginClassName()) || StringUtils.isBlank( conf.getAuthParams())) {
return;
}

conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
}

public ClientConfigurationData getConfiguration() {
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* This is a simple holder of the client configuration values.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClientConfigurationData implements Serializable, Cloneable {
Expand All @@ -47,10 +45,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {

@JsonIgnore
private transient Authentication authentication = new AuthenticationDisabled();
@JsonIgnore
private transient String authPluginClassName;
@JsonIgnore
private transient Map<String, String> authParams;
private String authPluginClassName;
private String authParams;

private long operationTimeoutMs = 30000;
private long statsIntervalSeconds = 60;
Expand All @@ -74,6 +70,13 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);

public Authentication getAuthentication() {
if (authentication == null) {
this.authentication = new AuthenticationDisabled();
}
return authentication;
}

public boolean isUseTls() {
if (useTls)
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.client.api.SubscriptionType;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import lombok.Data;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProducerConfigurationData implements Serializable, Cloneable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import java.util.Collections;

import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.annotations.Test;

public class AuthenticationTokenTest {
Expand All @@ -54,6 +57,33 @@ public void testAuthToken() throws Exception {
authToken.close();
}

@Test
public void testAuthTokenClientConfig() throws Exception {
ClientConfigurationData clientConfig = new ClientConfigurationData();
clientConfig.setServiceUrl("pulsar://service-url");
clientConfig.setAuthPluginClassName(AuthenticationToken.class.getName());
clientConfig.setAuthParams("token-xyz");

PulsarClientImpl pulsarClient = new PulsarClientImpl(clientConfig);

Authentication authToken = pulsarClient.getConfiguration().getAuthentication();
assertEquals(authToken.getAuthMethodName(), "token");

AuthenticationDataProvider authData = authToken.getAuthData();
assertTrue(authData.hasDataFromCommand());
assertEquals(authData.getCommandData(), "token-xyz");

assertFalse(authData.hasDataForTls());
assertNull(authData.getTlsCertificates());
assertNull(authData.getTlsPrivateKey());

assertTrue(authData.hasDataForHttp());
assertEquals(authData.getHttpHeaders(),
Collections.singletonMap("Authorization", "Bearer token-xyz").entrySet());

authToken.close();
}

@Test
public void testAuthTokenConfig() throws Exception {
AuthenticationToken authToken = new AuthenticationToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -108,4 +111,19 @@ public void testLoadConfigurationDataWithUnknownFields() {
assertTrue(re.getCause() instanceof IOException);
}
}

@Test
public void testConfigBuilder() throws PulsarClientException {
ClientConfigurationData clientConfig = new ClientConfigurationData();
clientConfig.setServiceUrl("pulsar://unknown:6650");
clientConfig.setStatsIntervalSeconds(80);

PulsarClientImpl pulsarClient = new PulsarClientImpl(clientConfig);
assertTrue(pulsarClient != null, "Pulsar client built using config should not be null");

assertTrue(pulsarClient.getConfiguration().getServiceUrl().equals("pulsar://unknown:6650"));
assertEquals(pulsarClient.getConfiguration().getNumListenerThreads(), 1, "builder default not set properly");
assertEquals(pulsarClient.getConfiguration().getStatsIntervalSeconds(), 80,
"builder default should overrite if set explicitly");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private void setTransientFields() throws PulsarClientException {

private void setAuth() throws PulsarClientException{
if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
&& this.clientConfigurationData.getAuthParams() == null || this.clientConfigurationData.getAuthParams().isEmpty())
|| StringUtils.isBlank(this.clientConfigurationData.getAuthParams()))
return;

clientConfigurationData.setAuthentication(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,65 +61,55 @@ public void testPulsarAvroOutputFormatConstructor() {

@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsNull() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl(null)
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(null);

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName("testTopic")
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName("testTopic");

new PulsarAvroOutputFormat(clientConf, producerConf);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsNull() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl("testServiceUrl")
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl("testServiceUrl");

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName(null)
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName(null);

new PulsarAvroOutputFormat(clientConf, producerConf);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsBlank() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl("testServiceUrl")
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl("testServiceUrl");

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName(StringUtils.EMPTY)
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName(StringUtils.EMPTY);

new PulsarAvroOutputFormat(clientConf, producerConf);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsBlank() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl(StringUtils.EMPTY)
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(StringUtils.EMPTY);

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName(StringUtils.EMPTY)
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName(StringUtils.EMPTY);

new PulsarAvroOutputFormat(clientConf, producerConf);
}

@Test
public void testPulsarAvroOutputFormatConstructorV2() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl("testServiceUrl")
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl("testServiceUrl");

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName("testTopic")
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName("testTopic");

PulsarAvroOutputFormat pulsarAvroOutputFormat = new PulsarAvroOutputFormat(clientConf, producerConf);
assertNotNull(pulsarAvroOutputFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,65 +60,55 @@ public void testPulsarCsvOutputFormatConstructor() {

@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsNull() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl(null)
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(null);

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName("testTopic")
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName("testTopic");

new PulsarAvroOutputFormat(clientConf, producerConf);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsNull() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl("testServiceUrl")
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl("testServiceUrl");

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName(null)
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName(null);

new PulsarAvroOutputFormat(clientConf, producerConf);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsBlank() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl("testServiceUrl")
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl("testServiceUrl");

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName(StringUtils.EMPTY)
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName(StringUtils.EMPTY);

new PulsarAvroOutputFormat(clientConf, producerConf);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsBlank() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl(StringUtils.EMPTY)
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(StringUtils.EMPTY);

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName("testTopic")
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName("testTopic");

new PulsarAvroOutputFormat(clientConf, producerConf);
}

@Test
public void testPulsarCsvOutputFormatConstructorV2() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl("testServiceUrl")
.build();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl("testServiceUrl");

ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName("testTopic")
.build();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setTopicName("testTopic");

PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf);
assertNotNull(pulsarCsvOutputFormat);
Expand Down
Loading

0 comments on commit 17d2b42

Please sign in to comment.