Skip to content

Commit

Permalink
[Issue 10221] Fix authorization error while using proxy and Prefix
Browse files Browse the repository at this point in the history
…subscription authentication mode (apache#10226)

Fixes apache#10221 

### Motivation
If using pulsar proxy and `Prefix` subscription authentication mode, [org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#canConsumeAsync](https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L135) will throw exception which cause the consumer error.

### Modifications

Update the `org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#allowTopicOperationAsync` logic, check `isSuperUser` first, then return `isAuthorizedFuture`.

### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
dragonls authored Apr 19, 2021
1 parent 36c3bc3 commit 889b9b8
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -551,20 +551,24 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
break;
case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
break;
default: isAuthorizedFuture = FutureUtil.failedFuture(
new IllegalStateException("TopicOperation is not supported."));
default:
return FutureUtil.failedFuture(new IllegalStateException("TopicOperation is not supported."));
}

CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, authData, conf);

// check isSuperUser first
return isSuperUserFuture
.thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> {
.thenCompose(isSuperUser -> {
if (log.isDebugEnabled()) {
log.debug("Verify if role {} is allowed to {} to topic {}:"
+ " isSuperUser={}, isAuthorized={}",
role, operation, topicName, isSuperUser, isAuthorized);
log.debug("Verify if role {} is allowed to {} to topic {}: isSuperUser={}",
role, operation, topicName, isSuperUser);
}
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
return isAuthorizedFuture;
}
return isSuperUser || isAuthorized;
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import static org.mockito.Mockito.spy;

import com.google.common.collect.Sets;

import java.util.*;
import java.util.concurrent.TimeUnit;

import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.crypto.SecretKey;

public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyWithAuthorizationTest.class);

private final String ADMIN_ROLE = "admin";
private final String PROXY_ROLE = "proxy";
private final String BROKER_ROLE = "broker";
private final String CLIENT_ROLE = "client";
private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

private final String ADMIN_TOKEN = Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact();
private final String PROXY_TOKEN = Jwts.builder().setSubject(PROXY_ROLE).signWith(SECRET_KEY).compact();
private final String BROKER_TOKEN = Jwts.builder().setSubject(BROKER_ROLE).signWith(SECRET_KEY).compact();
private final String CLIENT_TOKEN = Jwts.builder().setSubject(CLIENT_ROLE).signWith(SECRET_KEY).compact();

private ProxyService proxyService;
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();

@BeforeMethod
@Override
protected void setup() throws Exception {
// enable auth&auth and use JWT at broker
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));

Set<String> superUserRoles = new HashSet<>();
superUserRoles.add(ADMIN_ROLE);
superUserRoles.add(PROXY_ROLE);
superUserRoles.add(BROKER_ROLE);
conf.setSuperUserRoles(superUserRoles);

conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
conf.setBrokerClientAuthenticationParameters(BROKER_TOKEN);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);

conf.setClusterName("proxy-authorization");
conf.setNumExecutorThreadPoolSize(5);

super.init();

// start proxy service
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setAuthorizationEnabled(false);
proxyConfig.getProperties().setProperty("tokenSecretKey", "data:;base64," + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());

proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));

// enable auth&auth and use JWT at proxy
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(PROXY_TOKEN);
proxyConfig.setAuthenticationProviders(providers);

proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
}

private void startProxy() throws Exception {
proxyService.start();
}

/**
* <pre>
* It verifies jwt + Authentication + Authorization (client -> proxy -> broker).
*
* 1. client connects to proxy over jwt and pass auth-data
* 2. proxy authenticate client and retrieve client-role
* and send it to broker as originalPrincipal over jwt
* 3. client creates producer/consumer via proxy
* 4. broker authorize producer/consumer create request using originalPrincipal
*
* </pre>
*/
@Test
public void testProxyAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();
PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());

String namespaceName = "my-property/proxy-authorization/my-ns";

admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);

Consumer<byte[]> consumer;
try {
consumer = proxyClient.newConsumer()
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
Assert.fail("should have failed with authorization error");
} catch (Exception ex) {
// excepted
admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE,
Sets.newHashSet(AuthAction.consume));
log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName));
consumer = proxyClient.newConsumer()
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
}

Producer<byte[]> producer;
try {
producer = proxyClient.newProducer(Schema.BYTES)
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
Assert.fail("should have failed with authorization error");
} catch (Exception ex) {
// excepted
admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE,
Sets.newHashSet(AuthAction.produce, AuthAction.consume));
log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName));
producer = proxyClient.newProducer(Schema.BYTES)
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
}
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
int count = 0;
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
count++;
}
// Acknowledge the consumption of all messages at once
Assert.assertEquals(msgs, count);
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}

/**
* <pre>
* It verifies jwt + Authentication + Authorization (client -> proxy -> broker).
* It also test `SubscriptionAuthMode.Prefix` mode.
*
* 1. client connects to proxy over jwt and pass auth-data
* 2. proxy authenticate client and retrieve client-role
* and send it to broker as originalPrincipal over jwt
* 3. client creates producer/consumer via proxy
* 4. broker authorize producer/consumer create request using originalPrincipal
*
* </pre>
*/
@Test
public void testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();
PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());

String namespaceName = "my-property/proxy-authorization/my-ns";

admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);
admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE,
Sets.newHashSet(AuthAction.produce, AuthAction.consume));
admin.namespaces().setSubscriptionAuthMode(namespaceName, SubscriptionAuthMode.Prefix);

Consumer<byte[]> consumer;
try {
consumer = proxyClient.newConsumer()
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
Assert.fail("should have failed with authorization error");
} catch (Exception ex) {
// excepted
consumer = proxyClient.newConsumer()
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
.subscriptionName(CLIENT_ROLE + "-sub1").subscribe();
}

Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
int count = 0;
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
count++;
}
// Acknowledge the consumption of all messages at once
Assert.assertEquals(msgs, count);
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}

private void createAdminClient() throws Exception {
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(AuthenticationFactory.token(ADMIN_TOKEN)).build());
}

private PulsarClient createPulsarClient(String proxyServiceUrl, ClientBuilder clientBuilder)
throws PulsarClientException {
return clientBuilder.serviceUrl(proxyServiceUrl).statsInterval(0, TimeUnit.SECONDS)
.authentication(AuthenticationFactory.token(CLIENT_TOKEN))
.operationTimeout(1000, TimeUnit.MILLISECONDS).build();
}
}

0 comments on commit 889b9b8

Please sign in to comment.