Skip to content

Commit

Permalink
PIP-55: Refresh Authentication Credentials (apache#6074)
Browse files Browse the repository at this point in the history
* PIP-55: Refresh Authentication Credentials

* Fixed import order

* Do not check for original client credential if it's not coming through proxy

* Fixed import order

* Fixed mocked test assumption

* Addressed comments

* Avoid to print NPE on auth refresh check if auth is disabled
  • Loading branch information
merlimat authored Feb 10, 2020
1 parent 981f6cb commit 4af5223
Show file tree
Hide file tree
Showing 14 changed files with 869 additions and 66 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ authenticationEnabled=false
# Autentication provider name list, which is comma separated list of class names
authenticationProviders=

# Interval of time for checking for expired authentication credentials
authenticationRefreshCheckSeconds=60

# Enforce authorization
authorizationEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private Set<String> authenticationProviders = Sets.newTreeSet();

@FieldContext(
category = CATEGORY_AUTHENTICATION,
doc = "Interval of time for checking for expired authentication credentials"
)
private int authenticationRefreshCheckSeconds = 60;

@FieldContext(
category = CATEGORY_AUTHORIZATION,
doc = "Enforce authorization"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@
*/
package org.apache.pulsar.broker.authentication;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwt;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.net.SocketAddress;
import java.security.Key;

import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

import io.jsonwebtoken.security.SignatureException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.common.api.AuthData;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwt;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.SignatureException;

public class AuthenticationProviderToken implements AuthenticationProvider {

Expand Down Expand Up @@ -81,7 +86,13 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
String token = getToken(authData);

// Parse Token by validating
return parseToken(token);
return getPrincipal(authenticateToken(token));
}

@Override
public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession)
throws AuthenticationException {
return new TokenAuthenticationState(this, authData, remoteAddress, sslSession);
}

public static String getToken(AuthenticationDataSource authData) throws AuthenticationException {
Expand Down Expand Up @@ -112,19 +123,21 @@ private static String validateToken(final String token) throws AuthenticationExc
}
}

private String parseToken(final String token) throws AuthenticationException {
@SuppressWarnings("unchecked")
private Jwt<?, Claims> authenticateToken(final String token) throws AuthenticationException {
try {
@SuppressWarnings("unchecked")
Jwt<?, Claims> jwt = Jwts.parser()
return Jwts.parser()
.setSigningKey(validationKey)
.parse(token);

return jwt.getBody().get(roleClaim, String.class);
} catch (JwtException e) {
throw new AuthenticationException("Failed to authentication token: " + e.getMessage());
}
}

private String getPrincipal(Jwt<?, Claims> jwt) {
return jwt.getBody().get(roleClaim, String.class);
}

/**
* Try to get the validation key for tokens from several possible config options.
*/
Expand Down Expand Up @@ -166,4 +179,62 @@ private SignatureAlgorithm getPublicKeyAlgType(ServiceConfiguration conf) throws
return SignatureAlgorithm.RS256;
}
}

private static final class TokenAuthenticationState implements AuthenticationState {
private final AuthenticationProviderToken provider;
private AuthenticationDataSource authenticationDataSource;
private Jwt<?, Claims> jwt;
private final SocketAddress remoteAddress;
private final SSLSession sslSession;
private long expiration;

TokenAuthenticationState(
AuthenticationProviderToken provider,
AuthData authData,
SocketAddress remoteAddress,
SSLSession sslSession) throws AuthenticationException {
this.provider = provider;
this.remoteAddress = remoteAddress;
this.sslSession = sslSession;
this.authenticate(authData);
}

@Override
public String getAuthRole() throws AuthenticationException {
return provider.getPrincipal(jwt);
}

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
String token = new String(authData.getBytes(), UTF_8);

this.jwt = provider.authenticateToken(token);
this.authenticationDataSource = new AuthenticationDataCommand(token, remoteAddress, sslSession);
if (jwt.getBody().getExpiration() != null) {
this.expiration = jwt.getBody().getExpiration().getTime();
} else {
// Disable expiration
this.expiration = Long.MAX_VALUE;
}

// There's no additional auth stage required
return null;
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return authenticationDataSource;
}

@Override
public boolean isComplete() {
// The authentication of tokens is always done in one single stage
return true;
}

