Skip to content

Commit

Permalink
[feature][cpp-client]Expose cpp end to end encryption interface (apac…
Browse files Browse the repository at this point in the history
…he#9074)

### Motivation

Currently some users want to use end-to-end encryption on other clients, such as python or node clients, and this pr is used to expose the end-to-end encryption interface.

### Modifications

* Add a default class `DefaultCryptoKeyReader` to implement reading public and private keys
* The client calls the `pulsar_consumer_configuration_set_default_crypto_key_reader` function to specify the path of the public and private keys to be passed to the cpp client
* Add `DefaultCryptoKeyReader` class to the test

### Verifying this change

* Update test

The end-to-end tests already exist in the cpp client, so let's go ahead and use this example https://github.com/apache/pulsar/blob/041424cf06f16bedf4ef5787c9b96b7c5daf5fce/pulsar-client-cpp/tests/BasicEndToEndTest.cc#L1320 to test our code
  • Loading branch information
tuteng authored Dec 31, 2020
1 parent e75de48 commit 956328d
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 39 deletions.
19 changes: 18 additions & 1 deletion pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace pulsar {

class PULSAR_PUBLIC CryptoKeyReader {
public:
virtual ~CryptoKeyReader() {}
CryptoKeyReader();
virtual ~CryptoKeyReader();

/*
* Return the encryption key corresponding to the key name in the argument
Expand Down Expand Up @@ -61,6 +62,22 @@ class PULSAR_PUBLIC CryptoKeyReader {

}; /* namespace pulsar */

class PULSAR_PUBLIC DefaultCryptoKeyReader : public CryptoKeyReader {
private:
std::string publicKeyPath_;
std::string privateKeyPath_;
void readFile(std::string fileName, std::string& fileContents) const;

public:
DefaultCryptoKeyReader(const std::string& publicKeyPath, const std::string& privateKeyPath);
~DefaultCryptoKeyReader();

Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const;
Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const;
}; /* namespace pulsar */

typedef std::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr;
} // namespace pulsar

Expand Down
23 changes: 23 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ typedef enum {
initial_position_earliest
} initial_position;

typedef enum {
// This is the default option to fail consume until crypto succeeds
pulsar_ConsumerFail,
// Message is silently acknowledged and not delivered to the application
pulsar_ConsumerDiscard,
// Deliver the encrypted message to the application. It's the application's
// responsibility to decrypt the message. If message is also compressed,
// decompression will fail. If message contain batch messages, client will
// not be able to retrieve individual messages in the batch
pulsar_ConsumerConsume
} pulsar_consumer_crypto_failure_action;

/// Callback definition for MessageListener
typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx);

Expand Down Expand Up @@ -238,6 +250,17 @@ PULSAR_PUBLIC long pulsar_configure_get_ack_grouping_max_size(
PULSAR_PUBLIC int pulsar_consumer_is_encryption_enabled(
pulsar_consumer_configuration_t *consumer_configuration);

PULSAR_PUBLIC void pulsar_consumer_configuration_set_default_crypto_key_reader(
pulsar_consumer_configuration_t *consumer_configuration, const char *public_key_path,
const char *private_key_path);

PULSAR_PUBLIC pulsar_consumer_crypto_failure_action pulsar_consumer_configuration_get_crypto_failure_action(
pulsar_consumer_configuration_t *consumer_configuration);

PULSAR_PUBLIC void pulsar_consumer_configuration_set_crypto_failure_action(
pulsar_consumer_configuration_t *consumer_configuration,
pulsar_consumer_crypto_failure_action cryptoFailureAction);

PULSAR_PUBLIC int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_configuration);

PULSAR_PUBLIC void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consumer_configuration,
Expand Down
23 changes: 23 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/producer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,17 @@ typedef enum {
pulsar_AutoPublish = -4,
} pulsar_schema_type;

typedef enum {
// This is the default option to fail send if crypto operation fails
pulsar_ProducerFail,
// Ignore crypto failure and proceed with sending unencrypted messages
pulsar_ProducerSend
} pulsar_producer_crypto_failure_action;

typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;

typedef struct _pulsar_crypto_key_reader pulsar_crypto_key_reader;

PULSAR_PUBLIC pulsar_producer_configuration_t *pulsar_producer_configuration_create();

PULSAR_PUBLIC void pulsar_producer_configuration_free(pulsar_producer_configuration_t *conf);
Expand Down Expand Up @@ -168,6 +177,20 @@ PULSAR_PUBLIC unsigned long pulsar_producer_configuration_get_batching_max_publi
PULSAR_PUBLIC void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf,
const char *name, const char *value);

