Skip to content

Commit

Permalink
[Issue 8382][Pulsar Function] Enable e2e encryption for Pulsar Functi…
Browse files Browse the repository at this point in the history
…on (apache#8432)

Fixes apache#8382 


### Motivation

Add the e2e encryption support for Pulsar Functions

### Modifications

- Add `CryptoConfig` the encapsulate all the crypto related configs set by user
- Add `CryptoSpec` to `Function` protobuf to container crypto information internally
- Add `CryptoUtils` to help create instance, convert between `CryptoConfig` and `CryptoSpec`
- Add crypto validation method in `ValidatorUtils` to ensure the provided `CryptoKeyReader` Class has a ctor with `Map` arg
- Updated the cli to allow user set crypto for consumer/producer when submitting the function
- Update `PulsarSource`, `PulsarSink` to use the crypto config if provided

### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
nlu90 authored Nov 8, 2020
1 parent 26fd380 commit 2c9fe27
Show file tree
Hide file tree
Showing 19 changed files with 577 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.Utils;
Expand Down Expand Up @@ -219,6 +220,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {

@Parameter(names = {"-o", "--output"}, description = "The output topic of a Pulsar Function (If none is specified, no output is written)")
protected String output;
@Parameter(names = "--producer-config", description = "The custom producer configuration (as a JSON string)" )
protected String producerConfig;
// for backwards compatibility purposes
@Parameter(names = "--logTopic", description = "The topic to which the logs of a Pulsar Function are produced", hidden = true)
protected String DEPRECATED_logTopic;
Expand Down Expand Up @@ -391,6 +394,10 @@ void processArguments() throws Exception {
if (null != output) {
functionConfig.setOutput(output);
}
if (null != producerConfig) {
Type type = new TypeToken<ProducerConfig>() {}.getType();
functionConfig.setProducerConfig(new Gson().fromJson(producerConfig, type));
}
if (null != logTopic) {
functionConfig.setLogTopic(logTopic);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down Expand Up @@ -270,6 +271,8 @@ abstract class SourceDetailsCommand extends BaseCommand {
protected String DEPRECATED_destinationTopicName;
@Parameter(names = "--destination-topic-name", description = "The Pulsar topic to which data is sent")
protected String destinationTopicName;
@Parameter(names = "--producer-config", description = "The custom producer configuration (as a JSON string)")
protected String producerConfig;

@Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source", hidden = true)
protected String DEPRECATED_deserializationClassName;
Expand Down Expand Up @@ -346,6 +349,10 @@ void processArguments() throws Exception {
if (null != destinationTopicName) {
sourceConfig.setTopicName(destinationTopicName);
}
if (null != producerConfig) {
Type type = new TypeToken<ProducerConfig>() {}.getType();
sourceConfig.setProducerConfig(new Gson().fromJson(producerConfig, type));
}
if (null != deserializationClassName) {
sourceConfig.setSerdeClassName(deserializationClassName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ConsumerConfig {
@Builder.Default
private Map<String, String> consumerProperties = new HashMap<>();
private Integer receiverQueueSize;
private CryptoConfig cryptoConfig;

public ConsumerConfig(String schemaType) {
this.schemaType = schemaType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.common.functions;

import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;

/**
* Configuration of the producer inside the function.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class CryptoConfig {
private String cryptoKeyReaderClassName;
private Map<String, Object> cryptoKeyReaderConfig;

private String[] encryptionKeys;
private ProducerCryptoFailureAction producerCryptoFailureAction;

private ConsumerCryptoFailureAction consumerCryptoFailureAction;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ public class ProducerConfig {
private Integer maxPendingMessages;
private Integer maxPendingMessagesAcrossPartitions;
private Boolean useThreadLocalProducers;
private CryptoConfig cryptoConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static <T> T createInstance(String userClassName,
} catch (NoSuchMethodException e) {
throw new RuntimeException("User class must have a no-arg constructor", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("User class must a public constructor", e);
throw new RuntimeException("User class must have a public constructor", e);
} catch (InvocationTargetException e) {
throw new RuntimeException("User class constructor throws exception", e);
}
Expand Down
6 changes: 6 additions & 0 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-messagecrypto-bc</artifactId>
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
Expand All @@ -73,6 +75,7 @@
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.io.core.Sink;
Expand Down Expand Up @@ -690,6 +693,10 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
if (conf.hasReceiverQueueSize()) {
consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue());
}
if (conf.hasCryptoSpec()) {
consumerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
}

pulsarSourceConfig.getTopicSchema().put(topic, consumerConfig);
});

Expand Down Expand Up @@ -815,7 +822,13 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap());

if (this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) {
pulsarSinkConfig.setProducerSpec(this.instanceConfig.getFunctionDetails().getSink().getProducerSpec());
org.apache.pulsar.functions.proto.Function.ProducerSpec conf = this.instanceConfig.getFunctionDetails().getSink().getProducerSpec();
ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder()
.maxPendingMessages(conf.getMaxPendingMessages())
.maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
pulsarSinkConfig.setProducerConfig(builder.build());
}

object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@
package org.apache.pulsar.functions.sink;

import com.google.common.annotations.VisibleForTesting;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
Expand All @@ -42,9 +49,12 @@
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

import java.security.Security;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
Expand All @@ -55,6 +65,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.apache.commons.lang.StringUtils.isEmpty;

@Slf4j
public class PulsarSink<T> implements Sink<T> {

Expand All @@ -81,9 +93,11 @@ private interface PulsarSinkProcessor<T> {
private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap<>();
protected Schema schema;
protected Crypto crypto;

protected PulsarSinkProcessorBase(Schema schema) {
protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) {
this.schema = schema;
this.crypto = crypto;
}

public Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
Expand All @@ -103,15 +117,23 @@ public Producer<T> createProducer(PulsarClient client, String topic, String prod
if (producerName != null) {
builder.producerName(producerName);
}
if (pulsarSinkConfig.getProducerSpec() != null) {
if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessages() != 0) {
builder.maxPendingMessages(pulsarSinkConfig.getProducerSpec().getMaxPendingMessages());
if (pulsarSinkConfig.getProducerConfig() != null) {
ProducerConfig producerConfig = pulsarSinkConfig.getProducerConfig();
if (producerConfig.getMaxPendingMessages() != 0) {
builder.maxPendingMessages(producerConfig.getMaxPendingMessages());
}
if (producerConfig.getMaxPendingMessagesAcrossPartitions() != 0) {
builder.maxPendingMessagesAcrossPartitions(producerConfig.getMaxPendingMessagesAcrossPartitions());
}
if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
builder.maxPendingMessagesAcrossPartitions(pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions());
if (producerConfig.getCryptoConfig() != null) {
CryptoConfig cryptoConfig = producerConfig.getCryptoConfig();
builder.cryptoKeyReader(crypto.keyReader);
builder.cryptoFailureAction(crypto.failureAction);
for (String encryptionKeyName : crypto.getEncryptionKeys()) {
builder.addEncryptionKey(encryptionKeyName);
}
}
}

return builder.properties(properties).create();
}

Expand Down Expand Up @@ -175,8 +197,8 @@ public Function<Throwable, Void> getPublishErrorHandler(Record<T> record, boolea

@VisibleForTesting
class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase {
public PulsarSinkAtMostOnceProcessor(Schema schema) {
super(schema);
public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) {
super(schema, crypto);
// initialize default topic
try {
publishProducers.put(pulsarSinkConfig.getTopic(),
Expand Down Expand Up @@ -211,8 +233,8 @@ public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {

@VisibleForTesting
class PulsarSinkAtLeastOnceProcessor extends PulsarSinkAtMostOnceProcessor {
public PulsarSinkAtLeastOnceProcessor(Schema schema) {
super(schema);
public PulsarSinkAtLeastOnceProcessor(Schema schema, Crypto crypto) {
super(schema, crypto);
}

@Override
Expand All @@ -226,8 +248,8 @@ public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
@VisibleForTesting
class PulsarSinkEffectivelyOnceProcessor extends PulsarSinkProcessorBase {

public PulsarSinkEffectivelyOnceProcessor(Schema schema) {
super(schema);
public PulsarSinkEffectivelyOnceProcessor(Schema schema, Crypto crypto) {
super(schema, crypto);
}

@Override
Expand Down Expand Up @@ -284,16 +306,21 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
return;
}

Crypto crypto = initializeCrypto();
if (crypto == null) {
log.info("crypto key reader is not provided, not enabling end to end encryption");
}

FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
switch (processingGuarantees) {
case ATMOST_ONCE:
this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(schema);
this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(schema, crypto);
break;
case ATLEAST_ONCE:
this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(schema);
this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(schema, crypto);
break;
case EFFECTIVELY_ONCE:
this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(schema);
this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(schema, crypto);
break;
}
}
Expand Down Expand Up @@ -360,4 +387,38 @@ Schema<T> initializeSchema() throws ClassNotFoundException {
consumerConfig, false, functionClassLoader);
}
}

@SuppressWarnings("unchecked")
@VisibleForTesting
Crypto initializeCrypto() throws ClassNotFoundException {
if (pulsarSinkConfig.getProducerConfig() == null
|| pulsarSinkConfig.getProducerConfig().getCryptoConfig() == null
|| isEmpty(pulsarSinkConfig.getProducerConfig().getCryptoConfig().getCryptoKeyReaderClassName())) {
return null;
}

CryptoConfig cryptoConfig = pulsarSinkConfig.getProducerConfig().getCryptoConfig();

// add provider only if it's not in the JVM
if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) {
Security.addProvider(new BouncyCastleProvider());
}

Crypto.CryptoBuilder bldr = Crypto.builder()
.failureAction(cryptoConfig.getProducerCryptoFailureAction())
.encryptionKeys(cryptoConfig.getEncryptionKeys());

bldr.keyReader(CryptoUtils.getCryptoKeyReaderInstance(
cryptoConfig.getCryptoKeyReaderClassName(), cryptoConfig.getCryptoKeyReaderConfig(), functionClassLoader));

return bldr.build();
}

@Data
@Builder
private static class Crypto {
private CryptoKeyReader keyReader;
private ProducerCryptoFailureAction failureAction;
private String[] encryptionKeys;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.common.functions.ProducerConfig;

import java.util.Map;

Expand All @@ -37,5 +37,5 @@ public class PulsarSinkConfig {
private Map<String, String> schemaProperties;
private String typeClassName;
private boolean forwardSourceMessageProperty;
private Function.ProducerSpec producerSpec;
private ProducerConfig producerConfig;
}
Loading

0 comments on commit 2c9fe27

Please sign in to comment.