Skip to content

Commit

Permalink
Added ability to specify producer config for functions and sources (a…
Browse files Browse the repository at this point in the history
…pache#7721)

* Added ability to specify producer config for functions and sources

* Fixed test

* Fix test

* Add generated function proto

* Add header

* Address comments

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Aug 5, 2020
1 parent 7bf4549 commit 22d7a6c
Show file tree
Hide file tree
Showing 13 changed files with 610 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public enum Runtime {

private String output;

// Any configuration that need to be applied for producers
private ProducerConfig producerConfig;

/**
* Represents either a builtin schema type (eg: 'avro', 'json', ect) or the class name for a Schema
* implementation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.functions;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/**
* Configuration of the producer inside the function.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class ProducerConfig {
private Integer maxPendingMessages;
private Integer maxPendingMessagesAcrossPartitions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;

/**
Expand All @@ -41,6 +42,8 @@ public class SourceConfig {

private String topicName;

private ProducerConfig producerConfig;

private String serdeClassName;

private String schemaType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,

this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
}
if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
}
}

if (config.getFunctionDetails().getUserConfig().isEmpty()) {
userConfigs = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,10 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap());

if (this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) {
pulsarSinkConfig.setProducerSpec(this.instanceConfig.getFunctionDetails().getSink().getProducerSpec());
}

object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ public Producer<T> createProducer(PulsarClient client, String topic, String prod
if (producerName != null) {
builder.producerName(producerName);
}
if (pulsarSinkConfig.getProducerSpec() != null) {
if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessages() != 0) {
builder.maxPendingMessages(pulsarSinkConfig.getProducerSpec().getMaxPendingMessages());
}
if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
builder.maxPendingMessagesAcrossPartitions(pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions());
}
}

return builder.properties(properties).create();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.proto.Function;

import java.util.Map;

Expand All @@ -36,4 +37,5 @@ public class PulsarSinkConfig {
private Map<String, String> schemaProperties;
private String typeClassName;
private boolean forwardSourceMessageProperty;
private Function.ProducerSpec producerSpec;
}
Loading

0 comments on commit 22d7a6c

Please sign in to comment.