Skip to content

Commit

Permalink
Issue 3655: Kerberos authentication for proxy (apache#3997)
Browse files Browse the repository at this point in the history
Fixes apache#3655

Master Issue: apache#3491

### Motivation
add support of Kerberos authentication for proxy

### Modifications
add support of Kerberos authentication for proxy ;
add unit test.

### Verifying this change
Ut passed
  • Loading branch information
jiazhai authored and sijie committed Apr 9, 2019
1 parent f229d3b commit 66a1b1b
Show file tree
Hide file tree
Showing 14 changed files with 558 additions and 118 deletions.
7 changes: 7 additions & 0 deletions pulsar-broker-auth-sasl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@
<type>test-jar</type>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-proxy</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>managed-ledger-original</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.authentication;

import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_BROKER_SECTION_NAME;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_SERVER_SECTION_NAME;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_ALLOWED_IDS;
import static org.apache.pulsar.common.sasl.SaslConstants.KINIT_COMMAND;

Expand Down Expand Up @@ -53,7 +53,7 @@ public void initialize(ServiceConfiguration config) throws IOException {
this.configuration = Maps.newHashMap();
final String allowedIdsPatternRegExp = config.getSaslJaasClientAllowedIds();
configuration.put(JAAS_CLIENT_ALLOWED_IDS, allowedIdsPatternRegExp);
configuration.put(JAAS_BROKER_SECTION_NAME, config.getSaslJaasBrokerSectionName());
configuration.put(JAAS_SERVER_SECTION_NAME, config.getSaslJaasServerSectionName());
configuration.put(KINIT_COMMAND, config.getKinitCommand());

try {
Expand All @@ -63,7 +63,7 @@ public void initialize(ServiceConfiguration config) throws IOException {
throw new IOException(error);
}

loginContextName = config.getSaslJaasBrokerSectionName();
loginContextName = config.getSaslJaasServerSectionName();
if (jaasCredentialsContainer == null) {
log.info("JAAS loginContext is: {}." , loginContextName);
try {
Expand Down Expand Up @@ -101,7 +101,6 @@ public AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress,
SSLSession sslSession) throws AuthenticationException {
try {
new PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern);
return new SaslAuthenticationState(
new SaslAuthenticationDataSource(
new PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;

import java.io.File;
import java.io.FileWriter;
import java.net.URI;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.security.auth.login.Configuration;

import com.google.common.collect.Sets;
import org.apache.bookkeeper.test.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);
private int webServicePort;
private int servicePort;

public static File kdcDir;
public static File kerberosWorkDir;

private static MiniKdc kdc;
private static Properties properties;

private static String localHostname = "localhost";

@BeforeClass
public static void startMiniKdc() throws Exception {
kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
kerberosWorkDir = Files.createTempDirectory("test-kerberos-work-dir").toFile();

properties = MiniKdc.createConf();
kdc = new MiniKdc(properties, kdcDir);
kdc.start();

String principalBrokerNoRealm = "broker/" + localHostname;
String principalBroker = "broker/" + localHostname + "@" + kdc.getRealm();
log.info("principalBroker: " + principalBroker);

String principalClientNoRealm = "client/" + localHostname;
String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
log.info("principalClient: " + principalClient);

String principalProxyNoRealm = "proxy/" + localHostname;
String principalProxy = principalProxyNoRealm + "@" + kdc.getRealm();
log.info("principalProxy: " + principalProxy);

File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
kdc.createPrincipal(keytabClient, principalClientNoRealm);

File keytabBroker = new File(kerberosWorkDir, "pulsarbroker.keytab");
kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);

File keytabProxy = new File(kerberosWorkDir, "pulsarproxy.keytab");
kdc.createPrincipal(keytabProxy, principalProxyNoRealm);

File jaasFile = new File(kerberosWorkDir, "jaas.properties");
try (FileWriter writer = new FileWriter(jaasFile)) {
writer.write("\n"
+ "PulsarBroker {\n"
+ " com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ " useKeyTab=true\n"
+ " keyTab=\"" + keytabBroker.getAbsolutePath() + "\n"
+ " storeKey=true\n"
+ " useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
+ " principal=\"" + principalBroker + "\";\n"
+ "};\n"
+ "\n"
+ "\n"
+ "\n"
+ "PulsarProxy{\n"
+ " com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ " useKeyTab=true\n"
+ " keyTab=\"" + keytabProxy.getAbsolutePath() + "\n"
+ " storeKey=true\n"
+ " useTicketCache=false\n" // won't test useTicketCache=true on JUnit tests
+ " principal=\"" + principalProxy + "\";\n"
+ "};\n"
+ "\n"
+ "\n"
+ "\n"
+ "PulsarClient {\n"
+ " com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ " useKeyTab=true\n"
+ " keyTab=\"" + keytabClient.getAbsolutePath() + "\n"
+ " storeKey=true\n"
+ " useTicketCache=false\n"
+ " principal=\"" + principalClient + "\";\n"
+ "};\n"
);
}

File krb5file = new File(kerberosWorkDir, "krb5.properties");
try (FileWriter writer = new FileWriter(krb5file)) {
String conf = "[libdefaults]\n"
+ " default_realm = " + kdc.getRealm() + "\n"
+ " udp_preference_limit = 1\n" // force use TCP
+ "\n"
+ "\n"
+ "[realms]\n"
+ " " + kdc.getRealm() + " = {\n"
+ " kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+ " }";
writer.write(conf);
log.info("krb5.properties:\n" + conf);
}

System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
System.setProperty("java.security.krb5.properties", krb5file.getAbsolutePath());
Configuration.getConfiguration().refresh();

// Client config

log.info("created AuthenticationSasl");
}

@AfterClass
public static void stopMiniKdc() {
System.clearProperty("java.security.auth.login.config");
System.clearProperty("java.security.krb5.properties");
if (kdc != null) {
kdc.stop();
}
FileUtils.deleteQuietly(kdcDir);
FileUtils.deleteQuietly(kerberosWorkDir);
Assert.assertFalse(kdcDir.exists());
Assert.assertFalse(kerberosWorkDir.exists());
}

@BeforeMethod
@Override
protected void setup() throws Exception {
log.info("-- {} --, start at host: {}", methodName, localHostname);
webServicePort = PortManager.nextFreePort();
servicePort = PortManager.nextFreePort();
isTcpLookup = true;
conf.setAdvertisedAddress(localHostname);
conf.setAuthenticationEnabled(true);
conf.setSaslAuthentication(true);
conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
conf.setSaslJaasServerSectionName("PulsarBroker");
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderSasl.class.getName());
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");

super.init();

lookupUrl = new URI("broker://" + "localhost" + ":" + BROKER_PORT);

super.producerBaseSetup();
log.info("-- {} --, end.", methodName);
}

@Override
@AfterMethod
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
void testAuthentication() throws Exception {
log.info("-- Starting {} test --", methodName);

// Step 1: Create Admin Client
//updateAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
// create a client which connects to proxy and pass authData
String topicName = "persistent://my-property/my-ns/my-topic1";

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(servicePort);
proxyConfig.setWebServicePort(webServicePort);
proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");

// proxy connect to broker
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
/*proxyConfig.setBrokerClientAuthenticationParameters(
"{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
"\"serverType\": " + "\"broker\"}");*/
proxyConfig.setBrokerClientAuthenticationParameters(
"{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
"\"serverType\": " + "\"broker\"}");

// proxy as a server, it will use sasl to authn
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderSasl.class.getName());
proxyConfig.setAuthenticationProviders(providers);

proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);

proxyService.start();
log.info("1 proxy service started {}", proxyService);

// Step 3: Pass correct client params
PulsarClient proxyClient = createProxyClient(proxyServiceUrl, 1);
log.info("2 create proxy client {}, {}", proxyServiceUrl, proxyClient);

Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
log.info("3 created producer.");

Consumer<byte[]> consumer = proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
log.info("4 created consumer.");

for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
log.info("Produced message: [{}]", message);
}

Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();

proxyClient.close();
proxyService.close();
}

private PulsarClient createProxyClient(String proxyServiceUrl, int numberOfConnections) throws PulsarClientException {
Map<String, String> clientSaslConfig = Maps.newHashMap();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "proxy");
log.info("set client jaas section name: PulsarClient, serverType: proxy");
Authentication authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);

return PulsarClient.builder().serviceUrl(proxyServiceUrl)
.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setSaslAuthentication(true);
conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
conf.setSaslJaasBrokerSectionName("PulsarBroker");
conf.setSaslJaasServerSectionName("PulsarBroker");
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderSasl.class.getName());
conf.setAuthenticationProviders(providers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "(disable default-ttl with value 0)"
)
private int ttlDurationDefaultInSeconds = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable the deletion of inactive topics"
Expand Down Expand Up @@ -625,7 +625,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_SASL_AUTH,
doc = "Service Principal, for login context name. Default value is \"Broker\"."
)
private String saslJaasBrokerSectionName = SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;
private String saslJaasServerSectionName = SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;

@FieldContext(
category = CATEGORY_SASL_AUTH,
Expand Down
Loading

0 comments on commit 66a1b1b

Please sign in to comment.