Commit

[issue 3954] [flink] Use (client/consumer/producer) pojo to pass config to pulsar client (apache#4232)

* Flink client to accept all Pulsar client config

In this patch, we provide handles for the Flink connector to accept ClientConfigurationData, ProducerConfigurationData, and ConsumerConfigurationData, so that the Flink connector can:
1. Accept all parameters of the Pulsar client, producer, and consumer.
2. Keep pace with the pulsar-client configuration.

* Flink client to accept all Pulsar client config

Added test cases

* Removing commented code
shiv4289 authored and jiazhai committed May 9, 2019
1 parent 5a2b10b commit 8c59fd3
Showing 21 changed files with 642 additions and 144 deletions.
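
As an illustration of the POJO-based configuration described in the commit message, a streaming job could now be wired up as sketched below. This is a minimal sketch, not code from this commit: the service URL, topic, example data, the sink class package, and the use of a null key extractor are assumptions; it relies on the Lombok builders and the new FlinkPulsarProducer constructor added in the diffs that follow.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    // Package assumed here; adjust to wherever FlinkPulsarProducer lives in your build.
    import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
    import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
    import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

    public class PulsarSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Configuration POJOs built with the Lombok builders added in this patch.
            // The service URL and topic are placeholders, not values from the commit.
            ClientConfigurationData clientConf = ClientConfigurationData.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
            ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
                    .topicName("persistent://public/default/flink-events")
                    .build();

            DataStream<String> stream = env.fromElements("a", "b", "c");

            // New constructor: the config POJOs are passed directly; a null key extractor
            // falls back to the connector's no-key extractor (getOrNullKeyExtractor).
            FlinkPulsarProducer<String> pulsarSink = new FlinkPulsarProducer<>(
                    clientConf, producerConf, new SimpleStringSchema(), null);
            stream.addSink(pulsarSink);

            env.execute("pulsar-sink-sketch");
        }
    }

Compared with the earlier (serviceUrl, topicName, authentication) constructor, anything supported by ClientConfigurationData or ProducerConfigurationData can now be set on the sink, which is what keeps the connector in step with pulsar-client.
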
@@ -21,6 +21,9 @@
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
@@ -33,6 +36,9 @@
* This is a simple holder of the client configuration values.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClientConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;

@@ -31,7 +31,10 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -42,6 +45,9 @@
import org.apache.pulsar.client.api.SubscriptionType;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
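
For the consumer side of the commit message, a configuration object could be assembled the same way with the builder added above. A fragment only, not code from this commit: the topic, the subscription name, and the assumption that topicNames, subscriptionName, and subscriptionType are exposed as builder fields are illustrative.

    // Fragment: builds a consumer config POJO with the generated builder.
    // Topic and subscription are placeholders; field names assumed to match the class fields.
    ConsumerConfigurationData<byte[]> consumerConf = ConsumerConfigurationData.<byte[]>builder()
            .topicNames(java.util.Collections.singleton("persistent://public/default/flink-in"))
            .subscriptionName("flink-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .build();
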

@@ -25,6 +25,9 @@
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
@@ -39,6 +42,9 @@
import lombok.Data;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProducerConfigurationData implements Serializable, Cloneable {

private static final long serialVersionUID = 1L;
@@ -25,13 +25,16 @@
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/**
@@ -45,20 +48,34 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
private transient Function<Throwable, MessageId> failureCallback;
private static volatile Producer<byte[]> producer;

protected final String serviceUrl;
protected final String topicName;
private final Authentication authentication;
protected SerializationSchema<T> serializationSchema;

private ClientConfigurationData clientConf;
private ProducerConfigurationData producerConf;


protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank.");

this.serviceUrl = serviceUrl;
this.topicName = topicName;
this.authentication = authentication;
clientConf = new ClientConfigurationData();
producerConf = new ProducerConfigurationData();

this.clientConf.setServiceUrl(serviceUrl);
this.clientConf.setAuthentication(authentication);
this.producerConf.setTopicName(topicName);

LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
}

protected BasePulsarOutputFormat(ClientConfigurationData clientConf, ProducerConfigurationData producerConf) {
this.clientConf = Preconditions.checkNotNull(clientConf, "client config data should not be null");
this.producerConf = Preconditions.checkNotNull(producerConf, "producer config data should not be null");

Preconditions.checkArgument(StringUtils.isNotBlank(clientConf.getServiceUrl()), "serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(producerConf.getTopicName()), "topicName cannot be blank.");

LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName);
LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
}

@Override
@@ -68,7 +85,7 @@ public void configure(Configuration configuration) {

@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.producer = getProducerInstance(serviceUrl, topicName, authentication);
this.producer = getProducerInstance();

this.failureCallback = cause -> {
LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
@@ -88,27 +105,27 @@ public void close() throws IOException {

}

private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName, Authentication authentication)
private Producer<byte[]> getProducerInstance()
throws PulsarClientException {
if(producer == null){
synchronized (PulsarOutputFormat.class) {
if(producer == null){
producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName, authentication),
producer = Preconditions.checkNotNull(createPulsarProducer(),
"Pulsar producer cannot be null.");
}
}
}
return producer;
}

private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName, Authentication authentication)
private Producer<byte[]> createPulsarProducer()
throws PulsarClientException {
try {
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
return client.newProducer().topic(topicName).create();
} catch (PulsarClientException e) {
PulsarClientImpl client = new PulsarClientImpl(clientConf);
return client.createProducerAsync(producerConf).get();
} catch (PulsarClientException | InterruptedException | ExecutionException e) {
LOG.error("Pulsar producer cannot be created.", e);
throw e;
throw new PulsarClientException(e);
}
}
}
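
Because the producer is now created through PulsarClientImpl directly from the configuration POJOs (rather than from the serviceUrl/topicName/authentication triple), any producer tuning set on ProducerConfigurationData is carried through to createPulsarProducer() unchanged. A fragment as illustration; compressionType, batchingEnabled, and blockIfQueueFull are assumed to be builder fields, and the topic is a placeholder.

    // Fragment: every field set on the POJO reaches the producer created above.
    ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
            .topicName("persistent://public/default/flink-out")
            .compressionType(CompressionType.LZ4)
            .batchingEnabled(true)
            .blockIfQueueFull(true)
            .build();
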
@@ -21,6 +21,8 @@
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

/**
* Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format.
@@ -34,4 +36,9 @@ public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authenticatio
this.serializationSchema = new AvroSerializationSchema();
}

public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new AvroSerializationSchema();
}

}
@@ -21,6 +21,8 @@
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

/**
* Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format.
@@ -34,4 +36,9 @@ public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication
this.serializationSchema = new CsvSerializationSchema<>();
}

public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new CsvSerializationSchema<>();
}

}
@@ -20,6 +20,8 @@

import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

/**
* Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format.
@@ -32,4 +34,9 @@ public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authenticatio
super(serviceUrl, topicName, authentication);
this.serializationSchema = new JsonSerializationSchema();
}

public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new JsonSerializationSchema();
}
}
@@ -21,6 +21,8 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

/**
* Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format.
@@ -35,4 +37,10 @@ public PulsarOutputFormat(String serviceUrl, String topicName, Authentication au
this.serializationSchema = serializationSchema;
}

public PulsarOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData,
SerializationSchema<OUT> serializationSchema) {
super(clientConfigurationData, producerConfigurationData);
Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
this.serializationSchema = serializationSchema;
}

}
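
The batch output formats gain the same two-POJO constructor. Below is a minimal sketch of a DataSet job using it, assuming PulsarCsvOutputFormat keeps its existing Tuple-bounded type parameter; the service URL and topic are placeholders, not values from this commit.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
    import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
    import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

    public class PulsarCsvBatchSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            ClientConfigurationData clientConf = ClientConfigurationData.builder()
                    .serviceUrl("pulsar://localhost:6650")              // placeholder URL
                    .build();
            ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
                    .topicName("persistent://public/default/flink-csv") // placeholder topic
                    .build();

            // New constructor: the two POJOs replace serviceUrl/topicName/authentication.
            PulsarCsvOutputFormat<Tuple2<String, Integer>> format =
                    new PulsarCsvOutputFormat<>(clientConf, producerConf);

            env.fromElements(Tuple2.of("k1", 1), Tuple2.of("k2", 2))
               .output(format);
            env.execute("pulsar-csv-batch-sketch");
        }
    }
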
@@ -38,10 +38,10 @@
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,21 +55,8 @@ public class FlinkPulsarProducer<IN>

private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);

/**
* The pulsar service url.
*/
protected final String serviceUrl;

/**
* The name of the default topic this producer is writing data to.
*/
protected final String defaultTopicName;

/**
* Pulsar client will use this authentication information, if required.
*/
private final Authentication authentication;

private ClientConfigurationData clientConf;
private ProducerConfigurationData producerConf;

/**
* (Serializable) SerializationSchema for turning objects used with Flink into.
@@ -82,11 +69,6 @@ public class FlinkPulsarProducer<IN>
*/
protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;

/**
* {@link Producer} configuration map (will be materialized as a {@link ProducerConfigurationData} instance)
*/
protected final Map<String, Object> producerConfig;

/**
* Produce Mode.
*/
@@ -131,24 +113,30 @@ public FlinkPulsarProducer(String serviceUrl,
Authentication authentication,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor) {
this(serviceUrl, defaultTopicName, authentication, serializationSchema, keyExtractor, null);
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
checkNotNull(authentication, "auth cannot be null, set disabled for no auth");

clientConf = new ClientConfigurationData();
producerConf = new ProducerConfigurationData();

this.clientConf.setServiceUrl(serviceUrl);
this.clientConf.setAuthentication(authentication);
this.producerConf.setTopicName(defaultTopicName);
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
}

public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
Authentication authentication,
public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
ProducerConfigurationData producerConfigurationData,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor,
Map<String, Object> producerConfig) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
this.serviceUrl = serviceUrl;
this.defaultTopicName = defaultTopicName;
this.authentication = authentication;
PulsarKeyExtractor<IN> keyExtractor) {
this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
this.producerConfig = producerConfig;
}

// ---------------------------------- Properties --------------------------
@@ -200,12 +188,8 @@ private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyEx
}

private Producer<byte[]> createProducer() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer();
if (producerConfig != null) {
producerBuilder = producerBuilder.loadConf(producerConfig);
}
return producerBuilder.topic(defaultTopicName).create();
PulsarClientImpl client = new PulsarClientImpl(clientConf);
return client.createProducerAsync(producerConf).get();
}

/**
Expand All @@ -221,7 +205,7 @@ public void open(Configuration parameters) throws Exception {
RuntimeContext ctx = getRuntimeContext();

LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into pulsar topic {}",
ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicName);
ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), producerConf.getTopicName());

if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
(Remaining changed files not shown.)