Skip to content

Commit

Permalink
[feature][python-client]support python end to end encryption (apache#…
Browse files Browse the repository at this point in the history
…9588)

* Support python end to end encryption

* Add test

* Add document for new args

* Fixed test by use absolute path
  • Loading branch information
tuteng authored Feb 18, 2021
1 parent f3dc022 commit cf63ae8
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 4 deletions.
4 changes: 3 additions & 1 deletion pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class PULSAR_PUBLIC CryptoKeyReader {

}; /* namespace pulsar */

typedef std::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr;

class PULSAR_PUBLIC DefaultCryptoKeyReader : public CryptoKeyReader {
private:
std::string publicKeyPath_;
Expand All @@ -76,9 +78,9 @@ class PULSAR_PUBLIC DefaultCryptoKeyReader : public CryptoKeyReader {
EncryptionKeyInfo& encKeyInfo) const;
Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const;
static CryptoKeyReaderPtr create(const std::string& publicKeyPath, const std::string& privateKeyPath);
}; /* namespace pulsar */

typedef std::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr;
} // namespace pulsar

#endif /* CRYPTOKEYREADER_H_ */
5 changes: 5 additions & 0 deletions pulsar-client-cpp/lib/CryptoKeyReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ Result DefaultCryptoKeyReader::getPrivateKey(const std::string& keyName,

encKeyInfo.setKey(keyContents);
return ResultOk;
}

CryptoKeyReaderPtr DefaultCryptoKeyReader::create(const std::string& publicKeyPath,
const std::string& privateKeyPath) {
return CryptoKeyReaderPtr(new DefaultCryptoKeyReader(publicKeyPath, privateKeyPath));
}
3 changes: 2 additions & 1 deletion pulsar-client-cpp/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ ADD_LIBRARY(_pulsar SHARED src/pulsar.cc
src/message.cc
src/authentication.cc
src/reader.cc
src/schema.cc)
src/schema.cc
src/cryptoKeyReader.cc)

SET(CMAKE_SHARED_LIBRARY_PREFIX )
SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)
Expand Down
39 changes: 38 additions & 1 deletion pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ def create_producer(self, topic,
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
properties=None,
batching_type=BatchingType.Default,
encryption_key=None,
crypto_key_reader=None
):
"""
Create a new producer on a given topic.
Expand Down Expand Up @@ -519,6 +521,11 @@ def create_producer(self, topic,
(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
batched into single batch message:
[(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
* encryption_key:
The key used for symmetric encryption, configured on the producer side
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
Expand All @@ -535,6 +542,8 @@ def create_producer(self, topic,
_check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
_check_type_or_none(dict, properties, 'properties')
_check_type(BatchingType, batching_type, 'batching_type')
_check_type_or_none(str, encryption_key, 'encryption_key')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')

conf = _pulsar.ProducerConfiguration()
conf.send_timeout_millis(send_timeout_millis)
Expand All @@ -557,6 +566,10 @@ def create_producer(self, topic,
conf.property(k, v)

conf.schema(schema.schema_info())
if encryption_key:
conf.encryption_key(encryption_key)
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

p = Producer()
p._producer = self._client.create_producer(topic, conf)
Expand All @@ -576,7 +589,8 @@ def subscribe(self, topic, subscription_name,
is_read_compacted=False,
properties=None,
pattern_auto_discovery_period=60,
initial_position=InitialPosition.Latest
initial_position=InitialPosition.Latest,
crypto_key_reader=None
):
"""
Subscribe to the given topic and subscription combination.
Expand Down Expand Up @@ -649,6 +663,9 @@ def my_listener(consumer, message):
Set the initial position of a consumer when subscribing to the topic.
It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
Default: `Latest`.
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
Expand All @@ -664,6 +681,7 @@ def my_listener(consumer, message):
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(dict, properties, 'properties')
_check_type(InitialPosition, initial_position, 'initial_position')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')

conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
Expand All @@ -686,6 +704,9 @@ def my_listener(consumer, message):

conf.schema(schema.schema_info())

if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

c = Consumer()
if isinstance(topic, str):
# Single topic
Expand Down Expand Up @@ -1224,6 +1245,22 @@ def close(self):
self._reader.close()
self._client._consumers.remove(self)

