Skip to content

Commit

Permalink
[Flink] Allow to customize PulsarProducer (apache#3955)
Browse files Browse the repository at this point in the history
This is an improvement over apache#3894.

Because of how Flink instantiates functions, instead of passing a
custom `PulsarProducer` client we need to pass an object that is
serializable. The current implementation will default to always
call `createProducer()` because `producer` is `transient`, so it will
always be null when Flink creates new instances of the sink.
  • Loading branch information
casidiablo authored and sijie committed Apr 2, 2019
1 parent 181c341 commit 4101168
Showing 1 changed file with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

import java.util.function.Function;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
Expand All @@ -37,8 +38,10 @@
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -72,6 +75,11 @@ public class FlinkPulsarProducer<IN>
*/
protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;

/**
* {@link Producer} configuration map (will be materialized as a {@link ProducerConfigurationData} instance)
*/
protected final Map<String, Object> producerConfig;

/**
* Produce Mode.
*/
Expand Down Expand Up @@ -122,15 +130,15 @@ public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor,
Producer<byte[]> producer) {
Map<String, Object> producerConfig) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
this.serviceUrl = serviceUrl;
this.defaultTopicName = defaultTopicName;
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
this.producer = producer;
this.producerConfig = producerConfig;
}

// ---------------------------------- Properties --------------------------
Expand Down Expand Up @@ -183,7 +191,11 @@ private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyEx

private Producer<byte[]> createProducer() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
return client.newProducer().topic(defaultTopicName).create();
ProducerBuilder<byte[]> producerBuilder = client.newProducer();
if (producerConfig != null) {
producerBuilder = producerBuilder.loadConf(producerConfig);
}
return producerBuilder.topic(defaultTopicName).create();
}

/**
Expand All @@ -194,10 +206,7 @@ private Producer<byte[]> createProducer() throws Exception {
*/
@Override
public void open(Configuration parameters) throws Exception {
if (producer == null) {
// If no custom producer was specified create a default one
this.producer = createProducer();
}
this.producer = createProducer();

RuntimeContext ctx = getRuntimeContext();

Expand Down

0 comments on commit 4101168

Please sign in to comment.