Skip to content

Commit

Permalink
[fix][broker] Fix the reason label of authentication metrics (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoran10 authored May 5, 2023
1 parent 010bd50 commit 2b515ff
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ public class AuthenticationProviderAthenz implements AuthenticationProvider {
private List<String> domainNameList = null;
private int allowedOffset = 30;

public enum ErrorCode {
UNKNOWN,
NO_CLIENT,
NO_TOKEN,
NO_PUBLIC_KEY,
DOMAIN_MISMATCH,
INVALID_TOKEN,
}

@Override
public void initialize(ServiceConfiguration config) throws IOException {
String domainNames;
Expand Down Expand Up @@ -81,11 +90,13 @@ public String getAuthMethodName() {
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
SocketAddress clientAddress;
String roleToken;
ErrorCode errorCode = ErrorCode.UNKNOWN;
try {

if (authData.hasDataFromPeer()) {
clientAddress = authData.getPeerAddress();
} else {
errorCode = ErrorCode.NO_CLIENT;
throw new AuthenticationException("Authentication data source does not have a client address");
}

Expand All @@ -94,13 +105,16 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
} else if (authData.hasDataFromHttp()) {
roleToken = authData.getHttpHeader(AuthZpeClient.ZPE_TOKEN_HDR);
} else {
errorCode = ErrorCode.NO_TOKEN;
throw new AuthenticationException("Authentication data source does not have a role token");
}

if (roleToken == null) {
errorCode = ErrorCode.NO_TOKEN;
throw new AuthenticationException("Athenz token is null, can't authenticate");
}
if (roleToken.isEmpty()) {
errorCode = ErrorCode.NO_TOKEN;
throw new AuthenticationException("Athenz RoleToken is empty, Server is Using Athenz Authentication");
}
if (log.isDebugEnabled()) {
Expand All @@ -110,6 +124,7 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
RoleToken token = new RoleToken(roleToken);

if (!domainNameList.contains(token.getDomain())) {
errorCode = ErrorCode.DOMAIN_MISMATCH;
throw new AuthenticationException(
String.format("Athenz RoleToken Domain mismatch, Expected: %s, Found: %s",
domainNameList.toString(), token.getDomain()));
Expand All @@ -120,6 +135,7 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
PublicKey ztsPublicKey = AuthZpeClient.getZtsPublicKey(token.getKeyId());

if (ztsPublicKey == null) {
errorCode = ErrorCode.NO_PUBLIC_KEY;
throw new AuthenticationException("Unable to retrieve ZTS Public Key");
}

Expand All @@ -128,13 +144,13 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
return token.getPrincipal();
} else {
errorCode = ErrorCode.INVALID_TOKEN;
throw new AuthenticationException(
String.format("Athenz Role Token Not Authenticated from Client: %s", clientAddress));
}
}
} catch (AuthenticationException exception) {
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(),
exception.getMessage());
incrementFailureMetric(errorCode);
throw exception;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ DecodedJWT verifyJWT(PublicKey publicKey,
}

static void incrementFailureMetric(AuthenticationExceptionCode code) {
AuthenticationMetrics.authenticateFailure(SIMPLE_NAME, "token", code.toString());
AuthenticationMetrics.authenticateFailure(SIMPLE_NAME, AUTH_METHOD_NAME, code);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.util.FutureUtil;

Expand Down Expand Up @@ -143,6 +144,10 @@ default CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletReque
}
}

default void incrementFailureMetric(Enum<?> errorCode) {
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), errorCode);
}

