Skip to content

Commit

Permalink
[Issue apache#6711]: add audience verify in AuthenticationProviderTok…
Browse files Browse the repository at this point in the history
…en (apache#6716)

Fixes apache#6711

### Motivation
User like to be able to configure the JWT authentication provider to verify the audience on incoming tokens.  I believe this will improve security because it would prevent a spoofer from reusing a token that was intended for another purpose (yet signed by the same issuer).  [RFC 6749 section 4.1.3](https://tools.ietf.org/html/rfc7519#section-4.1.3) has some guidance on this.  In my scenario, the token is an OAuth 2.0 token, and OAuth 2.0 makes extensive use of the audience claim ([ref](https://auth0.com/docs/tokens/guides/validate-access-tokens#check-additional-standard-claims)).

1. a configurable audience claim name (e.g. `aud`).
2. if audience isn't configured, do not validate the audience (for back-compatibility).
3. if audience is configured, validate that the value is present in the token.

### Modifications
- Add the logic in AuthenticationProviderToken.
- Add related tests.

### Verifying this change
- Ut passed
  • Loading branch information
jiazhai authored Apr 16, 2020
1 parent 1fd1b2b commit d6709ae
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 12 deletions.
11 changes: 9 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
# Topics that are inactive for longer than this value will be deleted
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=

# Max pending publish requests per connection to avoid keeping large number of pending
# Max pending publish requests per connection to avoid keeping large number of pending
# requests in memory. Default: 1000
maxPendingPublishdRequestsPerConnection=1000

Expand Down Expand Up @@ -460,6 +460,13 @@ tokenPublicKey=
# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank)
tokenAuthClaim=

# The token audience "claim" name, e.g. "aud", that will be used to get the audience from token.
# If not set, audience will not be verified.
tokenAudienceClaim=

# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
tokenAudience=

### --- SASL Authentication Provider --- ###

# This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.
Expand Down Expand Up @@ -758,7 +765,7 @@ replicationProducerQueueSize=1000
# Replicator prefix used for replicator producer name and cursor name
replicatorPrefix=pulsar.repl

# Duration to check replication policy to avoid replicator inconsistency
# Duration to check replication policy to avoid replicator inconsistency
# due to missing ZooKeeper watch (disable with value 0)
replicatioPolicyCheckDurationSeconds=600

Expand Down
9 changes: 8 additions & 1 deletion conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ brokerClientTrustCertsFilePath=
# Whether TLS is enabled when communicating with Pulsar brokers
tlsEnabledWithBroker=false

# Tls cert refresh duration in seconds (set 0 to check on every new connection)
# Tls cert refresh duration in seconds (set 0 to check on every new connection)
tlsCertRefreshCheckDurationSec=300

##### --- Rate Limiting --- #####
Expand Down Expand Up @@ -192,6 +192,13 @@ tokenPublicKey=
# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank)
tokenAuthClaim=

# The token audience "claim" name, e.g. "aud", that will be used to get the audience from token.
# If not set, audience will not be verified.
tokenAudienceClaim=

# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
tokenAudience=

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
Expand Down
15 changes: 11 additions & 4 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ brokerDeleteInactiveTopicsEnabled=true
# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

# Max pending publish requests per connection to avoid keeping large number of pending
# Max pending publish requests per connection to avoid keeping large number of pending
# requests in memory. Default: 1000
maxPendingPublishdRequestsPerConnection=1000

Expand Down Expand Up @@ -179,8 +179,8 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Dispatch rate-limiting relative to publish rate.
# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
# Dispatch rate-limiting relative to publish rate.
# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
# throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
dispatchThrottlingRateRelativeToPublishRate=false

Expand Down Expand Up @@ -265,6 +265,13 @@ anonymousUserRole=
# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank)
tokenAuthClaim=

# The token audience "claim" name, e.g. "aud", that will be used to get the audience from token.
# If not set, audience will not be verified.
tokenAudienceClaim=

# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
tokenAudience=

### --- BookKeeper Client --- ###

# Authentication plugin to use when connecting to bookies
Expand Down Expand Up @@ -505,7 +512,7 @@ replicationConnectionsPerBroker=16
# Replicator producer queue size
replicationProducerQueueSize=1000

# Duration to check replication policy to avoid replicator inconsistency
# Duration to check replication policy to avoid replicator inconsistency
# due to missing ZooKeeper watch (disable with value 0)
replicatioPolicyCheckDurationSeconds=600

Expand Down
6 changes: 3 additions & 3 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,9 @@ The Apache Software License, Version 2.0
* Prometheus
- io.prometheus-simpleclient_httpserver-0.5.0.jar
* Java JSON WebTokens
- io.jsonwebtoken-jjwt-api-0.10.5.jar
- io.jsonwebtoken-jjwt-impl-0.10.5.jar
- io.jsonwebtoken-jjwt-jackson-0.10.5.jar
- io.jsonwebtoken-jjwt-api-0.11.1.jar
- io.jsonwebtoken-jjwt-impl-0.11.1.jar
- io.jsonwebtoken-jjwt-jackson-0.11.1.jar
* JavaX Injection
- javax.inject-javax.inject-1.jar
* JCTools - Java Concurrency Tools for the JVM
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ flexible messaging model and an intuitive client API.</description>
<flink.version>1.6.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<debezium.version>1.0.0.Final</debezium.version>
<jsonwebtoken.version>0.10.5</jsonwebtoken.version>
<jsonwebtoken.version>0.11.1</jsonwebtoken.version>
<opencensus.version>0.18.0</opencensus.version>
<zstd.version>1.3.7-3</zstd.version>
<snappy.version>1.1.1.3</snappy.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.SocketAddress;
import java.security.Key;

import java.util.List;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

Expand Down Expand Up @@ -56,11 +57,19 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
// When using public key's, the algorithm of the key
final static String CONF_TOKEN_PUBLIC_ALG = "tokenPublicAlg";

// The token audience "claim" name, e.g. "aud", that will be used to get the audience from token.
final static String CONF_TOKEN_AUDIENCE_CLAIM = "tokenAudienceClaim";

// The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
final static String CONF_TOKEN_AUDIENCE = "tokenAudience";

final static String TOKEN = "token";

private Key validationKey;
private String roleClaim;
private SignatureAlgorithm publicKeyAlg;
private String audienceClaim;
private String audience;

@Override
public void close() throws IOException {
Expand All @@ -73,6 +82,13 @@ public void initialize(ServiceConfiguration config) throws IOException, IllegalA
this.publicKeyAlg = getPublicKeyAlgType(config);
this.validationKey = getValidationKey(config);
this.roleClaim = getTokenRoleClaim(config);
this.audienceClaim = getTokenAudienceClaim(config);
this.audience = getTokenAudience(config);

if (audienceClaim != null && audience == null ) {
throw new IllegalArgumentException("Token Audience Claim [" + audienceClaim
+ "] configured, but Audience stands for this broker not.");
}
}

@Override
Expand Down Expand Up @@ -126,9 +142,35 @@ private static String validateToken(final String token) throws AuthenticationExc
@SuppressWarnings("unchecked")
private Jwt<?, Claims> authenticateToken(final String token) throws AuthenticationException {
try {
return Jwts.parser()
Jwt<?, Claims> jwt = Jwts.parser()
.setSigningKey(validationKey)
.parse(token);

if (audienceClaim != null) {
Object object = jwt.getBody().get(audienceClaim);
if (object == null) {
throw new JwtException("Found null Audience in token, for claimed field: " + audienceClaim);
}

if (object instanceof List) {
List<String> audiences = (List<String>) object;
// audience not contains this broker, throw exception.
if (!audiences.stream().anyMatch(audienceInToken -> audienceInToken.equals(audience))) {
throw new AuthenticationException("Audiences in token: [" + String.join(", ", audiences)
+ "] not contains this broker: " + audience);
}
} else if (object instanceof String) {
if (!object.equals(audience)) {
throw new AuthenticationException("Audiences in token: [" + object
+ "] not contains this broker: " + audience);
}
} else {
// should not reach here.
throw new AuthenticationException("Audiences in token is not in expected format: " + object);
}
}

return jwt;
} catch (JwtException e) {
throw new AuthenticationException("Failed to authentication token: " + e.getMessage());
}
Expand Down Expand Up @@ -180,6 +222,26 @@ private SignatureAlgorithm getPublicKeyAlgType(ServiceConfiguration conf) throws
}
}

// get Token Audience Claim from configuration, if not configured return null.
private String getTokenAudienceClaim(ServiceConfiguration conf) throws IllegalArgumentException {
if (conf.getProperty(CONF_TOKEN_AUDIENCE_CLAIM) != null
&& StringUtils.isNotBlank((String) conf.getProperty(CONF_TOKEN_AUDIENCE_CLAIM))) {
return (String) conf.getProperty(CONF_TOKEN_AUDIENCE_CLAIM);
} else {
return null;
}
}

// get Token Audience that stands for this broker from configuration, if not configured return null.
private String getTokenAudience(ServiceConfiguration conf) throws IllegalArgumentException {
if (conf.getProperty(CONF_TOKEN_AUDIENCE) != null
&& StringUtils.isNotBlank((String) conf.getProperty(CONF_TOKEN_AUDIENCE))) {
return (String) conf.getProperty(CONF_TOKEN_AUDIENCE);
} else {
return null;
}
}

private static final class TokenAuthenticationState implements AuthenticationState {
private final AuthenticationProviderToken provider;
private AuthenticationDataSource authenticationDataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwt;
import io.jsonwebtoken.JwtBuilder;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.io.Decoders;
import io.jsonwebtoken.security.Keys;
import java.security.Key;
import java.util.List;
import lombok.Cleanup;

import java.io.File;
Expand Down Expand Up @@ -632,4 +636,138 @@ public void testExpiringToken() throws Exception {
AuthData brokerData = authState.refreshAuthentication();
assertNull(brokerData);
}

// tests for Token Audience
@Test
public void testRightTokenAudienceClaim() throws Exception {
String brokerAudience = "testBroker_" + System.currentTimeMillis();
Properties properties = new Properties();
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, "aud");
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, brokerAudience);

testTokenAudienceWithDifferentConfig(properties, brokerAudience);
}

@Test(expectedExceptions = AuthenticationException.class)
public void testWrongTokenAudience() throws Exception {
String brokerAudience = "testBroker_" + System.currentTimeMillis();

Properties properties = new Properties();
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, "aud");
// set wrong audience in token, should throw exception.
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, brokerAudience + "-wrong");
testTokenAudienceWithDifferentConfig(properties, brokerAudience);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testNoBrokerTokenAudience() throws Exception {
String brokerAudience = "testBroker_" + System.currentTimeMillis();

Properties properties = new Properties();
// Not set broker audience, should throw exception.
//properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, brokerAudience);
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, "aud");
testTokenAudienceWithDifferentConfig(properties, brokerAudience);
}

