issue#3939 : Allow client authentication from pulsar-flink package (apache#3949)

Problem:
========
The pulsar-flink module (aka the Flink connector) internally uses pulsar-client. Although the Pulsar client allows setting tokens in its client builder, the Flink connector provides no way to pass an authentication token to the Pulsar client it uses internally.

Solution:
========
Accept authentication information as an input to the pulsar-flink module and pass it through to the underlying pulsar-client.
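
For illustration, a minimal usage sketch of the new API surface, assuming token-based authentication via the pulsar-client AuthenticationFactory.token helper; the service URL, topic, and token below are placeholders, and new AuthenticationDisabled() remains the drop-in choice when no authentication is configured (as the updated examples show):

    import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
    import org.apache.pulsar.client.api.Authentication;
    import org.apache.pulsar.client.api.AuthenticationFactory;

    import java.nio.charset.StandardCharsets;

    public class TokenAuthSketch {
        public static void main(String[] args) {
            // Placeholder values; substitute a real broker URL, topic, and token.
            String serviceUrl = "pulsar://localhost:6650";
            String topic = "persistent://public/default/flink-output";

            // Token-based client authentication built with the pulsar-client factory.
            Authentication auth = AuthenticationFactory.token("<jwt-token>");

            // The connector now forwards this Authentication to its internal PulsarClient.
            PulsarOutputFormat<String> outputFormat =
                    new PulsarOutputFormat<>(serviceUrl, topic, auth,
                            record -> record.getBytes(StandardCharsets.UTF_8));
        }
    }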
shiv4289 authored and merlimat committed Apr 2, 2019
1 parent 7b68376 commit 8adff85
Showing 28 changed files with 174 additions and 54 deletions.
@@ -24,6 +24,7 @@
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

 import java.util.Arrays;
 import java.util.List;
@@ -63,7 +64,7 @@ public static void main(String[] args) throws Exception {
 System.out.println("\tTopic:\t" + topic);

 // create PulsarAvroOutputFormat instance
-final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic);
+final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());

 // create DataSet
 DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);

@@ -25,6 +25,7 @@
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

 import java.util.Arrays;
 import java.util.List;
@@ -65,7 +66,7 @@ public static void main(String[] args) throws Exception {

 // create PulsarCsvOutputFormat instance
 final OutputFormat<Tuple4<Integer, String, Integer, Integer>> pulsarCsvOutputFormat =
-new PulsarCsvOutputFormat<>(serviceUrl, topic);
+new PulsarCsvOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());

 // create DataSet
 DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = env.fromCollection(nasaMissions);

@@ -23,6 +23,7 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

 import java.util.Arrays;
 import java.util.List;
@@ -62,7 +63,7 @@ public static void main(String[] args) throws Exception {
 System.out.println("\tTopic:\t" + topic);

 // create PulsarJsonOutputFormat instance
-final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic);
+final OutputFormat<NasaMission> pulsarJsonOutputFormat = new PulsarJsonOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());

 // create DataSet
 DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);

@@ -27,6 +27,7 @@
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
 import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

 /**
 * Implements a batch word-count program on Pulsar topic by writing Flink DataSet.
@@ -60,7 +61,7 @@ public static void main(String[] args) throws Exception {

 // create PulsarOutputFormat instance
 final OutputFormat pulsarOutputFormat =
-new PulsarOutputFormat(serviceUrl, topic, wordWithCount -> wordWithCount.toString().getBytes());
+new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes());

 // create DataSet
 DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);

@@ -35,6 +35,7 @@
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
 import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

 /**
 * Implements a streaming wordcount program on pulsar topics.
@@ -97,6 +98,7 @@ public static void main(String[] args) throws Exception {
 wc.addSink(new FlinkPulsarProducer<>(
 serviceUrl,
 outputTopic,
+new AuthenticationDisabled(),
 wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
 wordWithCount -> wordWithCount.word
 )).setParallelism(parallelism);

@@ -35,6 +35,7 @@
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

 /**
 * Implements a streaming wordcount program on pulsar topics.
@@ -107,7 +108,7 @@ public static void main(String[] args) throws Exception {
 table.printSchema();
 TableSink sink = null;
 if (null != outputTopic) {
-sink = new PulsarAvroTableSink(serviceUrl, outputTopic, ROUTING_KEY, WordWithCount.class);
+sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY, WordWithCount.class);
 } else {
 // print the results with a csv file
 sink = new CsvTableSink("./examples/file", "|");

@@ -37,6 +37,7 @@
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

 /**
 * Implements a streaming wordcount program on pulsar topics.
@@ -108,7 +109,7 @@ public static void main(String[] args) throws Exception {
 table.printSchema();
 TableSink sink = null;
 if (null != outputTopic) {
-sink = new PulsarJsonTableSink(serviceUrl, outputTopic, ROUTING_KEY);
+sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY);
 } else {
 // print the results with a csv file
 sink = new CsvTableSink("./examples/file", "|");

@@ -22,6 +22,7 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.avro.generated.NasaMission
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled

 /**
 * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
@@ -59,7 +60,7 @@ object FlinkPulsarBatchAvroSinkScalaExample {

 // create PulsarCsvOutputFormat instance
 val pulsarAvroOutputFormat =
-new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic)
+new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())

 // create DataSet
 val textDS = env.fromCollection(nasaMissions)

@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple4
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled

 /**
 * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Csv.
@@ -65,7 +66,7 @@ object FlinkPulsarBatchCsvSinkScalaExample {

 // create PulsarCsvOutputFormat instance
 val pulsarCsvOutputFormat =
-new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic)
+new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())

 // create DataSet
 val textDS = env.fromCollection(nasaMissions)

@@ -22,6 +22,7 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
 import scala.beans.BeanProperty
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled

 /**
 * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Json.
@@ -66,7 +67,7 @@ object FlinkPulsarBatchJsonSinkScalaExample {
 println("\tTopic:\t" + topic)

 // create PulsarJsonOutputFormat instance
-val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic)
+val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](serviceUrl, topic, new AuthenticationDisabled())

 // create DataSet
 val nasaMissionDS = env.fromCollection(nasaMissions)

@@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
 import org.apache.flink.util.Collector
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled

 /**
 * Data type for words with count.
@@ -63,7 +64,7 @@ object FlinkPulsarBatchSinkScalaExample {

 // create PulsarOutputFormat instance
 val pulsarOutputFormat =
-new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new SerializationSchema[WordWithCount] {
+new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new AuthenticationDisabled(), new SerializationSchema[WordWithCount] {
 override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
 })

@@ -27,6 +27,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Authentication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -46,14 +47,16 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {

 protected final String serviceUrl;
 protected final String topicName;
+private final Authentication authentication;
 protected SerializationSchema<T> serializationSchema;

-protected BasePulsarOutputFormat(final String serviceUrl, final String topicName) {
+protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) {
 Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
 Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank.");

 this.serviceUrl = serviceUrl;
 this.topicName = topicName;
+this.authentication = authentication;

 LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName);
 }
@@ -65,7 +68,7 @@ public void configure(Configuration configuration) {

 @Override
 public void open(int taskNumber, int numTasks) throws IOException {
-this.producer = getProducerInstance(serviceUrl, topicName);
+this.producer = getProducerInstance(serviceUrl, topicName, authentication);

 this.failureCallback = cause -> {
 LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
@@ -85,21 +88,23 @@ public void close() throws IOException {

 }

-private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName) throws PulsarClientException {
+private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName, Authentication authentication)
+throws PulsarClientException {
 if(producer == null){
 synchronized (PulsarOutputFormat.class) {
 if(producer == null){
-producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
+producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName, authentication),
 "Pulsar producer cannot be null.");
 }
 }
 }
 return producer;
 }

-private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName) throws PulsarClientException {
+private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName, Authentication authentication)
+throws PulsarClientException {
 try {
-PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
 return client.newProducer().topic(topicName).create();
 } catch (PulsarClientException e) {
 LOG.error("Pulsar producer cannot be created.", e);

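As a hedged illustration of the constructor change above: a custom batch output format built on BasePulsarOutputFormat would now forward an Authentication instance to the base class, which stores it and applies it when the shared producer is lazily created in open(). The subclass name and serialization logic below are hypothetical placeholders, not part of this commit.

    import org.apache.flink.batch.connectors.pulsar.BasePulsarOutputFormat;
    import org.apache.pulsar.client.api.Authentication;

    import java.nio.charset.StandardCharsets;

    // Hypothetical subclass, shown only to illustrate the new constructor chain.
    public class PlainTextPulsarOutputFormat extends BasePulsarOutputFormat<String> {

        public PlainTextPulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
            // The Authentication is now threaded through to the internal PulsarClient builder.
            super(serviceUrl, topicName, authentication);
            this.serializationSchema = element -> element.getBytes(StandardCharsets.UTF_8);
        }
    }
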
@@ -20,6 +20,7 @@

 import org.apache.avro.specific.SpecificRecord;
 import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;

 /**
 * Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format.
@@ -28,8 +29,8 @@ public class PulsarAvroOutputFormat<T extends SpecificRecord> extends BasePulsar

 private static final long serialVersionUID = -6794070714728773530L;

-public PulsarAvroOutputFormat(String serviceUrl, String topicName) {
-super(serviceUrl, topicName);
+public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+super(serviceUrl, topicName, authentication);
 this.serializationSchema = new AvroSerializationSchema();
 }

@@ -20,6 +20,7 @@

 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;

 /**
 * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format.
@@ -28,8 +29,8 @@ public class PulsarCsvOutputFormat<T extends Tuple> extends BasePulsarOutputForm

 private static final long serialVersionUID = -4461671510903404196L;

-public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
-super(serviceUrl, topicName);
+public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+super(serviceUrl, topicName, authentication);
 this.serializationSchema = new CsvSerializationSchema<>();
 }

@@ -19,6 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar;

 import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;

 /**
 * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format.
@@ -27,8 +28,8 @@ public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {

 private static final long serialVersionUID = 8499620770848461958L;

-public PulsarJsonOutputFormat(String serviceUrl, String topicName) {
-super(serviceUrl, topicName);
+public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+super(serviceUrl, topicName, authentication);
 this.serializationSchema = new JsonSerializationSchema();
 }
 }

@@ -20,6 +20,7 @@

 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;

 /**
 * Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format.
@@ -28,8 +29,8 @@ public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {

 private static final long serialVersionUID = 2997027580167793000L;

-public PulsarOutputFormat(String serviceUrl, String topicName, final SerializationSchema<T> serializationSchema) {
-super(serviceUrl, topicName);
+public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication, final SerializationSchema<T> serializationSchema) {
+super(serviceUrl, topicName, authentication);
 Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
 this.serializationSchema = serializationSchema;
 }

@@ -41,6 +41,7 @@
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,12 @@ public class FlinkPulsarProducer<IN>
 */
 protected final String defaultTopicName;

+/**
+* Pulsar client will use this authentication information, if required.
+*/
+private final Authentication authentication;
+
+
 /**
 * (Serializable) SerializationSchema for turning objects used with Flink into.
 * byte[] for Pulsar.
@@ -121,20 +128,23 @@ public class FlinkPulsarProducer<IN>

 public FlinkPulsarProducer(String serviceUrl,
 String defaultTopicName,
+Authentication authentication,
 SerializationSchema<IN> serializationSchema,
 PulsarKeyExtractor<IN> keyExtractor) {
-this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor, null);
+this(serviceUrl, defaultTopicName, authentication, serializationSchema, keyExtractor, null);
 }

 public FlinkPulsarProducer(String serviceUrl,
 String defaultTopicName,
+Authentication authentication,
 SerializationSchema<IN> serializationSchema,
 PulsarKeyExtractor<IN> keyExtractor,
 Map<String, Object> producerConfig) {
 checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
 checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
 this.serviceUrl = serviceUrl;
 this.defaultTopicName = defaultTopicName;
+this.authentication = authentication;
 this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
 this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
 ClosureCleaner.ensureSerializable(serializationSchema);
@@ -190,7 +200,7 @@ private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyEx
 }

 private Producer<byte[]> createProducer() throws Exception {
-PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
 ProducerBuilder<byte[]> producerBuilder = client.newProducer();
 if (producerConfig != null) {
 producerBuilder = producerBuilder.loadConf(producerConfig);

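The FlinkPulsarProducer constructors above now take an Authentication argument as well. Below is a minimal streaming-side usage sketch, assuming token authentication via AuthenticationFactory.token; the service URL, topic, and token are placeholders, and any other pulsar-client Authentication implementation (TLS, Athenz, or new AuthenticationDisabled() for no auth) could be passed instead.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
    import org.apache.pulsar.client.api.Authentication;
    import org.apache.pulsar.client.api.AuthenticationFactory;

    import java.nio.charset.StandardCharsets;

    public class AuthenticatedPulsarSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder connection settings; substitute real values.
            String serviceUrl = "pulsar://localhost:6650";
            String outputTopic = "persistent://public/default/flink-words";

            // Token-based authentication created with the pulsar-client factory.
            Authentication auth = AuthenticationFactory.token("<jwt-token>");

            DataStream<String> words = env.fromElements("hello", "pulsar");

            words.addSink(new FlinkPulsarProducer<>(
                    serviceUrl,
                    outputTopic,
                    auth,                                          // new Authentication parameter
                    word -> word.getBytes(StandardCharsets.UTF_8), // SerializationSchema
                    word -> word                                   // PulsarKeyExtractor
            ));

            env.execute("authenticated-pulsar-sink-sketch");
        }
    }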