diff --git a/conf/broker.conf b/conf/broker.conf
index 73a4402da8b86..9a00ba1137076 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -122,6 +122,8 @@ superUserRoles=
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
+# Supported Athenz provider domain names(comma separated) for authentication
+athenzDomainNames=
### --- BookKeeper Client --- ###
diff --git a/conf/standalone.conf b/conf/standalone.conf
index c7efc28cbab96..1cb9629c38c7e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -95,6 +95,8 @@ superUserRoles=
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
+# Supported Athenz provider domain names(comma separated) for authentication
+athenzDomainNames=
### --- BookKeeper Client --- ###
diff --git a/pom.xml b/pom.xml
index e79b07203f90e..a2ece21af26ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,8 @@ flexible messaging model and an intuitive client API.
pulsar-zookeeper-utils
pulsar-checksum
pulsar-testclient
+ pulsar-broker-auth-athenz
+ pulsar-client-auth-athenz
all
@@ -388,6 +390,19 @@ flexible messaging model and an intuitive client API.
caffeine
2.3.3
+
+
+ com.yahoo.athenz
+ zts_java_client
+ 1.1.3
+
+
+
+ com.yahoo.athenz
+ zpe_java_client
+ 1.1.3
+
+
@@ -710,6 +725,14 @@ flexible messaging model and an intuitive client API.
bookkeeper-yahoo-mvn-repo
https://raw.githubusercontent.com/yahoo/bookkeeper/mvn-repo
+
+
+ false
+
+ bintray-yahoo-maven
+ bintray
+ http://yahoo.bintray.com/maven
+
diff --git a/pulsar-broker-auth-athenz/pom.xml b/pulsar-broker-auth-athenz/pom.xml
new file mode 100644
index 0000000000000..14568ae22f7c0
--- /dev/null
+++ b/pulsar-broker-auth-athenz/pom.xml
@@ -0,0 +1,47 @@
+
+
+
+ 4.0.0
+
+ com.yahoo.pulsar
+ pulsar
+ 1.17-SNAPSHOT
+
+
+ pulsar-broker-auth-athenz
+ jar
+ Athenz authentication plugin for broker
+
+
+
+
+ ${project.groupId}
+ pulsar-broker
+ ${project.version}
+
+
+
+ com.yahoo.athenz
+ zpe_java_client
+
+
+
+
diff --git a/pulsar-broker-auth-athenz/src/main/java/com/yahoo/pulsar/broker/authentication/AuthenticationProviderAthenz.java b/pulsar-broker-auth-athenz/src/main/java/com/yahoo/pulsar/broker/authentication/AuthenticationProviderAthenz.java
new file mode 100644
index 0000000000000..1663e908aa1a9
--- /dev/null
+++ b/pulsar-broker-auth-athenz/src/main/java/com/yahoo/pulsar/broker/authentication/AuthenticationProviderAthenz.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.broker.authentication;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.List;
+import java.security.PublicKey;
+
+import javax.naming.AuthenticationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.yahoo.athenz.auth.token.RoleToken;
+import com.yahoo.athenz.zpe.AuthZpeClient;
+import com.yahoo.pulsar.broker.ServiceConfiguration;
+import com.yahoo.pulsar.broker.authentication.AuthenticationDataSource;
+import com.yahoo.pulsar.broker.authentication.AuthenticationProvider;
+
+public class AuthenticationProviderAthenz implements AuthenticationProvider {
+
+ private static final String DOMAIN_NAME_LIST = "athenzDomainNames";
+
+ private List domainNameList = null;
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {
+ if (config.getProperty(DOMAIN_NAME_LIST) == null) {
+ throw new IOException("No athenz domain name specified");
+ }
+ String domainNames = (String) config.getProperty(DOMAIN_NAME_LIST);
+ domainNameList = Lists.newArrayList(domainNames.split(","));
+ log.info("Supported domain names for athenz: {}", domainNameList);
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "athenz";
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+ SocketAddress clientAddress;
+ String roleToken;
+
+ if (authData.hasDataFromPeer()) {
+ clientAddress = authData.getPeerAddress();
+ } else {
+ throw new AuthenticationException("Authentication data source does not have a client address");
+ }
+
+ if (authData.hasDataFromCommand()) {
+ roleToken = authData.getCommandData();
+ } else if (authData.hasDataFromHttp()) {
+ roleToken = authData.getHttpHeader(AuthZpeClient.ZPE_TOKEN_HDR);
+ } else {
+ throw new AuthenticationException("Authentication data source does not have a role token");
+ }
+
+ if (roleToken == null) {
+ throw new AuthenticationException("Athenz token is null, can't authenticate");
+ }
+ if (roleToken.isEmpty()) {
+ throw new AuthenticationException("Athenz RoleToken is empty, Server is Using Athenz Authentication");
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Athenz RoleToken : [{}] received from Client: {}", roleToken, clientAddress);
+ }
+
+ RoleToken token = new RoleToken(roleToken);
+
+ if (!domainNameList.contains(token.getDomain())) {
+ throw new AuthenticationException(
+ String.format("Athenz RoleToken Domain mismatch, Expected: %s, Found: %s", domainNameList.toString(), token.getDomain()));
+ }
+
+ // Synchronize for non-thread safe static calls inside athenz library
+ synchronized (this) {
+ PublicKey ztsPublicKey = AuthZpeClient.getZtsPublicKey(token.getKeyId());
+ int allowedOffset = 0;
+
+ if (ztsPublicKey == null) {
+ throw new AuthenticationException("Unable to retrieve ZTS Public Key");
+ }
+
+ if (token.validate(ztsPublicKey, allowedOffset, null)) {
+ log.info("Athenz Role Token : {}, Authorized for Client: {}", roleToken, clientAddress);
+ return token.getPrincipal();
+ } else {
+ throw new AuthenticationException(
+ String.format("Athenz Role Token Not Authorized from Client: %s", clientAddress));
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(AuthenticationProviderAthenz.class);
+}
diff --git a/pulsar-client-auth-athenz/pom.xml b/pulsar-client-auth-athenz/pom.xml
new file mode 100644
index 0000000000000..807c75f051e5f
--- /dev/null
+++ b/pulsar-client-auth-athenz/pom.xml
@@ -0,0 +1,47 @@
+
+
+ 4.0.0
+
+
+ com.yahoo.pulsar
+ pulsar
+ 1.17-SNAPSHOT
+ ..
+
+
+ pulsar-client-auth-athenz
+ jar
+ Athenz authentication plugin for java client
+
+
+
+
+ ${project.groupId}
+ pulsar-client
+ ${project.parent.version}
+
+
+
+ com.yahoo.athenz
+ zts_java_client
+
+
+
+
diff --git a/pulsar-client-auth-athenz/src/main/java/com/yahoo/pulsar/client/impl/auth/AuthenticationAthenz.java b/pulsar-client-auth-athenz/src/main/java/com/yahoo/pulsar/client/impl/auth/AuthenticationAthenz.java
new file mode 100644
index 0000000000000..b93508a741380
--- /dev/null
+++ b/pulsar-client-auth-athenz/src/main/java/com/yahoo/pulsar/client/impl/auth/AuthenticationAthenz.java
@@ -0,0 +1,107 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.client.impl.auth;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.security.PrivateKey;
+
+import com.yahoo.athenz.zts.RoleToken;
+import com.yahoo.athenz.zts.ZTSClient;
+import com.yahoo.athenz.auth.ServiceIdentityProvider;
+import com.yahoo.athenz.auth.impl.SimpleServiceIdentityProvider;
+import com.yahoo.athenz.auth.util.Crypto;
+import com.yahoo.pulsar.client.api.Authentication;
+import com.yahoo.pulsar.client.api.AuthenticationDataProvider;
+import com.yahoo.pulsar.client.api.PulsarClientException;
+import com.yahoo.pulsar.client.api.PulsarClientException.GettingAuthenticationDataException;
+
+public class AuthenticationAthenz implements Authentication {
+
+ private transient ZTSClient ztsClient = null;
+ private String tenantDomain;
+ private String tenantService;
+ private String providerDomain;
+ private String privateKeyPath;
+ private String keyId = "0";
+ private long cachedRoleTokenTimestamp;
+ private String roleToken;
+ private final int minValidity = 2 * 60 * 60; // athenz will only give this token if it's at least valid for 2hrs
+ private final int maxValidity = 24 * 60 * 60; // token has upto 24 hours validity
+ private final int cacheDurationInHour = 1; // we will cache role token for an hour then ask athenz lib again
+
+ public AuthenticationAthenz() {
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "athenz";
+ }
+
+ @Override
+ synchronized public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+ if (cachedRoleTokenIsValid()) {
+ return new AuthenticationDataAthenz(roleToken, getZtsClient().getHeader());
+ }
+ try {
+ // the following would set up the API call that requests tokens from the server
+ // that can only be used if they are 10 minutes from expiration and last twenty four hours
+ RoleToken token = getZtsClient().getRoleToken(providerDomain, null, minValidity, maxValidity, false);
+ roleToken = token.getToken();
+ cachedRoleTokenTimestamp = System.nanoTime();
+ return new AuthenticationDataAthenz(roleToken, getZtsClient().getHeader());
+ } catch (Throwable t) {
+ throw new GettingAuthenticationDataException(t);
+ }
+ }
+
+ private boolean cachedRoleTokenIsValid() {
+ if (roleToken == null) {
+ return false;
+ }
+ // Ensure we refresh the Athenz role token every hour to avoid using an expired role token
+ return (System.nanoTime() - cachedRoleTokenTimestamp) < TimeUnit.HOURS.toNanos(cacheDurationInHour);
+ }
+
+ @Override
+ public void configure(Map authParams) {
+ this.tenantDomain = authParams.get("tenant_domain");
+ this.tenantService = authParams.get("tenant_service");
+ this.providerDomain = authParams.get("provider_domain");
+ this.privateKeyPath = authParams.get("private_key_path");
+ this.keyId = authParams.getOrDefault("key_id", "0");
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ ZTSClient getZtsClient() {
+ if (ztsClient == null) {
+ PrivateKey privateKey = Crypto.loadPrivateKey(new File(privateKeyPath));
+ ServiceIdentityProvider siaProvider = new SimpleServiceIdentityProvider(tenantDomain, tenantService,
+ privateKey, keyId);
+ ztsClient = new ZTSClient(null, tenantDomain, tenantService, siaProvider);
+ }
+ return ztsClient;
+ }
+}
diff --git a/pulsar-client-auth-athenz/src/main/java/com/yahoo/pulsar/client/impl/auth/AuthenticationDataAthenz.java b/pulsar-client-auth-athenz/src/main/java/com/yahoo/pulsar/client/impl/auth/AuthenticationDataAthenz.java
new file mode 100644
index 0000000000000..6bdba9a804147
--- /dev/null
+++ b/pulsar-client-auth-athenz/src/main/java/com/yahoo/pulsar/client/impl/auth/AuthenticationDataAthenz.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.client.impl.auth;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.yahoo.pulsar.client.api.AuthenticationDataProvider;
+
+public class AuthenticationDataAthenz implements AuthenticationDataProvider {
+ protected String roleToken;
+ protected String httpHeaderName;
+
+ /**
+ * @param roleToken
+ */
+ public AuthenticationDataAthenz(String roleToken, String httpHeaderName) {
+ this.roleToken = roleToken;
+ this.httpHeaderName = httpHeaderName;
+ }
+
+ @Override
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @Override
+ public Set> getHttpHeaders() {
+ Map headers = new HashMap<>();
+ headers.put(httpHeaderName, roleToken);
+ return headers.entrySet();
+ }
+
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return roleToken;
+ }
+
+}