Skip to content

Commit

Permalink
Add Client auth plugin and tls support for function to connect with b…
Browse files Browse the repository at this point in the history
…roker (apache#1935)

* Add Client auth plugin and tls support for function to connect with broker

* add authConfig builder

* add hostnameverification and tlsCertPath

* add broker-tls url on worker

* take string type for boolean data-type
  • Loading branch information
rdhabalia authored Jun 8, 2018
1 parent 1950538 commit 6e336b4
Show file tree
Hide file tree
Showing 18 changed files with 375 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.Mockito.spy;

Expand All @@ -27,6 +28,8 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -40,14 +43,17 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -87,7 +93,7 @@ public class PulsarSinkE2ETest {

ServiceConfiguration config;
WorkerConfig workerConfig;
URL url;
URL urlTls;
PulsarService pulsar;
PulsarAdmin admin;
PulsarClient pulsarClient;
Expand All @@ -102,8 +108,16 @@ public class PulsarSinkE2ETest {

private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
private final int brokerWebServicePort = PortManager.nextFreePort();
private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
private final int brokerServicePort = PortManager.nextFreePort();
private final int brokerServiceTlsPort = PortManager.nextFreePort();
private final int workerServicePort = PortManager.nextFreePort();

private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";

private static final Logger log = LoggerFactory.getLogger(PulsarSinkE2ETest.class);

@BeforeMethod
Expand All @@ -118,31 +132,67 @@ void setup(Method method) throws Exception {
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble.start();

String hostHttpUrl = "http://127.0.0.1" + ":";
String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;

config = spy(new ServiceConfiguration());
config.setClusterName("use");
Set<String> superUsers = Sets.newHashSet("superUser");
config.setSuperUserRoles(superUsers);
config.setWebServicePort(brokerWebServicePort);
config.setWebServicePortTls(brokerWebServiceTlsPort);
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setBrokerServicePort(brokerServicePort);
config.setBrokerServicePortTls(brokerServiceTlsPort);
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());


Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(providers);
config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);


functionsWorkerService = createPulsarFunctionWorker(config);
url = new URL(hostHttpUrl + brokerWebServicePort);
urlTls = new URL(brokerServiceUrl);
boolean isFunctionWebServerRequired = method.getName()
.equals("testExternalReplicatorRedirectionToWorkerService");
Optional<WorkerService> functionWorkerService = isFunctionWebServerRequired ? Optional.ofNullable(null)
: Optional.of(functionsWorkerService);
pulsar = new PulsarService(config, functionWorkerService);
pulsar.start();
admin = new PulsarAdmin(url, (Authentication) null);

Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);

admin = spy(
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
.allowTlsInsecureConnection(true).authentication(authTls).build());

brokerStatsClient = admin.brokerStats();
primaryHost = String.format("http://%s:%d", InetAddress.getLocalHost().getHostName(), brokerWebServicePort);

// update cluster metadata
ClusterData clusterData = new ClusterData(url.toString());
ClusterData clusterData = new ClusterData(urlTls.toString());
admin.clusters().updateCluster(config.getClusterName(), clusterData);

pulsarClient = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getClientAuthenticationParameters())) {
clientBuilder.enableTls(workerConfig.isUseTls());
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();
// pulsarClient = PulsarClient.builder().serviceUrl(urlTls.toString()).statsInterval(0,
// TimeUnit.SECONDS).build();

TenantInfo propAdmin = new TenantInfo();
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
Expand All @@ -166,6 +216,7 @@ void shutdown() throws Exception {
if (workerExecutor != null) {
workerExecutor.shutdown();
}
pulsarClient.close();
admin.close();
pulsar.close();
functionsWorkerService.stop();
Expand All @@ -179,8 +230,8 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort());
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort());
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls());
workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls());
workerConfig.setFailureCheckFreqMs(100);
workerConfig.setNumFunctionPackageReplicas(1);
workerConfig.setClusterCoordinationTopicName("coordinate");
Expand All @@ -193,11 +244,19 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setWorkerHostname(hostname);
workerConfig
.setWorkerId("c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort());

workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);

