Skip to content

Commit

Permalink
[cli] pulsar-client-tools supports end-to-end encryption (apache#9615)
Browse files Browse the repository at this point in the history
* pulsar-client-tools supports end-to-end encryption

* Address comments
  • Loading branch information
Masahiro Sakamoto authored Feb 18, 2021
1 parent 0f9e211 commit f3dc022
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pulsar-client-tools-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -209,4 +211,55 @@ public void testDurableSubscribe() throws Exception {
Assert.assertNotNull(subscriptions);
Assert.assertEquals(subscriptions.size(), 1);
}

@Test(timeOut = 20000)
public void testEncryption() throws Exception {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");

final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString();
final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
final int numberOfMessages = 10;

ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(() -> {
try {
PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = {"consume", "-s", "sub-name", "-n", Integer.toString(numberOfMessages), "-ekv",
keyUriBase + "private-key.client-rsa.pem", topicName};
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});

// Make sure subscription has been created
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
boolean isCreated = false;
try {
List<String> subscriptions = admin.topics().getSubscriptions(topicName);
isCreated = (subscriptions.size() == 1);
} catch (Exception e) {
}
return isCreated;
});

PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties);
String[] args = {"produce", "-m", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-ekn",
"my-app-key", "-ekv", keyUriBase + "public-key.client-rsa.pem", topicName};
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);

try {
future.get(10, TimeUnit.SECONDS);
Assert.assertFalse(future.isCompletedExceptionally());
} catch (Exception e) {
Assert.fail("consumer was unable to decrypt messages", e);
} finally {
executor.shutdown();
}
}

}
5 changes: 5 additions & 0 deletions pulsar-client-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-messagecrypto-bc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.cli;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
Expand Down Expand Up @@ -115,6 +117,11 @@ public class CmdConsume {
@Parameter(names = { "-ac",
"--auto_ack_chunk_q_full" }, description = "Auto ack for oldest message on queue is full")
private boolean autoAckOldestChunkedMessageOnQueueFull = false;

@Parameter(names = { "-ekv",
"--encryption-key-value" }, description = "The URI of private key to decrypt payload, for example "
+ "file:///path/to/private.key or data:application/x-pem-file;base64,*****")
private String encKeyValue;

private ClientBuilder clientBuilder;
private Authentication authentication;
Expand Down Expand Up @@ -219,6 +226,10 @@ private int consume(String topic) {

builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);

if (isNotBlank(this.encKeyValue)) {
builder.defaultCryptoKeyReader(this.encKeyValue);
}

Consumer<byte[]> consumer = builder.subscribe();

RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.cli;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
Expand Down Expand Up @@ -111,6 +113,13 @@ public class CmdProduce {
@Parameter(names = { "-k", "--key"}, description = "message key to add ")
private String key;

@Parameter(names = { "-ekn", "--encryption-key-name" }, description = "The public key name to encrypt payload")
private String encKeyName = null;

@Parameter(names = { "-ekv",
"--encryption-key-value" }, description = "The URI of public key to encrypt payload, for example "
+ "file:///path/to/public.key or data:application/x-pem-file;base64,*****")
private String encKeyValue = null;

private ClientBuilder clientBuilder;
private Authentication authentication;
Expand Down Expand Up @@ -207,6 +216,10 @@ private int publish(String topic) {
producerBuilder.enableChunking(true);
producerBuilder.enableBatching(false);
}
if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyValue)) {
producerBuilder.addEncryptionKey(this.encKeyName);
producerBuilder.defaultCryptoKeyReader(this.encKeyValue);
}
Producer<byte[]> producer = producerBuilder.create();

List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
Expand Down

0 comments on commit f3dc022

Please sign in to comment.