PULSAR_PUBLIC int pulsar_producer_is_encryption_enabled(pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_default_crypto_key_reader(
pulsar_producer_configuration_t *conf, const char *public_key_path, const char *private_key_path);

PULSAR_PUBLIC pulsar_producer_crypto_failure_action
pulsar_producer_configuration_get_crypto_failure_action(pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_crypto_failure_action(
pulsar_producer_configuration_t *conf, pulsar_producer_crypto_failure_action cryptoFailureAction);

PULSAR_PUBLIC void pulsar_producer_configuration_set_encryption_key(pulsar_producer_configuration_t *conf,
const char *key);

// const CryptoKeyReaderPtr getCryptoKeyReader() const;
// ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
//
Expand Down
75 changes: 75 additions & 0 deletions pulsar-client-cpp/lib/CryptoKeyReader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <fstream>
#include <sstream>
#include <pulsar/EncryptionKeyInfo.h>
#include <pulsar/CryptoKeyReader.h>
#include <pulsar/Result.h>

using namespace pulsar;

CryptoKeyReader::CryptoKeyReader() {}
CryptoKeyReader::~CryptoKeyReader() {}

Result CryptoKeyReader::getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const {
return ResultInvalidConfiguration;
}

Result CryptoKeyReader::getPrivateKey(const std::string& keyName,
std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const {
return ResultInvalidConfiguration;
}

DefaultCryptoKeyReader::DefaultCryptoKeyReader(const std::string& publicKeyPath,
const std::string& privateKeyPath) {
publicKeyPath_ = publicKeyPath;
privateKeyPath_ = privateKeyPath;
}

DefaultCryptoKeyReader::~DefaultCryptoKeyReader() {}

void DefaultCryptoKeyReader::readFile(std::string fileName, std::string& fileContents) const {
std::ifstream ifs(fileName);
std::stringstream fileStream;
fileStream << ifs.rdbuf();

fileContents = fileStream.str();
}

Result DefaultCryptoKeyReader::getPublicKey(const std::string& keyName,
std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const {
std::string keyContents;
readFile(publicKeyPath_, keyContents);

encKeyInfo.setKey(keyContents);
return ResultOk;
}

Result DefaultCryptoKeyReader::getPrivateKey(const std::string& keyName,
std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const {
std::string keyContents;
readFile(privateKeyPath_, keyContents);

encKeyInfo.setKey(keyContents);
return ResultOk;
}
21 changes: 21 additions & 0 deletions pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,27 @@ int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t *consu
return consumer_configuration->consumerConfiguration.isEncryptionEnabled();
}

void pulsar_consumer_configuration_set_default_crypto_key_reader(
pulsar_consumer_configuration_t *consumer_configuration, const char *public_key_path,
const char *private_key_path) {
std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
std::make_shared<pulsar::DefaultCryptoKeyReader>(public_key_path, private_key_path);
consumer_configuration->consumerConfiguration.setCryptoKeyReader(keyReader);
}

pulsar_consumer_crypto_failure_action pulsar_consumer_configuration_get_crypto_failure_action(
pulsar_consumer_configuration_t *consumer_configuration) {
return (pulsar_consumer_crypto_failure_action)
consumer_configuration->consumerConfiguration.getCryptoFailureAction();
}

void pulsar_consumer_configuration_set_crypto_failure_action(
pulsar_consumer_configuration_t *consumer_configuration,
pulsar_consumer_crypto_failure_action cryptoFailureAction) {
consumer_configuration->consumerConfiguration.setCryptoFailureAction(
(pulsar::ConsumerCryptoFailureAction)cryptoFailureAction);
}

int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_configuration) {
return consumer_configuration->consumerConfiguration.isReadCompacted();
}
Expand Down
27 changes: 27 additions & 0 deletions pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,30 @@ void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t
const char *value) {
conf->conf.setProperty(name, value);
}

int pulsar_producer_is_encryption_enabled(pulsar_producer_configuration_t *conf) {
return conf->conf.isEncryptionEnabled();
}

void pulsar_producer_configuration_set_default_crypto_key_reader(pulsar_producer_configuration_t *conf,
const char *public_key_path,
const char *private_key_path) {
std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
std::make_shared<pulsar::DefaultCryptoKeyReader>(public_key_path, private_key_path);
conf->conf.setCryptoKeyReader(keyReader);
}

pulsar_producer_crypto_failure_action pulsar_producer_configuration_set_crypto_failure_action(
pulsar_producer_configuration_t *conf) {
return (pulsar_producer_crypto_failure_action)conf->conf.getCryptoFailureAction();
}

void pulsar_producer_configuration_set_crypto_failure_action(
pulsar_producer_configuration_t *conf, pulsar_producer_crypto_failure_action cryptoFailureAction) {
conf->conf.setCryptoFailureAction((pulsar::ProducerCryptoFailureAction)cryptoFailureAction);
}

void pulsar_producer_configuration_set_encryption_key(pulsar_producer_configuration_t *conf,
const char *key) {
conf->conf.addEncryptionKey(key);
}
62 changes: 24 additions & 38 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <pulsar/Client.h>
#include <pulsar/Consumer.h>
#include <pulsar/MessageBuilder.h>
#include <pulsar/CryptoKeyReader.h>

#include <lib/Latch.h>
#include <lib/Utils.h>
Expand Down Expand Up @@ -116,42 +117,6 @@ static void sendCallBackWithDelay(Result r, const MessageId &msgId, std::string
sendCallBack(r, msgId, prefix, count);
}

class EncKeyReader : public CryptoKeyReader {
private:
void readFile(std::string fileName, std::string &fileContents) const {
std::ifstream ifs(fileName);
std::stringstream fileStream;
fileStream << ifs.rdbuf();

fileContents = fileStream.str();
}

public:
EncKeyReader() {}

Result getPublicKey(const std::string &keyName, std::map<std::string, std::string> &metadata,
EncryptionKeyInfo &encKeyInfo) const {
std::string CERT_FILE_PATH =
"../../pulsar-broker/src/test/resources/certificate/public-key." + keyName;
std::string keyContents;
readFile(CERT_FILE_PATH, keyContents);

encKeyInfo.setKey(keyContents);
return ResultOk;
}

Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string> &metadata,
EncryptionKeyInfo &encKeyInfo) const {
std::string CERT_FILE_PATH =
"../../pulsar-broker/src/test/resources/certificate/private-key." + keyName;
std::string keyContents;
readFile(CERT_FILE_PATH, keyContents);

encKeyInfo.setKey(keyContents);
return ResultOk;
}
};

TEST(BasicEndToEndTest, testBatchMessages) {
ClientConfiguration config;
Client client(lookupUrl);
Expand Down Expand Up @@ -1323,7 +1288,14 @@ TEST(BasicEndToEndTest, testRSAEncryption) {
std::string subName = "my-sub-name";
Producer producer;

std::shared_ptr<EncKeyReader> keyReader = std::make_shared<EncKeyReader>();
std::string PUBLIC_CERT_FILE_PATH =
"../../pulsar-broker/src/test/resources/certificate/public-key.client-rsa.pem";

std::string PRIVATE_CERT_FILE_PATH =
"../../pulsar-broker/src/test/resources/certificate/private-key.client-rsa.pem";

std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
ProducerConfiguration conf;
conf.setCompressionType(CompressionLZ4);
conf.addEncryptionKey("client-rsa.pem");
Expand Down Expand Up @@ -1381,7 +1353,14 @@ TEST(BasicEndToEndTest, testEncryptionFailure) {
std::string subName = "my-sub-name";
Producer producer;

std::shared_ptr<EncKeyReader> keyReader = std::make_shared<EncKeyReader>();
std::string PUBLIC_CERT_FILE_PATH =
"../../pulsar-broker/src/test/resources/certificate/public-key.client-rsa-test.pem";

std::string PRIVATE_CERT_FILE_PATH =
"../../pulsar-broker/src/test/resources/certificate/private-key.client-rsa-test.pem";

std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);

ConsumerConfiguration consConfig;

Expand Down Expand Up @@ -1418,6 +1397,13 @@ TEST(BasicEndToEndTest, testEncryptionFailure) {

// 2. Add valid key
{
PUBLIC_CERT_FILE_PATH =
"../../pulsar-broker/src/test/resources/certificate/public-key.client-rsa.pem";

PRIVATE_CERT_FILE_PATH =
"../../pulsar-broker/src/test/resources/certificate/private-key.client-rsa.pem";
keyReader =
std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
ProducerConfiguration prodConf;
prodConf.setCryptoKeyReader(keyReader);
prodConf.setBatchingEnabled(false);
Expand Down

0 comments on commit 956328d

Please sign in to comment.