class CryptoKeyReader:
"""
Default crypto key reader implementation
"""
def __init__(self, public_key_path, private_key_path):
"""
Create crypto key reader.
**Args**
* `public_key_path`: Path to the public key
* `private_key_path`: Path to private key
"""
_check_type(str, public_key_path, 'public_key_path')
_check_type(str, private_key_path, 'private_key_path')
self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)

def _check_type(var_type, var, name):
if not isinstance(var, var_type):
Expand Down
22 changes: 21 additions & 1 deletion pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from datetime import timedelta
from pulsar import Client, MessageId, \
CompressionType, ConsumerType, PartitionsRoutingMode, \
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
CryptoKeyReader

from _pulsar import ProducerConfiguration, ConsumerConfiguration

Expand Down Expand Up @@ -357,6 +358,25 @@ def test_tls_auth2(self):

client.close()

def test_encryption(self):
publicKeyPath = "/pulsar//pulsar-broker/src/test/resources/certificate/public-key.client-rsa.pem"
privateKeyPath = "/pulsar/pulsar-broker/src/test/resources/certificate/private-key.client-rsa.pem"
crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
client = Client(self.serviceUrl)
topic = 'my-python-test-end-to-end-encryption'
consumer = client.subscribe(topic=topic,
subscription_name='my-subscription',
crypto_key_reader=crypto_key_reader)
producer = client.create_producer(topic=topic,
encryption_key="client-rsa.pem",
crypto_key_reader=crypto_key_reader)
producer.send('hello')
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.value(), 'hello')
consumer.unsubscribe()
client.close()

def test_tls_auth3(self):
certs_dir = '/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
if not os.path.exists(certs_dir):
Expand Down
17 changes: 17 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur
return conf;
}

static ConsumerConfiguration& ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf,
py::object cryptoKeyReader) {
CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
return conf;
}

static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf,
py::object cryptoKeyReader) {
CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
return conf;
}

void export_config() {
using namespace boost::python;

Expand Down Expand Up @@ -128,6 +142,8 @@ void export_config() {
.def("property", &ProducerConfiguration::setProperty, return_self<>())
.def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>())
.def("batching_type", &ProducerConfiguration::getBatchingType)
.def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>())
.def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>())
;

class_<ConsumerConfiguration>("ConsumerConfiguration")
Expand Down Expand Up @@ -155,6 +171,7 @@ void export_config() {
.def("property", &ConsumerConfiguration::setProperty, return_self<>())
.def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
.def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
.def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>())
;

class_<ReaderConfiguration>("ReaderConfiguration")
Expand Down
32 changes: 32 additions & 0 deletions pulsar-client-cpp/python/src/cryptoKeyReader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "utils.h"

CryptoKeyReaderWrapper::CryptoKeyReaderWrapper() {}

CryptoKeyReaderWrapper::CryptoKeyReaderWrapper(const std::string& publicKeyPath,
const std::string& privateKeyPath) {
this->cryptoKeyReader = DefaultCryptoKeyReader::create(publicKeyPath, privateKeyPath);
}

void export_cryptoKeyReader() {
using namespace boost::python;

class_<CryptoKeyReaderWrapper>("CryptoKeyReader", init<const std::string&, const std::string&>());
}
2 changes: 2 additions & 0 deletions pulsar-client-cpp/python/src/pulsar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void export_config();
void export_enums();
void export_authentication();
void export_schema();
void export_cryptoKeyReader();


static void translateException(const PulsarException& ex) {
Expand All @@ -53,4 +54,5 @@ BOOST_PYTHON_MODULE(_pulsar)
export_enums();
export_authentication();
export_schema();
export_cryptoKeyReader();
}
7 changes: 7 additions & 0 deletions pulsar-client-cpp/python/src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ struct AuthenticationWrapper {
AuthenticationWrapper();
AuthenticationWrapper(const std::string& dynamicLibPath, const std::string& authParamsString);
};

struct CryptoKeyReaderWrapper {
CryptoKeyReaderPtr cryptoKeyReader;

CryptoKeyReaderWrapper();
CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath);
};

0 comments on commit cf63ae8

Please sign in to comment.