Skip to content

Commit

Permalink
Added encryption key metadata to send additional information about th… (
Browse files Browse the repository at this point in the history
apache#798)

* Added encryption key metadata to send additional information about the key

* Use KeyValue type for encryption key metadata
  • Loading branch information
saandrews authored Sep 28, 2017
1 parent 350e67a commit 6010db0
Show file tree
Hide file tree
Showing 11 changed files with 435 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -2170,12 +2171,14 @@ public void testECDSAEncryption() throws Exception {

class EncKeyReader implements CryptoKeyReader {

EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@Override
public byte[] getPublicKey(String keyName) {
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
Expand All @@ -2186,11 +2189,12 @@ public byte[] getPublicKey(String keyName) {
}

@Override
public byte[] getPrivateKey(String keyName) {
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
Expand Down Expand Up @@ -2241,12 +2245,14 @@ public void testRSAEncryption() throws Exception {

class EncKeyReader implements CryptoKeyReader {

EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@Override
public byte[] getPublicKey(String keyName) {
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
Expand All @@ -2257,11 +2263,12 @@ public byte[] getPublicKey(String keyName) {
}

@Override
public byte[] getPrivateKey(String keyName) {
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
Expand Down Expand Up @@ -2317,12 +2324,14 @@ public void testEncryptionFailure() throws Exception {

class EncKeyReader implements CryptoKeyReader {

EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@Override
public byte[] getPublicKey(String keyName) {
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
Expand All @@ -2331,11 +2340,12 @@ public byte[] getPublicKey(String keyName) {
}

@Override
public byte[] getPrivateKey(String keyName) {
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
Expand Down Expand Up @@ -2379,7 +2389,7 @@ public byte[] getPrivateKey(String keyName) {

// 3. KeyReder is not set by consumer
// Receive should fail since key reader is not setup
msg = consumer.receive(1, TimeUnit.SECONDS);
msg = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(msg, "Receive should have failed with no keyreader");

// 4. Set consumer config to consume even if decryption fails
Expand All @@ -2391,7 +2401,7 @@ public byte[] getPrivateKey(String keyName) {
int msgNum = 0;
try {
// Receive should proceed and deliver encrypted message
msg = consumer.receive(1, TimeUnit.SECONDS);
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
String expectedMessage = "my-message-" + msgNum++;
Assert.assertNotEquals(receivedMessage, expectedMessage,
Expand All @@ -2410,7 +2420,7 @@ public byte[] getPrivateKey(String keyName) {
conf);

for (int i = msgNum; i < totalMsg-1; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
Expand All @@ -2428,7 +2438,7 @@ public byte[] getPrivateKey(String keyName) {
consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name", conf2);

// Receive should proceed and discard encrypted messages
msg = consumer.receive(1, TimeUnit.SECONDS);
msg = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");

log.info("-- Exiting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,34 @@
*/
package org.apache.pulsar.client.api;

import java.util.List;
import java.util.Map;

public interface CryptoKeyReader {

/*
* Return the encryption key corresponding to the key name in the argument
* <p>
* This method should be implemented to return the key in byte array. This method will be
* This method should be implemented to return the EncryptionKeyInfo. This method will be
* called at the time of producer creation as well as consumer receiving messages.
* Hence, application should not make any blocking calls within the implementation.
* <p>
*
* @param keyName
* Unique name to identify the key
* @return byte array of the public key value
* @param metadata
* Additional information needed to identify the key
* @return EncryptionKeyInfo with details about the public key
* */
byte [] getPublicKey(String keyName);
EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata);

/*
* @param keyName
* Unique name to identify the key
* @param metadata
* Additional information needed to identify the key
* @return byte array of the private key value
*/
byte [] getPrivateKey(String keyName);
EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.util.Map;

public class EncryptionKeyInfo {

/*
* This object contains the encryption key and corresponding metadata which contains
* additional information about the key such as version, timestammp
*/
private Map<String,String> metadata = null;
private byte[] key = null;

public EncryptionKeyInfo() {
this.key = null;
this.metadata = null;
}

public EncryptionKeyInfo(byte[] key, Map<String, String> metadata) {
this.key = key;
this.metadata = metadata;
}

public byte[] getKey() {
return key;
}

public void setKey(byte[] key) {
this.key = key;
}

public Map<String, String> getMetadata() {
return metadata;
}

public void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ public void addEncryptionKey(String key) {
this.encryptionKeys.add(key);
}

public void removeEncryptionKey(String key) {
if (this.encryptionKeys != null) {
this.encryptionKeys.remove(key);
}
}

/**
* Sets the ProducerCryptoFailureAction to the value specified
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ static enum SubscriptionMode {

// Create msgCrypto if not created already
if (conf.getCryptoKeyReader() != null) {
this.msgCrypto = new MessageCrypto(false);
String logCtx = "[" + topic + "] [" + subscription + "]";
this.msgCrypto = new MessageCrypto(logCtx , false);
}

grabCnx();
Expand Down
Loading

0 comments on commit 6010db0

Please sign in to comment.