From 8cc979de411f9bca05ea95b16807f1860b79382e Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Fri, 24 Feb 2023 11:03:37 +0800 Subject: [PATCH] [improve][fn] Support e2e cryption in python instance (#18738) --- .../src/main/python/python_instance.py | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index f77ef38f76e4d..ac723232f4839 100755 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -22,6 +22,7 @@ """python_instance.py: Python Instance for running python functions """ import base64 +import json import os import signal import time @@ -188,13 +189,16 @@ def run(self): consumer_conf.schemaProperties) Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + crypto_key_reader = self.get_crypto_reader(consumer_conf.cryptoSpec) + consumer_args = { "consumer_type": mode, "schema": self.input_schema[topic], "message_listener": partial(self.message_listener, self.input_serdes[topic], self.input_schema[topic]), "unacked_messages_timeout_ms": int(self.timeout_ms) if self.timeout_ms else None, "initial_position": position, - "properties": properties + "properties": properties, + "crypto_key_reader": crypto_key_reader } if consumer_conf.HasField("receiverQueueSize"): consumer_args["receiver_queue_size"] = consumer_conf.receiverQueueSize.value @@ -343,6 +347,10 @@ def setup_producer(self): self.output_schema = self.get_schema(self.instance_config.function_details.sink.schemaType, self.instance_config.function_details.sink.typeClassName, self.instance_config.function_details.sink.schemaProperties) + crypto_key_reader = self.get_crypto_reader(self.instance_config.function_details.sink.producerSpec.cryptoSpec) + encryption_key = None + if crypto_key_reader is not None: + encryption_key = self.instance_config.function_details.sink.producerSpec.cryptoSpec.producerEncryptionKeyName[0] self.producer = self.pulsar_client.create_producer( str(self.instance_config.function_details.sink.topic), @@ -355,6 +363,9 @@ def setup_producer(self): # set send timeout to be infinity to prevent potential deadlock with consumer # that might happen when consumer is blocked due to unacked messages send_timeout_millis=0, + # python client only supports one key for encryption + encryption_key=encryption_key, + crypto_key_reader=crypto_key_reader, properties=util.get_properties(util.getFullyQualifiedFunctionName( self.instance_config.function_details.tenant, self.instance_config.function_details.namespace, @@ -484,7 +495,6 @@ def close(self): if self.pulsar_client: self.pulsar_client.close() - # TODO: support other schemas: PROTOBUF, PROTOBUF_NATIVE, and KeyValue def get_schema(self, schema_type, type_class_name, schema_properties): schema = DEFAULT_SCHEMA @@ -526,4 +536,16 @@ def get_record_class(self, class_name): record_kclass = util.import_class(os.path.dirname(self.user_code), class_name) except: pass - return record_kclass \ No newline at end of file + return record_kclass + def get_crypto_reader(self, crypto_spec): + crypto_key_reader = None + if crypto_spec is not None: + try: + crypto_config = json.loads(crypto_spec.cryptoKeyReaderConfig) + if crypto_spec.cryptoKeyReaderClassName == "" or crypto_spec.cryptoKeyReaderClassName is None: + crypto_key_reader = pulsar.CryptoKeyReader(**crypto_config) + else: + crypto_key_reader = util.import_class(os.path.dirname(self.user_code), crypto_spec.cryptoKeyReaderClassName)(**crypto_config) + except Exception as e: + Log.error("Failed to load the crypto key reader from spec: %s, error: %s" % (crypto_spec, e)) + return crypto_key_reader