@Test
public void testSelfDefineTokenAudienceClaim() throws Exception {
String audienceClaim = "audience_claim_" + System.currentTimeMillis();
String brokerAudience = "testBroker_" + System.currentTimeMillis();

Properties properties = new Properties();
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, brokerAudience);
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, audienceClaim);
testTokenAudienceWithDifferentConfig(properties, audienceClaim, Lists.newArrayList(brokerAudience));
}

@Test(expectedExceptions = AuthenticationException.class)
public void testWrongSelfDefineTokenAudienceClaim() throws Exception {
String audienceClaim = "audience_claim_" + System.currentTimeMillis();
String brokerAudience = "testBroker_" + System.currentTimeMillis();

Properties properties = new Properties();
// Set wrong broker audience, should throw exception.
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, brokerAudience);
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, audienceClaim);
testTokenAudienceWithDifferentConfig(properties,
audienceClaim + "_wrong",
Lists.newArrayList(brokerAudience));
}

@Test
public void testMultiTokenAudience() throws Exception {
String audienceClaim = "audience_claim_" + System.currentTimeMillis();
String brokerAudience = "testBroker_" + System.currentTimeMillis();

List<String> audiences = Lists.newArrayList("AnotherBrokerAudience", brokerAudience);

Properties properties = new Properties();
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, brokerAudience);
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, audienceClaim);
testTokenAudienceWithDifferentConfig(properties, audienceClaim, audiences);
}