return new WorkerService(workerConfig);
}

/**
* Validates pulsar sink e2e functionality on functions.
* Validates pulsar sink e2e functionality on functions.
*
* @throws Exception
*/
Expand Down Expand Up @@ -236,17 +295,17 @@ public void testE2EPulsarSink() throws Exception {
}
retryStrategically((test) -> {
try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values()
.iterator().next();
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
.next();
return subStats.unackedMessages == 0;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
// validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages
// due to publish failure
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
.next().unackedMessages, 0);
Assert.assertEquals(
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, 0);

}

Expand Down Expand Up @@ -281,7 +340,7 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String

// set up sink spec
SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
//sinkSpecBuilder.setClassName(PulsarSink.class.getName());
// sinkSpecBuilder.setClassName(PulsarSink.class.getName());
sinkSpecBuilder.setTopic(String.format("persistent://%s/%s/%s", tenant, namespace, "output"));
Map<String, Object> sinkConfigMap = Maps.newHashMap();
sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
Expand All @@ -290,4 +349,4 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String

return functionDetailsBuilder.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
Expand Down Expand Up @@ -642,11 +644,34 @@ class LocalRunner extends FunctionDetailsCommand {

@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
protected String brokerServiceUrl;

@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;

@Parameter(names = "--clientAuthParams", description = "Client authentication param")
protected String clientAuthParams;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
protected boolean useTls;

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
protected boolean tlsAllowInsecureConnection;

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;

@Override
void runCmd() throws Exception {
CmdFunctions.startLocalRun(convertProto2(functionConfig),
functionConfig.getParallelism(), brokerServiceUrl, userCodeFile, admin);
CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), brokerServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
userCodeFile, admin);
}
}

Expand Down Expand Up @@ -911,7 +936,8 @@ private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functio
}

protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
int parallelism, String brokerServiceUrl, String userCodeFile, PulsarAdmin admin)
int parallelism, String brokerServiceUrl, AuthenticationConfig authConfig,
String userCodeFile, PulsarAdmin admin)
throws Exception {

String serviceUrl = admin.getServiceUrl();
Expand All @@ -921,8 +947,8 @@ protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.F
if (serviceUrl == null) {
serviceUrl = DEFAULT_SERVICE_URL;
}
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
serviceUrl, null, null, null)) {
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, authConfig, null, null,
null)) {
List<RuntimeSpawner> spawners = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
Expand Down Expand Up @@ -99,11 +100,35 @@ class LocalSinkRunner extends CreateSink {

@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
protected String brokerServiceUrl;

@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;

@Parameter(names = "--clientAuthParams", description = "Client authentication param")
protected String clientAuthParams;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
protected boolean useTls;

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
protected boolean tlsAllowInsecureConnection;

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;

@Override
void runCmd() throws Exception {
CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig),
sinkConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(),
brokerServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
jarFile, admin);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
Expand Down Expand Up @@ -95,11 +96,35 @@ class LocalSourceRunner extends CreateSource {

@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
protected String brokerServiceUrl;

@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;

@Parameter(names = "--clientAuthParams", description = "Client authentication param")
protected String clientAuthParams;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
protected boolean useTls;

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
protected boolean tlsAllowInsecureConnection;

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;

@Override
void runCmd() throws Exception {
CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
sourceConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(),
brokerServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
jarFile, admin);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int maxLookupRequest = 50000;
private int maxNumberOfRejectedRequestPerConnection = 50;
private int keepAliveIntervalSeconds = 30;

public ClientConfigurationData clone() {
try {
return (ClientConfigurationData) super.clone();
Expand Down
Loading

0 comments on commit 6e336b4

Please sign in to comment.