From fd9875c41bdb86afad07413fdeedcc62781e92d2 Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Mon, 15 Feb 2021 06:07:55 +0100 Subject: [PATCH] PLSR-1470: Only auth _errors_ should log at error level (#9325) An authentication failure should not log at error level, as it depends on the credentials that the client provides, which may well be wrong. An authentication/authorization error, where it cannot be decided if credentials are valid or not, is an error that needs operator attention and therefore should be logged at error level. Co-authored-by: Ivan Kelly Co-authored-by: Matteo Merli Co-authored-by: Sijie Guo --- .../AuthenticationProviderList.java | 2 +- .../broker/web/AuthenticationFilter.java | 8 +- .../pulsar/broker/service/ServerCnx.java | 33 +- .../pulsar/broker/auth/AuthLogsTest.java | 222 ++++++++++++++ .../broker/auth/MockAuthentication.java | 87 ++++++ .../auth/MockAuthenticationProvider.java | 70 +++++ .../auth/MockAuthorizationProvider.java | 289 ++++++++++++++++++ 7 files changed, 693 insertions(+), 18 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java index 2d55f51008891..a79fabef3deb2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java @@ -179,7 +179,7 @@ public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteA authenticationException = ae; } if (states.isEmpty()) { - log.error("Failed to initialize a new auth state from {}", remoteAddress, authenticationException); + log.debug("Failed to initialize a new auth state from {}", remoteAddress, authenticationException); if (authenticationException != null) { throw authenticationException; } else { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java index 8555f34ec7481..5d3cae49394d5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java @@ -20,6 +20,8 @@ import java.io.IOException; +import javax.naming.AuthenticationException; + import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -93,7 +95,11 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha } catch (Exception e) { HttpServletResponse httpResponse = (HttpServletResponse) response; httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required"); - LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage()); + if (e instanceof AuthenticationException) { + LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage()); + } else { + LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e); + } return; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5df95ff747204..282a0e5469d54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -54,7 +54,6 @@ import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -405,8 +404,8 @@ protected void handleLookup(CommandLookupTopic lookup) { } return null; }).exceptionally(ex -> { + logAuthException(remoteAddress, "lookup", getPrincipal(), Optional.of(topicName), ex); final String msg = "Exception occurred while trying to authorize lookup"; - log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName, ex); ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId)); lookupSemaphore.release(); return null; @@ -478,8 +477,8 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa } return null; }).exceptionally(ex -> { + logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex); final String msg = "Exception occurred while trying to authorize get Partition Metadata"; - log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName); ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId)); lookupSemaphore.release(); @@ -791,12 +790,8 @@ protected void handleConnect(CommandConnect connect) { } } } catch (Exception e) { + logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e); String msg = "Unable to authenticate"; - if (e instanceof AuthenticationException) { - log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage()); - } else { - log.warn("[{}] {}", remoteAddress, msg, e); - } ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)); close(); } @@ -1026,12 +1021,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } return null; }).exceptionally(ex -> { - String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal()); - if (ex.getCause() instanceof PulsarServerException) { - log.info(msg); - } else { - log.warn(msg); - } + logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex); commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage()); return null; }); @@ -1264,8 +1254,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { } return null; }).exceptionally(ex -> { - String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal()); - log.warn(msg); + logAuthException(remoteAddress, "producer", getPrincipal(), Optional.of(topicName), ex); commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage()); return null; }); @@ -2314,4 +2303,16 @@ public PulsarCommandSender getCommandSender() { public void execute(Runnable runnable) { ctx.channel().eventLoop().execute(runnable); } + + private static void logAuthException(SocketAddress remoteAddress, String operation, + String principal, Optional topic, Throwable ex) { + String topicString = topic.map(t -> ", topic=" + t.toString()).orElse(""); + if (ex instanceof AuthenticationException) { + log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}", + remoteAddress, operation, principal, topicString, ex.getMessage()); + } else { + log.error("[{}] Error trying to authenticate: operation={}, principal={}{}", + remoteAddress, operation, principal, topicString, ex); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java new file mode 100644 index 0000000000000..d0323a8d5951a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import static org.testng.Assert.fail; + +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.annotations.AfterClass; + +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; +import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.AuthenticationException; +import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException; +import org.apache.pulsar.client.api.Producer; +import com.google.common.collect.Sets; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test doesn't test much in and off itself. + * However it is useful to see which logs are produced when there's an + * failure or error in authentication. + */ +public class AuthLogsTest extends MockedPulsarServiceBaseTest { + private static final Logger log = LoggerFactory.getLogger(AuthLogsTest.class); + + public AuthLogsTest() { + super(); + } + + @BeforeClass + @Override + public void setup() throws Exception { + conf.setClusterName("test"); + conf.setAuthenticationEnabled(true); + conf.setAuthenticationProviders( + Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider")); + conf.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider"); + conf.setAuthorizationEnabled(true); + conf.setAuthorizationAllowWildcardsMatching(true); + conf.setSuperUserRoles(Sets.newHashSet("super")); + internalSetup(); + + try (PulsarAdmin admin = PulsarAdmin.builder() + .authentication(new MockAuthentication("pass.pass")) + .serviceHttpUrl(brokerUrl.toString()).build()) { + admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress())); + admin.tenants().createTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + } + } + + @AfterClass(alwaysRun = true) + @Override + public void cleanup() throws Exception { + internalCleanup(); + } + + @Test + public void binaryEndpoint() throws Exception { + log.info("LOG_TEST_SUCCESS_CLIENT should succeeed both client"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("pass.pass")) + .build(); + Producer producer = client.newProducer().topic("foobar").create(); + Consumer consumer = client.newConsumer().topic("foobar") + .subscriptionName("foobar").subscribe()) { + } + + log.info("LOG_TEST_PRODUCER_AUTHN_FAIL"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("fail.ignored")) + .build(); + Producer producer = client.newProducer().topic("foobar").create()) { + fail("Should fail auth"); + } catch (AuthenticationException ae) { /* expected */ } + + log.info("LOG_TEST_PRODUCER_AUTHN_ERROR"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("error.ignored")) + .build(); + Producer producer = client.newProducer().topic("foobar").create()) { + fail("Should fail auth"); + } catch (AuthenticationException ae) { /* expected */ } + + log.info("LOG_TEST_CONSUMER_AUTHN_FAIL"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("fail.ignored")) + .build(); + Consumer consumer = client.newConsumer().topic("foobar") + .subscriptionName("foobar").subscribe()) { + fail("Should fail auth"); + } catch (AuthenticationException ae) { /* expected */ } + + log.info("LOG_TEST_CONSUMER_AUTHN_ERROR"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("error.ignored")) + .build(); + Consumer consumer = client.newConsumer().topic("foobar") + .subscriptionName("foobar").subscribe()) { + fail("Should fail auth"); + } catch (AuthenticationException ae) { /* expected */ } + + log.info("LOG_TEST_PRODUCER_AUTHZ_FAIL"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("pass.fail")) + .build(); + Producer producer = client.newProducer().topic("foobar").create()) { + fail("Should fail auth"); + } catch (AuthorizationException ae) { /* expected */ } + + log.info("LOG_TEST_PRODUCER_AUTHZ_ERROR"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("pass.error")) + .build(); + Producer producer = client.newProducer().topic("foobar").create()) { + fail("Should fail auth"); + } catch (AuthorizationException ae) { /* expected */ } + + log.info("LOG_TEST_CONSUMER_AUTHZ_FAIL"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("pass.fail")) + .build(); + Consumer consumer = client.newConsumer().topic("foobar") + .subscriptionName("foobar").subscribe()) { + fail("Should fail auth"); + } catch (AuthorizationException ae) { /* expected */ } + + log.info("LOG_TEST_CONSUMER_AUTHZ_ERROR"); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(new MockAuthentication("pass.error")) + .build(); + Consumer consumer = client.newConsumer().topic("foobar") + .subscriptionName("foobar").subscribe()) { + fail("Should fail auth"); + } catch (AuthorizationException ae) { /* expected */ } + + log.info("LOG_TEST_CLIENT_DONE"); + } + + @Test + public void httpEndpoint() throws Exception { + log.info("LOG_TEST_SUCCESS_CLIENT should succeeed both client"); + try (PulsarAdmin admin = PulsarAdmin.builder() + .authentication(new MockAuthentication("pass.pass")) + .serviceHttpUrl(brokerUrl.toString()).build()) { + admin.namespaces().getNamespaces("public"); + } + + log.info("LOG_TEST_HTTP_AUTHN_FAIL"); + try (PulsarAdmin admin = PulsarAdmin.builder() + .authentication(new MockAuthentication("fail.ignore")) + .serviceHttpUrl(brokerUrl.toString()).build()) { + admin.namespaces().getNamespaces("public"); + fail("Should fail auth"); + } catch (NotAuthorizedException ae) { /* expected */ } + + log.info("LOG_TEST_HTTP_AUTHN_ERROR"); + try (PulsarAdmin admin = PulsarAdmin.builder() + .authentication(new MockAuthentication("error.ignore")) + .serviceHttpUrl(brokerUrl.toString()).build()) { + admin.namespaces().getNamespaces("public"); + fail("Should fail auth"); + } catch (NotAuthorizedException ae) { /* expected */ } + + + log.info("LOG_TEST_HTTP_AUTHZ_FAIL"); + try (PulsarAdmin admin = PulsarAdmin.builder() + .authentication(new MockAuthentication("pass.fail")) + .serviceHttpUrl(brokerUrl.toString()).build()) { + admin.namespaces().getNamespaces("public"); + fail("Should fail auth"); + } catch (NotAuthorizedException ae) { /* expected */ } + + log.info("LOG_TEST_HTTP_AUTHZ_ERROR"); + try (PulsarAdmin admin = PulsarAdmin.builder() + .authentication(new MockAuthentication("pass.error")) + .serviceHttpUrl(brokerUrl.toString()).build()) { + admin.namespaces().getNamespaces("public"); + fail("Should fail auth"); + } catch (ServerSideErrorException ae) { /* expected */ } + + + log.info("LOG_TEST_CLIENT_DONE"); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java new file mode 100644 index 0000000000000..a63c8494cac3d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.PulsarClientException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockAuthentication implements Authentication { + private final static Logger log = LoggerFactory.getLogger(MockAuthentication.class); + private final String user; + + public MockAuthentication(String user) { + this.user = user; + } + + @Override + public void close() {} + + @Override + public String getAuthMethodName() { + return "mock"; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + return new AuthenticationDataProvider() { + @Override + public boolean hasDataForHttp() { return true; } + @Override + public String getHttpAuthType() { return "mock"; } + @Override + public Set> getHttpHeaders() throws Exception { + return ImmutableMap.of("mockuser", user).entrySet(); + } + @Override + public boolean hasDataFromCommand() { + return true; + } + @Override + public String getCommandData() { + return user; + } + }; + } + + @Override + public void configure(Map authParams) { + } + + @Override + public void start() throws PulsarClientException {} + + + @Override + public void authenticationStage(String requestUrl, + AuthenticationDataProvider authData, + Map previousResHeaders, + CompletableFuture> authFuture) { + authFuture.complete(null); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java new file mode 100644 index 0000000000000..bd6a61f7da5f8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import java.io.IOException; + +import javax.naming.AuthenticationException; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockAuthenticationProvider implements AuthenticationProvider { + private static Logger log = LoggerFactory.getLogger(MockAuthenticationProvider.class); + + @Override + public void close() throws IOException {} + + @Override + public void initialize(ServiceConfiguration config) throws IOException {} + + @Override + public String getAuthMethodName() { + // method name + return "mock"; + } + + @Override + public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { + String principal = "unknown"; + if (authData.hasDataFromHttp()) { + principal = authData.getHttpHeader("mockuser"); + } else if (authData.hasDataFromCommand()) { + principal = authData.getCommandData(); + } + + String[] parts = principal.split("\\."); + if (parts.length == 2) { + if (parts[0].equals("pass")) { + return principal; + } else if (parts[0].equals("fail")) { + throw new AuthenticationException("Do not pass"); + } else if (parts[0].equals("error")) { + throw new RuntimeException("Error in authn"); + } + } + throw new IllegalArgumentException( + "Not a valid principle. Should be [pass|fail|error].[pass|fail|error], found " + principal); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java new file mode 100644 index 0000000000000..d219f41bfadc4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import org.apache.pulsar.broker.authorization.AuthorizationProvider; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.TenantOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockAuthorizationProvider implements AuthorizationProvider { + private static final Logger log = LoggerFactory.getLogger(MockAuthorizationProvider.class); + + @Override + public void close() {} + + @Override + public CompletableFuture isSuperUser(String role, + AuthenticationDataSource authenticationData, + ServiceConfiguration serviceConfiguration) { + return roleAuthorizedAsync(role); + } + + @Override + public CompletableFuture isSuperUser(String role, ServiceConfiguration serviceConfiguration) { + return roleAuthorizedAsync(role); + } + + @Override + public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, + AuthenticationDataSource authenticationData) { + return roleAuthorizedAsync(role); + } + + @Override + public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException { + } + + @Override + public CompletableFuture canProduceAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData) { + return roleAuthorizedAsync(role); + } + + @Override + public CompletableFuture canConsumeAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData, + String subscription) { + return roleAuthorizedAsync(role); + } + + @Override + public CompletableFuture canLookupAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData) { + return roleAuthorizedAsync(role); + } + + @Override + public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { + return roleAuthorizedAsync(role); + } + + @Override + public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { + return roleAuthorizedAsync(role); + } + + @Override + public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { + return roleAuthorizedAsync(role); + } + + @Override + public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set actions, String role, + String authDataJson) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture grantSubscriptionPermissionAsync(NamespaceName namespace, + String subscriptionName, Set roles, + String authDataJson) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, + String role, String authDataJson) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture grantPermissionAsync(TopicName topicName, Set actions, String role, + String authDataJson) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture allowTenantOperationAsync(String tenantName, String originalRole, String role, + TenantOperation operation, + AuthenticationDataSource authData) { + return roleAuthorizedAsync(role); + } + + @Override + public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation, + AuthenticationDataSource authData) { + return roleAuthorized(role); + } + + @Override + public CompletableFuture allowTenantOperationAsync(String tenantName, String role, + TenantOperation operation, + AuthenticationDataSource authData) { + return roleAuthorizedAsync(role); + } + + @Override + public Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation, + AuthenticationDataSource authData) { + return roleAuthorized(role); + } + + @Override + public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + return roleAuthorizedAsync(role); + } + + @Override + public Boolean allowNamespaceOperation(NamespaceName namespaceName, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + return roleAuthorized(role); + } + + + @Override + public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, + String originalRole, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + return roleAuthorizedAsync(role); + } + + @Override + public Boolean allowNamespaceOperation(NamespaceName namespaceName, + String originalRole, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + return roleAuthorized(role); + } + + @Override + public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String role, + AuthenticationDataSource authData) { + return roleAuthorizedAsync(role); + } + + @Override + public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String role, + AuthenticationDataSource authData) { + return roleAuthorized(role); + } + + @Override + public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + return roleAuthorizedAsync(role); + } + + @Override + public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + return roleAuthorized(role); + } + + @Override + public CompletableFuture allowTopicOperationAsync(TopicName topic, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return roleAuthorizedAsync(role); + } + + @Override + public Boolean allowTopicOperation(TopicName topicName, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return roleAuthorized(role); + } + + @Override + public CompletableFuture allowTopicOperationAsync(TopicName topic, + String originalRole, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return roleAuthorizedAsync(role); + } + + @Override + public Boolean allowTopicOperation(TopicName topicName, + String originalRole, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return roleAuthorized(role); + } + + CompletableFuture roleAuthorizedAsync(String role) { + CompletableFuture promise = new CompletableFuture<>(); + try { + promise.complete(roleAuthorized(role)); + } catch (Exception e) { + promise.completeExceptionally(e); + } + return promise; + } + + boolean roleAuthorized(String role) { + String[] parts = role.split("\\."); + if (parts.length == 2) { + if (parts[1].equals("pass")) { + return true; + } else if (parts[1].equals("fail")) { + return false; + } else if (parts[1].equals("error")) { + throw new RuntimeException("Error in authn"); + } + } + throw new IllegalArgumentException( + "Not a valid principle. Should be [pass|fail|error].[pass|fail|error], found " + role); + } +}