@Test(expectedExceptions = AuthenticationException.class)
public void testMultiTokenAudienceNotInclude() throws Exception {
String audienceClaim = "audience_claim_" + System.currentTimeMillis();
String brokerAudience = "testBroker_" + System.currentTimeMillis();

List<String> audiences = Lists.newArrayList("AnotherBrokerAudience", brokerAudience + "_wrong");

Properties properties = new Properties();
// Broker audience not included in token's audiences, should throw exception.
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, brokerAudience);
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, audienceClaim);
testTokenAudienceWithDifferentConfig(properties, audienceClaim, audiences);
}

private static String createTokenWithAudience(Key signingKey, String audienceClaim, List<String> audience) {
JwtBuilder builder = Jwts.builder()
.setSubject(SUBJECT)
.signWith(signingKey);

builder.claim(audienceClaim, audience);
return builder.compact();
}

private static void testTokenAudienceWithDifferentConfig(Properties properties,
String brokerAudience) throws Exception {
testTokenAudienceWithDifferentConfig(properties,
"aud",
Lists.newArrayList(brokerAudience));
}

private static void testTokenAudienceWithDifferentConfig(Properties properties,
String audienceClaim, List<String> audiences) throws Exception {
@Cleanup
AuthenticationProviderToken provider = new AuthenticationProviderToken();
SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

File secretKeyFile = File.createTempFile("pulsar-test-secret-key-valid", ".key");
secretKeyFile.deleteOnExit();
Files.write(Paths.get(secretKeyFile.toString()), secretKey.getEncoded());

properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY, secretKeyFile.toString());
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
provider.initialize(conf);