/**
* Set response, according to passed in request.
* and return whether we should do following chain.doFilter or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public class AuthenticationProviderBasic implements AuthenticationProvider {
private static final String CONF_PULSAR_PROPERTY_KEY = "basicAuthConf";
private Map<String, String> users;

private enum ErrorCode {
UNKNOWN,
EMPTY_AUTH_DATA,
INVALID_HEADER,
INVALID_AUTH_DATA,
INVALID_TOKEN,
}

@Override
public void close() throws IOException {
// noop
Expand Down Expand Up @@ -104,9 +112,10 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
String userId = authParams.getUserId();
String password = authParams.getPassword();
String msg = "Unknown user or invalid password";

ErrorCode errorCode = ErrorCode.UNKNOWN;
try {
if (users.get(userId) == null) {
errorCode = ErrorCode.INVALID_AUTH_DATA;
throw new AuthenticationException(msg);
}

Expand All @@ -117,15 +126,16 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
List<String> splitEncryptedPassword = Arrays.asList(encryptedPassword.split("\\$"));
if (splitEncryptedPassword.size() != 4 || !encryptedPassword
.equals(Md5Crypt.apr1Crypt(password.getBytes(), splitEncryptedPassword.get(2)))) {
errorCode = ErrorCode.INVALID_TOKEN;
throw new AuthenticationException(msg);
}
// For crypt algorithm
} else if (!encryptedPassword.equals(Crypt.crypt(password.getBytes(), encryptedPassword.substring(0, 2)))) {
errorCode = ErrorCode.INVALID_TOKEN;
throw new AuthenticationException(msg);
}
} catch (AuthenticationException exception) {
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(),
exception.getMessage());
incrementFailureMetric(errorCode);
throw exception;
}
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
Expand All @@ -144,24 +154,29 @@ public AuthParams(AuthenticationDataSource authData) throws AuthenticationExcept
String rawAuthToken = authData.getHttpHeader(HTTP_HEADER_NAME);
// parsing and validation
if (StringUtils.isBlank(rawAuthToken) || !rawAuthToken.toUpperCase().startsWith("BASIC ")) {
incrementFailureMetric(ErrorCode.INVALID_HEADER);
throw new AuthenticationException("Authentication token has to be started with \"Basic \"");
}
String[] splitRawAuthToken = rawAuthToken.split(" ");
if (splitRawAuthToken.length != 2) {
incrementFailureMetric(ErrorCode.INVALID_HEADER);
throw new AuthenticationException("Base64 encoded token is not found");
}

try {
authParams = new String(Base64.getDecoder().decode(splitRawAuthToken[1]));
} catch (Exception e) {
incrementFailureMetric(ErrorCode.INVALID_HEADER);
throw new AuthenticationException("Base64 decoding is failure: " + e.getMessage());
}
} else {
incrementFailureMetric(ErrorCode.EMPTY_AUTH_DATA);
throw new AuthenticationException("Authentication data source does not have data");
}

String[] parsedAuthParams = authParams.split(":");
if (parsedAuthParams.length != 2) {
incrementFailureMetric(ErrorCode.INVALID_AUTH_DATA);
throw new AuthenticationException("Base64 decoded params are invalid");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,15 @@ private interface AuthProcessor<T, W> {

}

private enum ErrorCode {
UNKNOWN,
AUTH_REQUIRED,
}

static <T, W> T applyAuthProcessor(List<W> processors, AuthProcessor<T, W> authFunc)
throws AuthenticationException {
AuthenticationException authenticationException = null;
String errorCode = ErrorCode.UNKNOWN.name();
for (W ap : processors) {
try {
return authFunc.apply(ap);
Expand All @@ -56,19 +62,19 @@ static <T, W> T applyAuthProcessor(List<W> processors, AuthProcessor<T, W> authF
}
// Store the exception so we can throw it later instead of a generic one
authenticationException = ae;
errorCode = ap.getClass().getSimpleName() + "-INVALID-AUTH";
}
}

if (null == authenticationException) {
AuthenticationMetrics.authenticateFailure(
AuthenticationProviderList.class.getSimpleName(),
"authentication-provider-list", "Authentication required");
"authentication-provider-list", ErrorCode.AUTH_REQUIRED);
throw new AuthenticationException("Authentication required");
} else {
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
"authentication-provider-list",
authenticationException.getMessage() != null
? authenticationException.getMessage() : "Authentication required");
AuthenticationMetrics.authenticateFailure(
AuthenticationProviderList.class.getSimpleName(),
"authentication-provider-list", errorCode);
throw authenticationException;
}

Expand Down Expand Up @@ -129,7 +135,7 @@ private void authenticateRemainingAuthStates(CompletableFuture<AuthData> authCha
previousException = new AuthenticationException("Authentication required");
}
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
"authentication-provider-list", "Authentication required");
"authentication-provider-list", ErrorCode.AUTH_REQUIRED);
authChallengeFuture.completeExceptionally(previousException);
return;
}
Expand Down Expand Up @@ -235,7 +241,7 @@ private void authenticateRemainingAuthProviders(CompletableFuture<String> roleFu
previousException = new AuthenticationException("Authentication required");
}
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
"authentication-provider-list", "Authentication required");
"authentication-provider-list", ErrorCode.AUTH_REQUIRED);
roleFuture.completeExceptionally(previousException);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@

public class AuthenticationProviderTls implements AuthenticationProvider {

private enum ErrorCode {
UNKNOWN,
INVALID_CERTS,
INVALID_CN, // invalid common name
}

@Override
public void close() throws IOException {
// noop
Expand All @@ -45,6 +51,7 @@ public String getAuthMethodName() {
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
String commonName = null;
ErrorCode errorCode = ErrorCode.UNKNOWN;
try {
if (authData.hasDataFromTls()) {
/**
Expand Down Expand Up @@ -72,6 +79,7 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
// CN=Steve Kille,O=Isode Limited,C=GB
Certificate[] certs = authData.getTlsCertificates();
if (null == certs) {
errorCode = ErrorCode.INVALID_CERTS;
throw new AuthenticationException("Failed to get TLS certificates from client");
}
String distinguishedName = ((X509Certificate) certs[0]).getSubjectX500Principal().getName();
Expand All @@ -85,12 +93,12 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
}

if (commonName == null) {
errorCode = ErrorCode.INVALID_CN;
throw new AuthenticationException("Client unable to authenticate with TLS certificate");
}
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
} catch (AuthenticationException exception) {
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(),
exception.getMessage());
incrementFailureMetric(errorCode);
throw exception;
}
return commonName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
private String confTokenAudienceSettingName;
private String confTokenAllowedClockSkewSecondsSettingName;

public enum ErrorCode {
INVALID_AUTH_DATA,
INVALID_TOKEN,
INVALID_AUDIENCES,
}

@Override
public void close() throws IOException {
// noop
Expand Down Expand Up @@ -158,19 +164,18 @@ public String getAuthMethodName() {

@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
String token;
try {
// Get Token
String token;
token = getToken(authData);
// Parse Token by validating
String role = getPrincipal(authenticateToken(token));
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
return role;
} catch (AuthenticationException exception) {
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(),
exception.getMessage());
incrementFailureMetric(ErrorCode.INVALID_AUTH_DATA);
throw exception;
}
// Parse Token by validating
String role = getPrincipal(authenticateToken(token));
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
return role;
}

@Override
Expand Down Expand Up @@ -241,16 +246,19 @@ private Jwt<?, Claims> authenticateToken(final String token) throws Authenticati
List<String> audiences = (List<String>) object;
// audience not contains this broker, throw exception.
if (audiences.stream().noneMatch(audienceInToken -> audienceInToken.equals(audience))) {
throw new AuthenticationException("Audiences in token: [" + String.join(", ", audiences)
+ "] not contains this broker: " + audience);
incrementFailureMetric(ErrorCode.INVALID_AUDIENCES);
throw new AuthenticationException("Audiences in token: ["
+ String.join(", ", audiences) + "] not contains this broker: " + audience);
}
} else if (object instanceof String) {
if (!object.equals(audience)) {
throw new AuthenticationException("Audiences in token: [" + object
+ "] not contains this broker: " + audience);
incrementFailureMetric(ErrorCode.INVALID_AUDIENCES);
throw new AuthenticationException(
"Audiences in token: [" + object + "] not contains this broker: " + audience);
}
} else {
// should not reach here.
incrementFailureMetric(ErrorCode.INVALID_AUDIENCES);
throw new AuthenticationException("Audiences in token is not in expected format: " + object);
}
}
Expand All @@ -264,6 +272,7 @@ private Jwt<?, Claims> authenticateToken(final String token) throws Authenticati
if (e instanceof ExpiredJwtException) {
expiredTokenMetrics.inc();
}
incrementFailureMetric(ErrorCode.INVALID_TOKEN);
throw new AuthenticationException("Failed to authentication token: " + e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,27 @@ public static void authenticateSuccess(String providerName, String authMethod) {

/**
* Log authenticate failure event to the authentication metrics.
*
* This method is deprecated due to the label "reason" is a potential infinite value.
* @deprecated See {@link #authenticateFailure(String, String, Enum)} ()}
*
* @param providerName The short class name of the provider
* @param authMethod Authentication method name.
* @param reason Failure reason.
*/
@Deprecated
public static void authenticateFailure(String providerName, String authMethod, String reason) {
authFailuresMetrics.labels(providerName, authMethod, reason).inc();
}

/**
* Log authenticate failure event to the authentication metrics.
* @param providerName The short class name of the provider
* @param authMethod Authentication method name.
* @param errorCode Error code.
*/
public static void authenticateFailure(String providerName, String authMethod, Enum<?> errorCode) {
authFailuresMetrics.labels(providerName, authMethod, errorCode.name()).inc();
}

}
Loading

0 comments on commit 2b515ff

Please sign in to comment.