Skip to content

Commit

Permalink
Support key_based batch builder for functions and sources (apache#8523)
Browse files Browse the repository at this point in the history
### Motivation

Pulsar Functions currently support the Key_Shared subscription mode. To guarantee that messages are still dispatched to the correct consumers, in the correct order, when batching is enabled, Pulsar Functions also need to support the `KEY_BASED` batch builder.

### Modifications

- Add `--batch-builder` for Pulsar Functions
- Add `--batch-builder` for Pulsar Sources
- Add test case
  • Loading branch information
wolfstudy authored Nov 16, 2020
1 parent f8848c8 commit 8da9422
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected Boolean DEPRECATED_retainOrdering;
@Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order")
protected Boolean retainOrdering;
@Parameter(names = "--batch-builder", description = "BatcherBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED. The default value is: DEFAULT")
protected String batchBuilder;
@Parameter(names = "--forward-source-message-property", description = "Forwarding input message's properties to output topic when processing")
protected Boolean forwardSourceMessageProperty = true;
@Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
Expand Down Expand Up @@ -419,6 +421,10 @@ void processArguments() throws Exception {
functionConfig.setRetainOrdering(retainOrdering);
}

if (isNotBlank(batchBuilder)) {
functionConfig.setBatchBuilder(batchBuilder);
}

if (null != forwardSourceMessageProperty) {
functionConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ abstract class SourceDetailsCommand extends BaseCommand {
@Parameter(names = "--producer-config", description = "The custom producer configuration (as a JSON string)")
protected String producerConfig;

@Parameter(names = "--batch-builder", description = "BatchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED. The default value is: DEFAULT")
protected String batchBuilder;

@Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source", hidden = true)
protected String DEPRECATED_deserializationClassName;
@Parameter(names = "--deserialization-classname", description = "The SerDe classname for the source")
Expand Down Expand Up @@ -360,6 +363,10 @@ void processArguments() throws Exception {
sourceConfig.setSchemaType(schemaType);
}

// Only forward a batch builder that was actually supplied. An empty or whitespace-only
// value is ignored, matching how the functions command guards the same option.
if (batchBuilder != null && !batchBuilder.trim().isEmpty()) {
    sourceConfig.setBatchBuilder(batchBuilder);
}

if (null != processingGuarantees) {
sourceConfig.setProcessingGuarantees(processingGuarantees);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public enum Runtime {
private Boolean retainOrdering;
// Do we want the same function instance to process all data keyed by the input topic's message key
private Boolean retainKeyOrdering;
// batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
private String batchBuilder;
private Boolean forwardSourceMessageProperty;
private Map<String, Object> userConfig;
// This is a map of secretName(aka how the secret is going to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ public class ProducerConfig {
private Integer maxPendingMessagesAcrossPartitions;
private Boolean useThreadLocalProducers;
private CryptoConfig cryptoConfig;
// batchBuilder selects the batch construction method for the producer: DEFAULT or KEY_BASED
private String batchBuilder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,6 @@ public class SourceConfig {

// If this is a BatchSource, its batch related configs are stored here
private BatchSourceConfig batchSourceConfig;
// batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
private String batchBuilder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
Expand Down Expand Up @@ -133,6 +134,13 @@ public Producer<T> createProducer(PulsarClient client, String topic, String prod
builder.addEncryptionKey(encryptionKeyName);
}
}
if (producerConfig.getBatchBuilder() != null) {
if (producerConfig.getBatchBuilder().equals("KEY_BASED")) {
builder.batcherBuilder(BatcherBuilder.KEY_BASED);
} else {
builder.batcherBuilder(BatcherBuilder.DEFAULT);
}
}
}
return builder.properties(properties).create();
}
Expand Down
1 change: 1 addition & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ message ProducerSpec {
int32 maxPendingMessagesAcrossPartitions = 2;
bool useThreadLocalProducers = 3;
CryptoSpec cryptoSpec = 4;
string batchBuilder = 5;
}

message CryptoSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
if (producerConf.getCryptoConfig() != null) {
pbldr.setCryptoSpec(CryptoUtils.convert(producerConf.getCryptoConfig()));
}
if (producerConf.getBatchBuilder() != null) {
pbldr.setBatchBuilder(producerConf.getBatchBuilder());
}
sinkSpecBuilder.setProducerSpec(pbldr.build());
}
functionDetailsBuilder.setSink(sinkSpecBuilder);
Expand Down Expand Up @@ -388,6 +391,9 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
if (spec.hasCryptoSpec()) {
producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
}
// Generated protobuf string getters never return null; an unset field reads back as "".
// Copy the value only when it is non-empty so an unset batch builder stays null in the
// converted config instead of becoming an empty string.
if (!spec.getBatchBuilder().isEmpty()) {
    producerConfig.setBatchBuilder(spec.getBatchBuilder());
}
producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
functionConfig.setProducerConfig(producerConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
if (conf.getCryptoConfig() != null) {
pbldr.setCryptoSpec(CryptoUtils.convert(conf.getCryptoConfig()));
}
if (conf.getBatchBuilder() != null) {
pbldr.setBatchBuilder(conf.getBatchBuilder());
}
sinkSpecBuilder.setProducerSpec(pbldr.build());
}

Expand Down Expand Up @@ -247,6 +250,9 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
if (spec.hasCryptoSpec()) {
producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
}
// Generated protobuf string getters never return null; an unset field reads back as "".
// Copy the value only when it is non-empty so an unset batch builder stays null in the
// converted config instead of becoming an empty string.
if (!spec.getBatchBuilder().isEmpty()) {
    producerConfig.setBatchBuilder(spec.getBatchBuilder());
}
producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
sourceConfig.setProducerConfig(producerConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void testConvertBackFidelity() {
producerConfig.setMaxPendingMessages(100);
producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
producerConfig.setUseThreadLocalProducers(true);
producerConfig.setBatchBuilder("DEFAULT");
functionConfig.setProducerConfig(producerConfig);
Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
Expand Down Expand Up @@ -115,6 +116,7 @@ public void testConvertWindow() {
producerConfig.setMaxPendingMessages(100);
producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
producerConfig.setUseThreadLocalProducers(true);
producerConfig.setBatchBuilder("KEY_BASED");
functionConfig.setProducerConfig(producerConfig);
Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
Expand Down Expand Up @@ -449,6 +451,7 @@ private FunctionConfig createFunctionConfig() {
functionConfig.setRetainOrdering(false);
functionConfig.setRetainKeyOrdering(false);
functionConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
functionConfig.setBatchBuilder("DEFAULT");
functionConfig.setForwardSourceMessageProperty(false);
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ private SourceConfig createSourceConfig() {
producerConfig.setMaxPendingMessages(100);
producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
producerConfig.setUseThreadLocalProducers(true);
producerConfig.setBatchBuilder("DEFAULT");
sourceConfig.setProducerConfig(producerConfig);

sourceConfig.setConfigs(configs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public enum Runtime {
private Runtime runtime;
private Integer parallelism;
private String adminUrl;
// Value appended after the --batch-builder flag in the generated create/update commands; omitted when null
private String batchBuilder;
private Integer windowLengthCount;
private Long windowLengthDurationMs;
private Integer slidingIntervalCount;
Expand Down Expand Up @@ -154,6 +155,9 @@ public String generateCreateFunctionCommand(String codeFile) {
if (logTopic != null) {
commandBuilder.append(" --logTopic " + logTopic);
}
// Append the optional batch builder flag. The leading space separates it from the
// previous argument and the trailing space separates the flag from its value,
// following the same " --flag value" shape as the other options in this command.
if (batchBuilder != null) {
    commandBuilder.append(" --batch-builder " + batchBuilder);
}
if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'");
}
Expand Down Expand Up @@ -239,6 +243,9 @@ public String generateUpdateFunctionCommand(String codeFile) {
if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'");
}
// Append the optional batch builder flag. The leading space separates it from the
// previous argument and the trailing space separates the flag from its value,
// following the same " --flag value" shape as the other options in this command.
if (batchBuilder != null) {
    commandBuilder.append(" --batch-builder " + batchBuilder);
}
if (sinkTopic != null) {
commandBuilder.append(" --output " + sinkTopic);
}
Expand Down

0 comments on commit 8da9422

Please sign in to comment.