Skip to content

Commit

Permalink
[client authentication] add authentication client with oauth2 support (
Browse files Browse the repository at this point in the history
…apache#7420)

### Motivation

Pulsar supports authenticating clients using OAuth 2.0 access tokens. You can use tokens to identify a Pulsar client and associate with some "principal" (or "role") that is permitted to do some actions (eg: publish to a topic or consume from a topic). 

This module is to support Pulsar Client Authentication Plugin for OAuth 2.0 directly. Client side communicate with Oauth 2.0 server,  then the client will get an `access token` from Oauth 2.0 server, and will pass this `access token` to Pulsar broker to do the authentication.

So the Broker side could still use `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`,
also user can add their own `AuthenticationProvider` to work with this module.

### Modifications

- add related code;
- add related test;
- add related doc.

The init of this client authentication module would be like:
```java
Authentication oauth2Authentication = AuthenticationFactoryOAuth2.clientCredentials(
                new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"),
                new URL("file:///path/to/credential/file.json"),  // key file path
                "https://dev-kt-aa9ne.us.auth0.com/api/v2/"
        );
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://broker.example.com:6650/")
    .authentication(oauth2Authentication)
    .build();
```

### Verifying this change

tests passed.
  • Loading branch information
jiazhai authored Jul 2, 2020
1 parent 1e152bd commit 768813e
Show file tree
Hide file tree
Showing 24 changed files with 1,753 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.mockito.Mockito.spy;

import com.google.common.collect.Sets;
import java.net.URI;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* Test Token authentication with:
* client: org.apache.pulsar.client.impl.auth.AuthenticationToken
* broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
*/
public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);

// pre-create a public/private_key pair. Public key used for broker to verify client passed in token
private final String TOKEN_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAhHKgdY6arG7eE75bUPtznN5WjMu0sxLq7pI5Aaiw2Ijerbz33iO/Fdd2yJVuAZNDZPD/AVSaeliEh/BP+s2rN8KNuiywD+SlL1NGf2JDS5BvGT4Q8eHfDDRd/iY5zkK58wYwlke6C8fKCx10MTH9iYAJpzaaxs+Tu1RaatK+691aYSiMkYIfgbqAKmSCpK+48al/PkmENfuhzaTBPhCnEblhNvUhS5MjzBcAcGzecpEuVSxUzDtm8rU8DEQR6kkdXS1QnGHVNis/vgk8QzctkJKbtgDIaGzNUmDvTCyPZ8WLWSWJWb1oPxRZwpfXVP69ijU0Rme4/YkuHt6IEw6ANQIDAQAB";
// admin token created based on private_key.
private final String ADMIN_TOKEN = "eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTYyNTEzNjQyMn0.DAfbUPZwQURgGvor4scO0NoqoyHkCulKZkhP7kksCWFvgx6B22iKuXGX42EFlFSRMWYYgIJXV7UZATCLCjJpn_ijrO6AWBmooib3f94OPoLDdkF3qXnqaLnvJtl8_sCoLCSghR_O3hQFgQW2GRjMDdfJgl2_HXCWuzedtI5cQJdbpfU0NU10nzo7RtrpCmUdgQYQEHegYOawLqQVvr53ZGjrZilBXY9HHz1mSlnwZGNGVNNdvRthBuGtXtfKgtfSDF5jLqABvK8TUpdNJybibeiOspdzuY19-wVt4eVXzNAGsP4V4Zs91MgIUYV5lWKnBUuVWalppkMWhRF4Jf-KWQ";

@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);

Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("admin");
conf.setSuperUserRoles(superUserRoles);

Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);

conf.setClusterName("test");

// Set provider domain name
Properties properties = new Properties();
properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);

conf.setProperties(properties);
super.init();
}

// setup both admin and pulsar client
protected final void clientSetup() throws Exception {
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(AuthenticationFactory.token(ADMIN_TOKEN))
.build());

pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
.statsInterval(0, TimeUnit.SECONDS)
.authentication(AuthenticationFactory.token(ADMIN_TOKEN))
.build();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "batch")
public Object[][] codecProvider() {
return new Object[][] { { 0 }, { 1000 } };
}

public void testSyncProducerAndConsumer() throws Exception {
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
.subscriptionName("my-subscriber-name").subscribe();

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");

Producer<byte[]> producer = producerBuilder.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
}

@Test
public void testTokenProducerAndConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
clientSetup();

// test rest by admin
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));

// test protocol by producer/consumer
testSyncProducerAndConsumer();

log.info("-- Exiting {} test --", methodName);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.mockito.Mockito.spy;

import com.google.common.collect.Sets;
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* Test Token authentication with:
* client: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
* broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
*/
public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TokenOauth2AuthenticatedProducerConsumerTest.class);

// public key in oauth2 server to verify the client passed in token. get from https://jwt.io/
private final String TOKEN_TEST_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB";

private final String ADMIN_ROLE = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients";

// Credentials File, which contains "client_id" and "client_secret"
private final String CREDENTIALS_FILE = "./src/test/resources/authentication/token/credentials_file.json";

@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);

Set<String> superUserRoles = new HashSet<>();
superUserRoles.add(ADMIN_ROLE);
conf.setSuperUserRoles(superUserRoles);

Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);

conf.setClusterName("test");

// Set provider domain name
Properties properties = new Properties();
properties.setProperty("tokenPublicKey", TOKEN_TEST_PUBLIC_KEY);

conf.setProperties(properties);
super.init();
}

// setup both admin and pulsar client
protected final void clientSetup() throws Exception {
Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath();
log.info("Credentials File path: {}", path.toString());

// AuthenticationOAuth2
Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"),
new URL("file://" + path.toString()), // key file path
"https://dev-kt-aa9ne.us.auth0.com/api/v2/"
);

admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(authentication)
.build());

pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
.statsInterval(0, TimeUnit.SECONDS)
.authentication(authentication)
.build();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "batch")
public Object[][] codecProvider() {
return new Object[][] { { 0 }, { 1000 } };
}

public void testSyncProducerAndConsumer() throws Exception {
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
.subscriptionName("my-subscriber-name").subscribe();

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");

Producer<byte[]> producer = producerBuilder.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
}

@Test
public void testTokenProducerAndConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
clientSetup();

// test rest by admin
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));

// test protocol by producer/consumer
testSyncProducerAndConsumer();

log.info("-- Exiting {} test --", methodName);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
"client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.auth.oauth2;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.client.api.AuthenticationDataProvider;

/**
* Provide OAuth 2.0 authentication data.
*/
class AuthenticationDataOAuth2 implements AuthenticationDataProvider {
public static final String HTTP_HEADER_NAME = "Authorization";

private final String accessToken;
private final Set<Map.Entry<String, String>> headers;

public AuthenticationDataOAuth2(String accessToken) {
this.accessToken = accessToken;
this.headers = Collections.singletonMap(HTTP_HEADER_NAME, "Bearer " + accessToken).entrySet();
}

@Override
public boolean hasDataForHttp() {
return true;
}

@Override
public Set<Map.Entry<String, String>> getHttpHeaders() {
return this.headers;
}

@Override
public boolean hasDataFromCommand() {
return true;
}

@Override
public String getCommandData() {
return this.accessToken;
}

}
Loading

0 comments on commit 768813e

Please sign in to comment.