String token = createTokenWithAudience(secretKey, audienceClaim, audiences);

// Pulsar protocol auth
String subject = provider.authenticate(new AuthenticationDataSource() {
@Override
public boolean hasDataFromCommand() {
return true;
}

@Override
public String getCommandData() {
return token;
}
});
assertEquals(subject, SUBJECT);
provider.close();
}
}
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|tokenPublicKey| Configure the public key to be used to validate auth tokens. The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or `tokenPublicKey=file:///my/secret.key`||
|tokenPublicAlg| Configure the algorithm to be used to validate auth tokens. This can be any of the asymettric algorithms supported by Java JWT (https://github.com/jwtk/jjwt#signature-algorithms-keys) |RS256|
|tokenAuthClaim| Specify which of the token's claims will be used as the authentication "principal" or "role". The default "sub" claim will be used if this is left blank ||
|tokenAudienceClaim| The token audience "claim" name, e.g. "aud", that will be used to get the audience from token. If not set, audience will not be verified. ||
|tokenAudience| The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this. ||
|maxUnackedMessagesPerConsumer| Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending messages to consumer once, this limit reaches until consumer starts acknowledging messages back. Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction |50000|
|maxUnackedMessagesPerSubscription| Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit check and dispatcher can dispatch messages without any restriction |200000|
|subscriptionRedeliveryTrackerEnabled| Enable subscription message redelivery tracker |true|
Expand Down

0 comments on commit d6709ae

Please sign in to comment.