@Override
public boolean isExpired() {
return expiration < System.currentTimeMillis();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.pulsar.broker.authentication;

import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;

import org.apache.pulsar.common.api.AuthData;

Expand Down Expand Up @@ -59,4 +58,25 @@ public interface AuthenticationState {
default long getStateId() {
return -1L;
}

/**
* If the authentication state is expired, it will force the connection to be re-authenticated.
*/
default boolean isExpired() {
return false;
}

/**
* If the authentication state supports refreshing and the credentials are expired,
* the auth provider will call this method ot initiate the refresh process.
* <p>
* The auth state here will return the broker side data that will be used to send
* a challenge to the client.
*
* @return the {@link AuthData} for the broker challenge to client
* @throws AuthenticationException
*/
default AuthData refreshAuthentication() throws AuthenticationException {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.pulsar.broker.authentication;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.jsonwebtoken.Claims;
Expand All @@ -28,6 +31,7 @@
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.io.Decoders;
import io.jsonwebtoken.security.Keys;
import lombok.Cleanup;

import java.io.File;
import java.io.IOException;
Expand All @@ -46,6 +50,7 @@

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.common.api.AuthData;
import org.testng.annotations.Test;

public class AuthenticationProviderTokenTest {
Expand Down Expand Up @@ -533,4 +538,36 @@ public void testValidationWhenPublicKeyAlgIsInvalid() throws IOException {

new AuthenticationProviderToken().initialize(conf);
}


@Test
public void testExpiringToken() throws Exception {
SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

@Cleanup
AuthenticationProviderToken provider = new AuthenticationProviderToken();

Properties properties = new Properties();
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY,
AuthTokenUtils.encodeKeyBase64(secretKey));

ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
provider.initialize(conf);

// Create a token that will expire in 3 seconds
String expiringToken = AuthTokenUtils.createToken(secretKey, SUBJECT,
Optional.of(new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(3))));

AuthenticationState authState = provider.newAuthState(AuthData.of(expiringToken.getBytes()), null, null);
assertTrue(authState.isComplete());
assertFalse(authState.isExpired());

Thread.sleep(TimeUnit.SECONDS.toMillis(6));
assertTrue(authState.isExpired());
assertTrue(authState.isComplete());

AuthData brokerData = authState.refreshAuthentication();
assertNull(brokerData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,27 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.bookkeeper.util.SafeRunnable.safeRun;

import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.NettySslContextBuilder;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.flow.FlowControlHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {

public static final String TLS_HANDLER = "tls";
Expand All @@ -38,6 +48,14 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
private final NettySslContextBuilder sslCtxRefresher;
private final ServiceConfiguration brokerConf;

// This cache is used to maintain a list of active connections to iterate over them
// We keep weak references to have the cache to be auto cleaned up when the connections
// objects are GCed.
private final Cache<SocketAddress, ServerCnx> connections = Caffeine.newBuilder()
.weakKeys()
.weakValues()
.build();

/**
* @param pulsar
* An instance of {@link PulsarService}
Expand All @@ -59,6 +77,10 @@ public PulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws
this.sslCtxRefresher = null;
}
this.brokerConf = pulsar.getConfiguration();

pulsar.getExecutor().scheduleAtFixedRate(safeRun(this::refreshAuthenticationCredentials),
pulsar.getConfig().getAuthenticationRefreshCheckSeconds(),
pulsar.getConfig().getAuthenticationRefreshCheckSeconds(), TimeUnit.SECONDS);
}

@Override
Expand All @@ -78,6 +100,19 @@ protected void initChannel(SocketChannel ch) throws Exception {
// ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling
// auto-read.
ch.pipeline().addLast("flowController", new FlowControlHandler());
ch.pipeline().addLast("handler", new ServerCnx(pulsar));
ServerCnx cnx = new ServerCnx(pulsar);
ch.pipeline().addLast("handler", cnx);

connections.put(ch.remoteAddress(), cnx);
}

private void refreshAuthenticationCredentials() {
connections.asMap().values().forEach(cnx -> {
try {
cnx.refreshAuthenticationCredentials();
} catch (Throwable t) {
log.warn("[{}] Failed to refresh auth credentials", cnx.getRemoteAddress());
}
});
}
}
Loading

0 comments on commit 4af5223

Please sign in to comment.