Skip to content

Commit

Permalink
[improve][fn] Support e2e encryption in python instance (apache#18738)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng authored Feb 24, 2023
1 parent bf982f4 commit 8cc979d
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions pulsar-functions/instance/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""python_instance.py: Python Instance for running python functions
"""
import base64
import json
import os
import signal
import time
Expand Down Expand Up @@ -188,13 +189,16 @@ def run(self):
consumer_conf.schemaProperties)
Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))

crypto_key_reader = self.get_crypto_reader(consumer_conf.cryptoSpec)

consumer_args = {
"consumer_type": mode,
"schema": self.input_schema[topic],
"message_listener": partial(self.message_listener, self.input_serdes[topic], self.input_schema[topic]),
"unacked_messages_timeout_ms": int(self.timeout_ms) if self.timeout_ms else None,
"initial_position": position,
"properties": properties
"properties": properties,
"crypto_key_reader": crypto_key_reader
}
if consumer_conf.HasField("receiverQueueSize"):
consumer_args["receiver_queue_size"] = consumer_conf.receiverQueueSize.value
Expand Down Expand Up @@ -343,6 +347,10 @@ def setup_producer(self):
self.output_schema = self.get_schema(self.instance_config.function_details.sink.schemaType,
self.instance_config.function_details.sink.typeClassName,
self.instance_config.function_details.sink.schemaProperties)
crypto_key_reader = self.get_crypto_reader(self.instance_config.function_details.sink.producerSpec.cryptoSpec)
encryption_key = None
if crypto_key_reader is not None:
encryption_key = self.instance_config.function_details.sink.producerSpec.cryptoSpec.producerEncryptionKeyName[0]

self.producer = self.pulsar_client.create_producer(
str(self.instance_config.function_details.sink.topic),
Expand All @@ -355,6 +363,9 @@ def setup_producer(self):
# set send timeout to be infinity to prevent potential deadlock with consumer
# that might happen when consumer is blocked due to unacked messages
send_timeout_millis=0,
# python client only supports one key for encryption
encryption_key=encryption_key,
crypto_key_reader=crypto_key_reader,
properties=util.get_properties(util.getFullyQualifiedFunctionName(
self.instance_config.function_details.tenant,
self.instance_config.function_details.namespace,
Expand Down Expand Up @@ -484,7 +495,6 @@ def close(self):
if self.pulsar_client:
self.pulsar_client.close()


# TODO: support other schemas: PROTOBUF, PROTOBUF_NATIVE, and KeyValue
def get_schema(self, schema_type, type_class_name, schema_properties):
schema = DEFAULT_SCHEMA
Expand Down Expand Up @@ -526,4 +536,16 @@ def get_record_class(self, class_name):
record_kclass = util.import_class(os.path.dirname(self.user_code), class_name)
except:
pass
return record_kclass
return record_kclass
def get_crypto_reader(self, crypto_spec):
    """Build the crypto key reader described by ``crypto_spec``.

    ``crypto_spec`` is read through two attributes:
    ``cryptoKeyReaderConfig`` (a JSON object whose entries are passed as
    keyword arguments to the reader's constructor) and
    ``cryptoKeyReaderClassName`` (the reader class to import from the
    directory of ``self.user_code``; when empty, ``pulsar.CryptoKeyReader``
    is used).

    Returns the constructed reader, or ``None`` when ``crypto_spec`` is
    ``None``, when it carries no crypto settings at all, or when the reader
    could not be built (the failure is logged, not raised).
    """
    if crypto_spec is None:
        return None

    reader_config = crypto_spec.cryptoKeyReaderConfig
    reader_class_name = crypto_spec.cryptoKeyReaderClassName

    # Nothing configured: neither a config nor a class name was supplied.
    # NOTE(review): presumably this is what an unset spec looks like (empty
    # string fields rather than None) -- confirm against the callers.
    # Returning early avoids feeding "" to json.loads and logging a
    # misleading error for functions that simply do not use encryption.
    if not reader_config and not reader_class_name:
        return None

    try:
        crypto_config = json.loads(reader_config)
        if not reader_class_name:
            return pulsar.CryptoKeyReader(**crypto_config)
        reader_class = util.import_class(os.path.dirname(self.user_code), reader_class_name)
        return reader_class(**crypto_config)
    except Exception as e:
        # Best effort: a broken crypto spec is reported but does not raise.
        Log.error("Failed to load the crypto key reader from spec: %s, error: %s" % (crypto_spec, e))
    return None

0 comments on commit 8cc979d

Please sign in to comment.