From 36226c7f365615023042ecb88aff513817d44673 Mon Sep 17 00:00:00 2001 From: yush1ga Date: Mon, 15 Jan 2018 19:46:54 +0900 Subject: [PATCH] Add subscription auth mode by prefix (#899) * Add subscription auth mode by prefix * Fix tests * Throws exceptions when auth role prefix is missed --- .../authorization/AuthorizationManager.java | 50 ++++++++++++++-- .../pulsar/broker/admin/Namespaces.java | 44 ++++++++++++++ .../pulsar/broker/service/Consumer.java | 2 +- .../pulsar/broker/service/ServerCnx.java | 13 +++- .../pulsar/broker/auth/AuthorizationTest.java | 48 +++++++++++---- .../pulsar/broker/service/ServerCnxTest.java | 4 +- .../AuthenticatedProducerConsumerTest.java | 1 + .../proxy/ProxyAuthorizationTest.java | 6 +- .../pulsar/client/admin/Namespaces.java | 10 ++++ .../client/admin/internal/NamespacesImpl.java | 59 ++++++++++++------- .../pulsar/admin/cli/CmdNamespaces.java | 19 +++++- .../pulsar/common/policies/data/Policies.java | 8 ++- .../policies/data/SubscriptionAuthMode.java | 31 ++++++++++ .../pulsar/websocket/ConsumerHandler.java | 2 +- .../pulsar/websocket/ReaderHandler.java | 2 +- 15 files changed, 246 insertions(+), 53 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java index e33e2a5a62cc0..9fa31ccfdb0c9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java @@ -22,8 +22,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.common.naming.DestinationName; @@ -78,14 +80,52 @@ public boolean canProduce(DestinationName destination, String role) throws Excep * the fully qualified destination name associated with the destination. * @param role * the app id used to receive messages from the destination. + * @param subscription + * the subscription name defined by the client */ - public CompletableFuture canConsumeAsync(DestinationName destination, String role) { - return checkAuthorization(destination, role, AuthAction.consume); + public CompletableFuture canConsumeAsync(DestinationName destination, String role, String subscription) { + CompletableFuture permissionFuture = new CompletableFuture<>(); + try { + configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> { + if (!policies.isPresent()) { + if (log.isDebugEnabled()) { + log.debug("Policies node couldn't be found for destination : {}", destination); + } + } else { + if (isNotBlank(subscription)) { + switch (policies.get().subscription_auth_mode) { + case Prefix: + if (!subscription.startsWith(role)) { + PulsarServerException ex = new PulsarServerException( + String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s", role, destination)); + permissionFuture.completeExceptionally(ex); + return; + } + break; + default: + break; + } + } + } + checkAuthorization(destination, role, AuthAction.consume).thenAccept(isAuthorized -> { + permissionFuture.complete(isAuthorized); + }); + }).exceptionally(ex -> { + log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, + ex); + permissionFuture.completeExceptionally(ex); + return null; + }); + } catch (Exception e) { + log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e); + permissionFuture.completeExceptionally(e); + } + return permissionFuture; } - public boolean canConsume(DestinationName destination, String role) throws Exception { + public boolean canConsume(DestinationName destination, String role, String subscription) throws Exception { try { - return canConsumeAsync(destination, role).get(cacheTimeOutInSec, SECONDS); + return canConsumeAsync(destination, role, subscription).get(cacheTimeOutInSec, SECONDS); } catch (InterruptedException e) { log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination); throw e; @@ -107,7 +147,7 @@ public boolean canConsume(DestinationName destination, String role) throws Excep * @throws Exception */ public boolean canLookup(DestinationName destination, String role) throws Exception { - return canProduce(destination, role) || canConsume(destination, role); + return canProduce(destination, role) || canConsume(destination, role, null); } private CompletableFuture checkAuthorization(DestinationName destination, String role, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java index 5da4ca2da46af..cb8cf72a8dcd4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java @@ -75,6 +75,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; @@ -1443,6 +1444,49 @@ public void unsubscribeNamespaceBundle(@PathParam("property") String property, @ nsName.toString(), bundleRange); } + @POST + @Path("/{property}/{cluster}/{namespace}/subscriptionAuthMode") + @ApiOperation(value = " Set a subscription auth mode for all the destinations on a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification") }) + public void setSubscriptionAuthMode(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, SubscriptionAuthMode subscriptionAuthMode) { + validateAdminAccessOnProperty(property); + validatePoliciesReadOnlyAccess(); + + if (subscriptionAuthMode == null) { + subscriptionAuthMode = SubscriptionAuthMode.None; + } + + try { + Stat nodeStat = new Stat(); + final String path = path(POLICIES, property, cluster, namespace); + byte[] content = globalZk().getData(path, null, nodeStat); + Policies policies = jsonMapper().readValue(content, Policies.class); + policies.subscription_auth_mode = subscriptionAuthMode; + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); + policiesCache().invalidate(path(POLICIES, property, cluster, namespace)); + log.info("[{}] Successfully updated subscription auth mode: namespace={}/{}/{}, map={}", clientAppId(), property, + cluster, namespace, jsonMapper().writeValueAsString(policies.backlog_quota_map)); + + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to update subscription auth mode for namespace {}/{}/{}: does not exist", clientAppId(), + property, cluster, namespace); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn("[{}] Failed to update subscription auth mode for namespace {}/{}/{}: concurrent modification", + clientAppId(), property, cluster, namespace); + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (RestException pfe) { + throw pfe; + } catch (Exception e) { + log.error("[{}] Failed to update subscription auth mode for namespace {}/{}/{}", clientAppId(), property, + cluster, namespace, e); + throw new RestException(e); + } + } + private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) { try { List topicList = pulsar().getBrokerService() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 07b0a462cc49d..64bd4243c6bce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -458,7 +458,7 @@ public void checkPermissions() { DestinationName destination = DestinationName.get(subscription.getDestination()); if (cnx.getBrokerService().getAuthorizationManager() != null) { try { - if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) { + if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId, subscription.getName())) { return; } } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5600d654bef67..df8e577c7f0a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -349,7 +350,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { if (service.isAuthorizationEnabled()) { authorizationFuture = service.getAuthorizationManager().canConsumeAsync( DestinationName.get(subscribe.getTopic()), - originalPrincipal != null ? originalPrincipal : authRole); + originalPrincipal != null ? originalPrincipal : authRole, + subscribe.getSubscription()); } else { authorizationFuture = CompletableFuture.completedFuture(true); } @@ -460,6 +462,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; + }).exceptionally(ex -> { + String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole); + if (ex.getCause() instanceof PulsarServerException) { + log.info(msg); + } else { + log.warn(msg); + } + ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage())); + return null; }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 499d4f8de9803..0f79e7509b47e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -19,16 +19,16 @@ package org.apache.pulsar.broker.auth; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; import java.util.EnumSet; import org.apache.pulsar.broker.authorization.AuthorizationManager; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; -import org.apache.pulsar.broker.authorization.AuthorizationManager; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -85,8 +85,8 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), false); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role"), false); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role"), false); @@ -94,7 +94,7 @@ void simple() throws Exception { waitForChange(); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true); // test for wildcard @@ -102,7 +102,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), false); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false); @@ -112,7 +112,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), true); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false); @@ -120,7 +120,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), false); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false); @@ -130,7 +130,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), true); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false); @@ -143,7 +143,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), false); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1"), false); @@ -156,7 +156,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), true); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1"), false); @@ -166,7 +166,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), false); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my"), false); @@ -179,12 +179,34 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), true); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my"), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my"), false); + admin.persistentTopics().revokePermissions("persistent://p1/c1/ns1/ds1", "my.*"); + admin.persistentTopics().revokePermissions("persistent://p1/c1/ns1/ds1", "*.my"); + + // tests for subscription auth mode + admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "*", EnumSet.of(AuthAction.consume)); + admin.namespaces().setSubscriptionAuthMode("p1/c1/ns1", SubscriptionAuthMode.Prefix); + waitForChange(); + + assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1"), true); + assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2"), true); + try { + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", "sub1"), false); + fail(); + } catch (Exception e) {} + try { + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", "sub2"), false); + fail(); + } catch (Exception e) {} + + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", "role1-sub1"), true); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", "role2-sub2"), true); + admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.properties().deleteProperty("p1"); admin.clusters().deleteCluster("c1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index a3b9487e3861c..7e7e9fc214c81 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -1127,7 +1127,7 @@ public void testUnsupportedBatchMsgSubscribeCommand() throws Exception { @Test(timeOut = 30000) public void testSubscribeCommandWithAuthorizationPositive() throws Exception { AuthorizationManager authorizationManager = mock(AuthorizationManager.class); - doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); @@ -1147,7 +1147,7 @@ public void testSubscribeCommandWithAuthorizationPositive() throws Exception { @Test(timeOut = 30000) public void testSubscribeCommandWithAuthorizationNegative() throws Exception { AuthorizationManager authorizationManager = mock(AuthorizationManager.class); - doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 3049f94dd3525..a6eff597009b6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 5f267a8d8f6a4..c996c9f7130b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -102,8 +102,8 @@ public void test() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), false); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role"), false); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null), false); assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role"), false); @@ -111,7 +111,7 @@ public void test() throws Exception { waitForChange(); assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true); - assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true); + assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true); admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.properties().deleteProperty("p1"); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index edd8c5ce73ed0..ebe754c8e0dad 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -33,6 +33,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; /** * Admin interface for namespaces management @@ -835,4 +836,13 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle, * Unexpected error */ void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException; + + /** + * Set the given subscription auth mode on all destinations on a namespace + * + * @param namespace + * @param subscriptionAuthMode + * @throws PulsarAdminException + */ + void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 9f6467950cf58..79aa162c4e18f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -40,6 +40,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; public class NamespacesImpl extends BaseResource implements Namespaces { @@ -76,7 +77,7 @@ public List getDestinations(String namespace) throws PulsarAdminExceptio NamespaceName ns = NamespaceName.get(namespace); return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) .path("destinations")).get(new GenericType>() { - }); + }); } catch (Exception e) { throw getApiException(e); } @@ -148,8 +149,8 @@ public Map> getPermissions(String namespace) throws Puls NamespaceName ns = NamespaceName.get(namespace); return request( namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("permissions")) - .get(new GenericType>>() { - }); + .get(new GenericType>>() { + }); } catch (Exception e) { throw getApiException(e); } @@ -184,8 +185,8 @@ public List getNamespaceReplicationClusters(String namespace) throws Pul NamespaceName ns = NamespaceName.get(namespace); return request( namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("replication")) - .get(new GenericType>() { - }); + .get(new GenericType>() { + }); } catch (Exception e) { throw getApiException(e); } @@ -208,8 +209,8 @@ public int getNamespaceMessageTTL(String namespace) throws PulsarAdminException NamespaceName ns = NamespaceName.get(namespace); return request( namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("messageTTL")) - .get(new GenericType() { - }); + .get(new GenericType() { + }); } catch (Exception e) { throw getApiException(e); @@ -231,7 +232,8 @@ public void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws Pu public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); - request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("deduplication")) + request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) + .path("deduplication")) .post(Entity.entity(enableDeduplication, MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); @@ -244,7 +246,7 @@ public Map getBacklogQuotaMap(String namespace) NamespaceName ns = NamespaceName.get(namespace); return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) .path("backlogQuotaMap")).get(new GenericType>() { - }); + }); } catch (Exception e) { throw getApiException(e); } @@ -255,8 +257,8 @@ public void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws try { NamespaceName ns = NamespaceName.get(namespace); request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) - .path("backlogQuota")).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), - ErrorData.class); + .path("backlogQuota")) + .post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); } @@ -268,7 +270,7 @@ public void removeBacklogQuota(String namespace) throws PulsarAdminException { NamespaceName ns = NamespaceName.get(namespace); request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("backlogQuota") .queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString())) - .delete(ErrorData.class); + .delete(ErrorData.class); } catch (Exception e) { throw getApiException(e); } @@ -291,7 +293,7 @@ public PersistencePolicies getPersistence(String namespace) throws PulsarAdminEx NamespaceName ns = NamespaceName.get(namespace); return request( namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("persistence")) - .get(PersistencePolicies.class); + .get(PersistencePolicies.class); } catch (Exception e) { throw getApiException(e); } @@ -315,7 +317,7 @@ public RetentionPolicies getRetention(String namespace) throws PulsarAdminExcept NamespaceName ns = NamespaceName.get(namespace); return request( namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("retention")) - .get(RetentionPolicies.class); + .get(RetentionPolicies.class); } catch (Exception e) { throw getApiException(e); } @@ -361,7 +363,7 @@ public void splitNamespaceBundle(String namespace, String bundle, boolean unload NamespaceName ns = NamespaceName.get(namespace); request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path(bundle) .path("split").queryParam("unload", Boolean.toString(unloadSplitBundles))) - .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); } @@ -371,7 +373,8 @@ public void splitNamespaceBundle(String namespace, String bundle, boolean unload public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); - request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("dispatchRate")) + request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) + .path("dispatchRate")) .post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); @@ -382,8 +385,8 @@ public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws public DispatchRate getDispatchRate(String namespace) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); - return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("dispatchRate")) - .get(DispatchRate.class); + return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) + .path("dispatchRate")).get(DispatchRate.class); } catch (Exception e) { throw getApiException(e); } @@ -429,8 +432,8 @@ public void clearNamespaceBundleBacklogForSubscription(String namespace, String try { NamespaceName ns = NamespaceName.get(namespace); request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path(bundle) - .path("clearBacklog").path(subscription)).post(Entity.entity("", MediaType.APPLICATION_JSON), - ErrorData.class); + .path("clearBacklog").path(subscription)) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); } @@ -453,8 +456,20 @@ public void unsubscribeNamespaceBundle(String namespace, String bundle, String s try { NamespaceName ns = NamespaceName.get(namespace); request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path(bundle) - .path("unsubscribe").path(subscription)).post(Entity.entity("", MediaType.APPLICATION_JSON), - ErrorData.class); + .path("unsubscribe").path(subscription)) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) + .path("subscriptionAuthMode")) + .post(Entity.entity(subscriptionAuthMode, MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 9acf3badee377..37d7306f91f80 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -308,7 +309,7 @@ private class SplitBundle extends CliCommand { @Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}\n", required = true) private String bundle; - + @Parameter(names = { "--unload", "-u" }, description = "Unload newly split bundles after splitting old bundle", required = false) private boolean unload; @@ -534,6 +535,21 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Set subscription auth mode on a namespace") + private class SetSubscriptionAuthMode extends CliCommand { + @Parameter(description = "property/cluster/namespace", required = true) + private java.util.List params; + + @Parameter(names = { "-m", "--subscription-auth-mode" }, description = "subscription name", required = true) + private String mode; + + @Override + void run() throws Exception { + String namespace = validateNamespace(params); + admin.namespaces().setSubscriptionAuthMode(namespace, SubscriptionAuthMode.valueOf(mode)); + } + } + private static long validateSizeString(String s) { char last = s.charAt(s.length() - 1); String subStr = s.substring(0, s.length() - 1); @@ -623,5 +639,6 @@ public CmdNamespaces(PulsarAdmin admin) { jcommander.addCommand("unsubscribe", new Unsubscribe()); jcommander.addCommand("set-encryption-required", new SetEncryptionRequired()); + jcommander.addCommand("set-subscription-auth-mode", new SetSubscriptionAuthMode()); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index e2b7b059f7c9b..a46b5fee03787 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -47,6 +47,7 @@ public class Policies { public static final String LAST_BOUNDARY = "0xffffffff"; public boolean encryption_required = false; + public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None; @Override public boolean equals(Object obj) { @@ -61,7 +62,8 @@ public boolean equals(Object obj) { && Objects.equals(latency_stats_sample_rate, other.latency_stats_sample_rate) && message_ttl_in_seconds == other.message_ttl_in_seconds && Objects.equals(retention_policies, other.retention_policies) - && Objects.equals(encryption_required, other.encryption_required); + && Objects.equals(encryption_required, other.encryption_required) + && Objects.equals(subscription_auth_mode, other.subscription_auth_mode); } return false; @@ -86,7 +88,7 @@ public String toString() { .add("latency_stats_sample_rate", latency_stats_sample_rate) .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies) .add("deleted", deleted) - .add("encryption_required", encryption_required).toString(); + .add("encryption_required", encryption_required) + .add("subscription_auth_mode", subscription_auth_mode).toString(); } } - diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java new file mode 100644 index 0000000000000..5a57147619463 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionAuthMode.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.common.policies.data; + +/** + * Subscription authorization for Pulsar policies + */ +public enum SubscriptionAuthMode { + /** Every subscription name can be used by every role */ + None, + + /** Subscription name with auth role prefix can be used by the role */ + Prefix, +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index c0ccf51b6295a..f6fd0eb636869 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -279,7 +279,7 @@ private ConsumerConfiguration getConsumerConfiguration() { @Override protected Boolean isAuthorized(String authRole) throws Exception { - return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole); + return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole, this.subscription); } private static String extractSubscription(HttpServletRequest request) { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index ddf23c5719f9e..eae771773d8cb 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -247,7 +247,7 @@ private ReaderConfiguration getReaderConfiguration() { @Override protected Boolean isAuthorized(String authRole) throws Exception { - return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole); + return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole, this.subscription); } private MessageId getMessageId() throws IOException {