Skip to content

Commit

Permalink
PLSR-1470: Only auth _errors_ should log at error level (apache#9325)
Browse files Browse the repository at this point in the history
An authentication failure should not log at error level, as it depends
on the credentials that the client provides, which may well be wrong.

An authentication/authorization error, where it cannot be decided if
credentials are valid or not, is an error that needs operator
attention and therefore should be logged at error level.

Co-authored-by: Ivan Kelly <[email protected]>
Co-authored-by: Matteo Merli <[email protected]>
Co-authored-by: Sijie Guo <[email protected]>
  • Loading branch information
4 people authored Feb 15, 2021
1 parent 6ebc795 commit fd9875c
Show file tree
Hide file tree
Showing 7 changed files with 693 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteA
authenticationException = ae;
}
if (states.isEmpty()) {
log.error("Failed to initialize a new auth state from {}", remoteAddress, authenticationException);
log.debug("Failed to initialize a new auth state from {}", remoteAddress, authenticationException);
if (authenticationException != null) {
throw authenticationException;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.io.IOException;

import javax.naming.AuthenticationException;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
Expand Down Expand Up @@ -93,7 +95,11 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
} catch (Exception e) {
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
if (e instanceof AuthenticationException) {
LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
} else {
LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e);
}
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
Expand Down Expand Up @@ -405,8 +404,8 @@ protected void handleLookup(CommandLookupTopic lookup) {
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "lookup", getPrincipal(), Optional.of(topicName), ex);
final String msg = "Exception occurred while trying to authorize lookup";
log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName, ex);
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
lookupSemaphore.release();
return null;
Expand Down Expand Up @@ -478,8 +477,8 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex);
final String msg = "Exception occurred while trying to authorize get Partition Metadata";
log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
requestId));
lookupSemaphore.release();
Expand Down Expand Up @@ -791,12 +790,8 @@ protected void handleConnect(CommandConnect connect) {
}
}
} catch (Exception e) {
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
String msg = "Unable to authenticate";
if (e instanceof AuthenticationException) {
log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
} else {
log.warn("[{}] {}", remoteAddress, msg, e);
}
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
close();
}
Expand Down Expand Up @@ -1026,12 +1021,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}
return null;
}).exceptionally(ex -> {
String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
if (ex.getCause() instanceof PulsarServerException) {
log.info(msg);
} else {
log.warn(msg);
}
logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
Expand Down Expand Up @@ -1264,8 +1254,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}
return null;
}).exceptionally(ex -> {
String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal());
log.warn(msg);
logAuthException(remoteAddress, "producer", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
Expand Down Expand Up @@ -2314,4 +2303,16 @@ public PulsarCommandSender getCommandSender() {
public void execute(Runnable runnable) {
ctx.channel().eventLoop().execute(runnable);
}

private static void logAuthException(SocketAddress remoteAddress, String operation,
String principal, Optional<TopicName> topic, Throwable ex) {
String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
if (ex instanceof AuthenticationException) {
log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
remoteAddress, operation, principal, topicString, ex.getMessage());
} else {
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
remoteAddress, operation, principal, topicString, ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import static org.testng.Assert.fail;

import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.AfterClass;

import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.AuthenticationException;
import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException;
import org.apache.pulsar.client.api.Producer;
import com.google.common.collect.Sets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This test doesn't test much in and off itself.
* However it is useful to see which logs are produced when there's an
* failure or error in authentication.
*/
public class AuthLogsTest extends MockedPulsarServiceBaseTest {
private static final Logger log = LoggerFactory.getLogger(AuthLogsTest.class);

public AuthLogsTest() {
super();
}

@BeforeClass
@Override
public void setup() throws Exception {
conf.setClusterName("test");
conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(
Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider"));
conf.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
conf.setAuthorizationEnabled(true);
conf.setAuthorizationAllowWildcardsMatching(true);
conf.setSuperUserRoles(Sets.newHashSet("super"));
internalSetup();

try (PulsarAdmin admin = PulsarAdmin.builder()
.authentication(new MockAuthentication("pass.pass"))
.serviceHttpUrl(brokerUrl.toString()).build()) {
admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
admin.tenants().createTenant("public",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("public/default");
admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
}
}

@AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
internalCleanup();
}

@Test
public void binaryEndpoint() throws Exception {
log.info("LOG_TEST_SUCCESS_CLIENT should succeeed both client");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("pass.pass"))
.build();
Producer<byte[]> producer = client.newProducer().topic("foobar").create();
Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
.subscriptionName("foobar").subscribe()) {
}

log.info("LOG_TEST_PRODUCER_AUTHN_FAIL");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("fail.ignored"))
.build();
Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
fail("Should fail auth");
} catch (AuthenticationException ae) { /* expected */ }

log.info("LOG_TEST_PRODUCER_AUTHN_ERROR");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("error.ignored"))
.build();
Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
fail("Should fail auth");
} catch (AuthenticationException ae) { /* expected */ }

log.info("LOG_TEST_CONSUMER_AUTHN_FAIL");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("fail.ignored"))
.build();
Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
.subscriptionName("foobar").subscribe()) {
fail("Should fail auth");
} catch (AuthenticationException ae) { /* expected */ }

