Skip to content

Commit

Permalink
[flink] Allow to specify a custom Pulsar producer (apache#3894)
Browse files Browse the repository at this point in the history
This is necessary in pretty much any non-trivial use-case. The ability
to control the settings of the Pulsar producer is paramount to
building real-life applications
  • Loading branch information
casidiablo authored and sijie committed Mar 25, 2019
1 parent f93650f commit 19fd91c
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,22 @@ public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor) {
this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor, null);
}

public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor,
Producer<byte[]> producer) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
this.serviceUrl = serviceUrl;
this.defaultTopicName = defaultTopicName;
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
this.producer = producer;
}

// ---------------------------------- Properties --------------------------
Expand Down Expand Up @@ -185,7 +194,10 @@ private Producer<byte[]> createProducer() throws Exception {
*/
@Override
public void open(Configuration parameters) throws Exception {
this.producer = createProducer();
if (producer == null) {
// If no custom producer was specified create a default one
this.producer = createProducer();
}

RuntimeContext ctx = getRuntimeContext();

Expand Down

0 comments on commit 19fd91c

Please sign in to comment.