Skip to content

Commit

Permalink
Added setting for anonymous user role (apache#820)
Browse files Browse the repository at this point in the history
* Added setting for anonymous user role

* Addressed PR comments
  • Loading branch information
yush1ga authored and hrsakai committed Jan 16, 2018
1 parent 36226c7 commit e0c04ad
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 61 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ brokerClientAuthenticationParameters=
# Supported Athenz provider domain names(comma separated) for authentication
athenzDomainNames=

# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
anonymousUserRole=

### --- BookKeeper Client --- ###

# Authentication plugin to use when connecting to bookies
Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ brokerClientAuthenticationParameters=
# Supported Athenz provider domain names(comma separated) for authentication
athenzDomainNames=

# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
anonymousUserRole=

### --- BookKeeper Client --- ###

# Authentication plugin to use when connecting to bookies
Expand Down
3 changes: 3 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ superUserRoles=
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=

# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
anonymousUserRole=

### --- TLS --- ###

# Enable TLS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String brokerClientAuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationDisabled";
private String brokerClientAuthenticationParameters = "";

// When this parameter is not empty, unauthenticated users perform as anonymousUserRole
private String anonymousUserRole = null;

/**** --- BookKeeper Client --- ****/
// Authentication plugin to use when connecting to bookies
private String bookkeeperClientAuthenticationPlugin;
Expand Down Expand Up @@ -796,6 +799,14 @@ public void setBrokerClientAuthenticationParameters(String brokerClientAuthentic
this.brokerClientAuthenticationParameters = brokerClientAuthenticationParameters;
}

public String getAnonymousUserRole() {
return anonymousUserRole;
}

public void setAnonymousUserRole(String anonymousUserRole) {
this.anonymousUserRole = anonymousUserRole;
}

public String getBookkeeperClientAuthenticationPlugin() {
return bookkeeperClientAuthenticationPlugin;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
Expand All @@ -38,10 +39,12 @@
*/
public class AuthenticationService implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(AuthenticationService.class);
private final String anonymousUserRole;

private final Map<String, AuthenticationProvider> providers = Maps.newHashMap();

public AuthenticationService(ServiceConfiguration conf) throws PulsarServerException {
anonymousUserRole = conf.getAnonymousUserRole();
if (conf.isAuthenticationEnabled()) {
try {
AuthenticationProvider provider;
Expand Down Expand Up @@ -71,6 +74,9 @@ public String authenticate(AuthenticationDataSource authData, String authMethodN
if (provider != null) {
return provider.authenticate(authData);
} else {
if (StringUtils.isNotBlank(anonymousUserRole)) {
return anonymousUserRole;
}
throw new AuthenticationException("Unsupported authentication mode: " + authMethodName);
}
}
Expand All @@ -88,6 +94,9 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent

// No authentication provided
if (!providers.isEmpty()) {
if (StringUtils.isNotBlank(anonymousUserRole)) {
return anonymousUserRole;
}
// If at least a provider was configured, then the authentication needs to be provider
throw new AuthenticationException("Authentication required");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.InternalServerErrorException;
Expand All @@ -42,6 +39,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
Expand Down Expand Up @@ -69,6 +67,10 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
if (methodName.equals("testAnonymousSyncProducerAndConsumer")) {
conf.setAnonymousUserRole("anonymousUser");
}

conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);

Expand Down Expand Up @@ -120,26 +122,10 @@ public Object[][] codecProvider() {
return new Object[][] { { 0 }, { 1000 } };
}

@Test(dataProvider = "batch")
public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);

Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);
internalSetup(authTls);

admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");

public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic", "my-subscriber-name",
conf);

ProducerConfiguration producerConf = new ProducerConfiguration();
Expand All @@ -150,7 +136,7 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
producerConf.setBatchingMaxMessages(5);
}

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic", producerConf);
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
Expand All @@ -168,6 +154,70 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
}

@Test(dataProvider = "batch")
public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);

Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);
internalSetup(authTls);

admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");

testSyncProducerAndConsumer(batchMessageDelayMs);

log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "batch")
public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);

Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);
internalSetup(authTls);

admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("anonymousUser"), Sets.newHashSet("use")));

// make a PulsarAdmin instance as "anonymousUser" for http request
admin.close();
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setOperationTimeout(1, TimeUnit.SECONDS);
admin = spy(new PulsarAdmin(brokerUrl, clientConf));
admin.namespaces().createNamespace("my-property/use/my-ns");
admin.persistentTopics().grantPermission("persistent://my-property/use/my-ns/my-topic", "anonymousUser", EnumSet
.allOf(AuthAction.class));

// setup the client
pulsarClient.close();
pulsarClient = PulsarClient.create("pulsar://localhost:" + BROKER_PORT, clientConf);

// unauthorized topic test
Exception pulsarClientException = null;
try {
pulsarClient.subscribe("persistent://my-property/use/my-ns/other-topic", "my-subscriber-name");
} catch (Exception e) {
pulsarClientException = e;
}
Assert.assertTrue(pulsarClientException instanceof PulsarClientException);

testSyncProducerAndConsumer(batchMessageDelayMs);

log.info("-- Exiting {} test --", methodName);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.proxy;

import org.apache.pulsar.broker.authentication.AuthenticationDataSource;

import javax.naming.AuthenticationException;

public class MockUnauthenticationProvider extends MockAuthenticationProvider {

@Override
public String getAuthMethodName() {
// method name
return "mockunauth";
}

@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
throw new AuthenticationException();
}

}
Loading

0 comments on commit e0c04ad

Please sign in to comment.