log.info("LOG_TEST_CONSUMER_AUTHN_ERROR");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("error.ignored"))
.build();
Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
.subscriptionName("foobar").subscribe()) {
fail("Should fail auth");
} catch (AuthenticationException ae) { /* expected */ }

log.info("LOG_TEST_PRODUCER_AUTHZ_FAIL");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("pass.fail"))
.build();
Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
fail("Should fail auth");
} catch (AuthorizationException ae) { /* expected */ }

log.info("LOG_TEST_PRODUCER_AUTHZ_ERROR");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("pass.error"))
.build();
Producer<byte[]> producer = client.newProducer().topic("foobar").create()) {
fail("Should fail auth");
} catch (AuthorizationException ae) { /* expected */ }

log.info("LOG_TEST_CONSUMER_AUTHZ_FAIL");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("pass.fail"))
.build();
Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
.subscriptionName("foobar").subscribe()) {
fail("Should fail auth");
} catch (AuthorizationException ae) { /* expected */ }

log.info("LOG_TEST_CONSUMER_AUTHZ_ERROR");
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(new MockAuthentication("pass.error"))
.build();
Consumer<byte[]> consumer = client.newConsumer().topic("foobar")
.subscriptionName("foobar").subscribe()) {
fail("Should fail auth");
} catch (AuthorizationException ae) { /* expected */ }

log.info("LOG_TEST_CLIENT_DONE");
}

@Test
public void httpEndpoint() throws Exception {
log.info("LOG_TEST_SUCCESS_CLIENT should succeeed both client");
try (PulsarAdmin admin = PulsarAdmin.builder()
.authentication(new MockAuthentication("pass.pass"))
.serviceHttpUrl(brokerUrl.toString()).build()) {
admin.namespaces().getNamespaces("public");
}

log.info("LOG_TEST_HTTP_AUTHN_FAIL");
try (PulsarAdmin admin = PulsarAdmin.builder()
.authentication(new MockAuthentication("fail.ignore"))
.serviceHttpUrl(brokerUrl.toString()).build()) {
admin.namespaces().getNamespaces("public");
fail("Should fail auth");
} catch (NotAuthorizedException ae) { /* expected */ }

log.info("LOG_TEST_HTTP_AUTHN_ERROR");
try (PulsarAdmin admin = PulsarAdmin.builder()
.authentication(new MockAuthentication("error.ignore"))
.serviceHttpUrl(brokerUrl.toString()).build()) {
admin.namespaces().getNamespaces("public");
fail("Should fail auth");
} catch (NotAuthorizedException ae) { /* expected */ }


log.info("LOG_TEST_HTTP_AUTHZ_FAIL");
try (PulsarAdmin admin = PulsarAdmin.builder()
.authentication(new MockAuthentication("pass.fail"))
.serviceHttpUrl(brokerUrl.toString()).build()) {
admin.namespaces().getNamespaces("public");
fail("Should fail auth");
} catch (NotAuthorizedException ae) { /* expected */ }

log.info("LOG_TEST_HTTP_AUTHZ_ERROR");
try (PulsarAdmin admin = PulsarAdmin.builder()
.authentication(new MockAuthentication("pass.error"))
.serviceHttpUrl(brokerUrl.toString()).build()) {
admin.namespaces().getNamespaces("public");
fail("Should fail auth");
} catch (ServerSideErrorException ae) { /* expected */ }


log.info("LOG_TEST_CLIENT_DONE");
}

}
Loading

0 comments on commit fd9875c

Please sign in to comment.