diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index db77bd17e4ecb..6c80f43b407ce 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -21,6 +21,9 @@ import java.io.Serializable; import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; @@ -33,6 +36,9 @@ * This is a simple holder of the client configuration values. */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class ClientConfigurationData implements Serializable, Cloneable { private static final long serialVersionUID = 1L; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index aab52340898df..fd4b719994991 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -31,7 +31,10 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -42,6 +45,9 @@ import org.apache.pulsar.client.api.SubscriptionType; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class ConsumerConfigurationData implements Serializable, Cloneable { private static final long serialVersionUID = 1L; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 409962b319370..32f5e61b1d51b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -25,6 +25,9 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.HashingScheme; @@ -39,6 +42,9 @@ import lombok.Data; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class ProducerConfigurationData implements Serializable, Cloneable { private static final long serialVersionUID = 1L; diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java index 644c8e9726216..438af59662ddd 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java @@ -25,13 +25,16 @@ import org.apache.flink.util.Preconditions; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.ExecutionException; import java.util.function.Function; /** @@ -45,20 +48,34 @@ public abstract class BasePulsarOutputFormat extends RichOutputFormat { private transient Function failureCallback; private static volatile Producer producer; - protected final String serviceUrl; - protected final String topicName; - private final Authentication authentication; protected SerializationSchema serializationSchema; + private ClientConfigurationData clientConf; + private ProducerConfigurationData producerConf; + + protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) { Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank."); Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank."); - this.serviceUrl = serviceUrl; - this.topicName = topicName; - this.authentication = authentication; + clientConf = new ClientConfigurationData(); + producerConf = new ProducerConfigurationData(); + + this.clientConf.setServiceUrl(serviceUrl); + this.clientConf.setAuthentication(authentication); + this.producerConf.setTopicName(topicName); + + LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName()); + } + + protected BasePulsarOutputFormat(ClientConfigurationData clientConf, ProducerConfigurationData producerConf) { + this.clientConf = Preconditions.checkNotNull(clientConf, "client config data should not be null"); + this.producerConf = Preconditions.checkNotNull(producerConf, "producer config data should not be null"); + + Preconditions.checkArgument(StringUtils.isNotBlank(clientConf.getServiceUrl()), "serviceUrl cannot be blank."); + Preconditions.checkArgument(StringUtils.isNotBlank(producerConf.getTopicName()), "topicName cannot be blank."); - LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName); + LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName()); } @Override @@ -68,7 +85,7 @@ public void configure(Configuration configuration) { @Override public void open(int taskNumber, int numTasks) throws IOException { - this.producer = getProducerInstance(serviceUrl, topicName, authentication); + this.producer = getProducerInstance(); this.failureCallback = cause -> { LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause); @@ -88,12 +105,12 @@ public void close() throws IOException { } - private static Producer getProducerInstance(String serviceUrl, String topicName, Authentication authentication) + private Producer getProducerInstance() throws PulsarClientException { if(producer == null){ synchronized (PulsarOutputFormat.class) { if(producer == null){ - producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName, authentication), + producer = Preconditions.checkNotNull(createPulsarProducer(), "Pulsar producer cannot be null."); } } @@ -101,14 +118,14 @@ private static Producer getProducerInstance(String serviceUrl, String to return producer; } - private static Producer createPulsarProducer(String serviceUrl, String topicName, Authentication authentication) + private Producer createPulsarProducer() throws PulsarClientException { try { - PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build(); - return client.newProducer().topic(topicName).create(); - } catch (PulsarClientException e) { + PulsarClientImpl client = new PulsarClientImpl(clientConf); + return client.createProducerAsync(producerConf).get(); + } catch (PulsarClientException | InterruptedException | ExecutionException e) { LOG.error("Pulsar producer cannot be created.", e); - throw e; + throw new PulsarClientException(e); } } } diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java index 52484ef1ccf0f..6e11e4c371155 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java @@ -21,6 +21,8 @@ import org.apache.avro.specific.SpecificRecord; import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; /** * Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format. @@ -34,4 +36,9 @@ public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authenticatio this.serializationSchema = new AvroSerializationSchema(); } + public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) { + super(clientConfigurationData, producerConfigurationData); + this.serializationSchema = new AvroSerializationSchema(); + } + } diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java index d36a260fefbc9..8c3b441c4fee5 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java @@ -21,6 +21,8 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; /** * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format. @@ -34,4 +36,9 @@ public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication this.serializationSchema = new CsvSerializationSchema<>(); } + public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) { + super(clientConfigurationData, producerConfigurationData); + this.serializationSchema = new CsvSerializationSchema<>(); + } + } diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java index 96d7a01e8ba84..093e5c9503bbb 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java @@ -20,6 +20,8 @@ import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; /** * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format. @@ -32,4 +34,9 @@ public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authenticatio super(serviceUrl, topicName, authentication); this.serializationSchema = new JsonSerializationSchema(); } + + public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) { + super(clientConfigurationData, producerConfigurationData); + this.serializationSchema = new JsonSerializationSchema(); + } } diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java index 393faafa4905e..c2fb9847f415f 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.util.Preconditions; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; /** * Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format. @@ -35,4 +37,10 @@ public PulsarOutputFormat(String serviceUrl, String topicName, Authentication au this.serializationSchema = serializationSchema; } + public PulsarOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) { + super(clientConfigurationData, producerConfigurationData); + Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null."); + this.serializationSchema = serializationSchema; + } + } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java index 55eb61936c492..29cfe0ea5cdd9 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java @@ -38,10 +38,10 @@ import org.apache.flink.util.SerializableObject; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,21 +55,8 @@ public class FlinkPulsarProducer private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class); - /** - * The pulsar service url. - */ - protected final String serviceUrl; - - /** - * The name of the default topic this producer is writing data to. - */ - protected final String defaultTopicName; - - /** - * Pulsar client will use this authentication information, if required. - */ - private final Authentication authentication; - + private ClientConfigurationData clientConf; + private ProducerConfigurationData producerConf; /** * (Serializable) SerializationSchema for turning objects used with Flink into. @@ -82,11 +69,6 @@ public class FlinkPulsarProducer */ protected final PulsarKeyExtractor flinkPulsarKeyExtractor; - /** - * {@link Producer} configuration map (will be materialized as a {@link ProducerConfigurationData} instance) - */ - protected final Map producerConfig; - /** * Produce Mode. */ @@ -131,24 +113,30 @@ public FlinkPulsarProducer(String serviceUrl, Authentication authentication, SerializationSchema serializationSchema, PulsarKeyExtractor keyExtractor) { - this(serviceUrl, defaultTopicName, authentication, serializationSchema, keyExtractor, null); + checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank"); + checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank"); + checkNotNull(authentication, "auth cannot be null, set disabled for no auth"); + + clientConf = new ClientConfigurationData(); + producerConf = new ProducerConfigurationData(); + + this.clientConf.setServiceUrl(serviceUrl); + this.clientConf.setAuthentication(authentication); + this.producerConf.setTopicName(defaultTopicName); + this.schema = checkNotNull(serializationSchema, "Serialization Schema not set"); + this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor); + ClosureCleaner.ensureSerializable(serializationSchema); } - public FlinkPulsarProducer(String serviceUrl, - String defaultTopicName, - Authentication authentication, + public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData, + ProducerConfigurationData producerConfigurationData, SerializationSchema serializationSchema, - PulsarKeyExtractor keyExtractor, - Map producerConfig) { - checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank"); - checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank"); - this.serviceUrl = serviceUrl; - this.defaultTopicName = defaultTopicName; - this.authentication = authentication; + PulsarKeyExtractor keyExtractor) { + this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null"); + this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null"); this.schema = checkNotNull(serializationSchema, "Serialization Schema not set"); this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor); ClosureCleaner.ensureSerializable(serializationSchema); - this.producerConfig = producerConfig; } // ---------------------------------- Properties -------------------------- @@ -200,12 +188,8 @@ private static final PulsarKeyExtractor getOrNullKeyExtractor(PulsarKeyEx } private Producer createProducer() throws Exception { - PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build(); - ProducerBuilder producerBuilder = client.newProducer(); - if (producerConfig != null) { - producerBuilder = producerBuilder.loadConf(producerConfig); - } - return producerBuilder.topic(defaultTopicName).create(); + PulsarClientImpl client = new PulsarClientImpl(clientConf); + return client.createProducerAsync(producerConf).get(); } /** @@ -221,7 +205,7 @@ public void open(Configuration parameters) throws Exception { RuntimeContext ctx = getRuntimeContext(); LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into pulsar topic {}", - ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicName); + ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), producerConf.getTopicName()); if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing."); diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java index 20999fd4478f1..d4602d959bc11 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java @@ -27,6 +27,7 @@ import org.apache.avro.Schema; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificRecord; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; @@ -37,15 +38,16 @@ import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; /** * An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format. */ public class PulsarAvroTableSink implements AppendStreamTableSink { - protected final String serviceUrl; - protected final String topic; - protected final Authentication authentication; + protected ClientConfigurationData clientConfigurationData; + protected ProducerConfigurationData producerConfigurationData; protected final String routingKeyFieldName; protected SerializationSchema serializationSchema; protected String[] fieldNames; @@ -66,9 +68,31 @@ public PulsarAvroTableSink( Authentication authentication, String routingKeyFieldName, Class recordClazz) { - this.serviceUrl = checkNotNull(serviceUrl, "Service url not set"); - this.topic = checkNotNull(topic, "Topic is null"); - this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead"); + checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url not set"); + checkArgument(StringUtils.isNotBlank(topic), "Topic is null"); + checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead"); + + clientConfigurationData = new ClientConfigurationData(); + producerConfigurationData = new ProducerConfigurationData(); + + clientConfigurationData.setServiceUrl(serviceUrl); + clientConfigurationData.setAuthentication(authentication); + producerConfigurationData.setTopicName(topic); + this.routingKeyFieldName = routingKeyFieldName; + this.recordClazz = recordClazz; + } + + public PulsarAvroTableSink( + ClientConfigurationData clientConfigurationData, + ProducerConfigurationData producerConfigurationData, + String routingKeyFieldName, + Class recordClazz) { + this.clientConfigurationData = checkNotNull(clientConfigurationData, "client config can not be null"); + this.producerConfigurationData = checkNotNull(producerConfigurationData, "producer config can not be null"); + + checkArgument(StringUtils.isNotBlank(clientConfigurationData.getServiceUrl()), "Service url not set"); + checkArgument(StringUtils.isNotBlank(producerConfigurationData.getTopicName()), "Topic is null"); + this.routingKeyFieldName = routingKeyFieldName; this.recordClazz = recordClazz; } @@ -79,9 +103,8 @@ public PulsarAvroTableSink( protected FlinkPulsarProducer createFlinkPulsarProducer() { serializationSchema = new AvroRowSerializationSchema(recordClazz); return new FlinkPulsarProducer( - serviceUrl, - topic, - authentication, + clientConfigurationData, + producerConfigurationData, serializationSchema, keyExtractor); } @@ -114,7 +137,8 @@ public TypeInformation[] getFieldTypes() { @Override public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { - PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, authentication, routingKeyFieldName, recordClazz); + PulsarAvroTableSink sink = new PulsarAvroTableSink( + clientConfigurationData, producerConfigurationData, routingKeyFieldName, recordClazz); sink.fieldNames = checkNotNull(fieldNames, "Field names are null"); sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null"); diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java index 046c7d3624a00..642d69d6cbeef 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java @@ -22,19 +22,13 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.util.IOUtils; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +38,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; /** * Pulsar source (consumer) which receives messages from a topic and acknowledges messages. @@ -58,11 +51,10 @@ class PulsarConsumerSource extends MessageAcknowledgingSourceBase topicNames; - private final Authentication authentication; - private final Pattern topicsPattern; - private final String subscriptionName; + + private ClientConfigurationData clientConfigurationData; + private ConsumerConfigurationData consumerConfigurationData; + private final DeserializationSchema deserializer; private PulsarClient client; @@ -72,20 +64,19 @@ class PulsarConsumerSource extends MessageAcknowledgingSourceBase builder) { super(MessageId.class); - this.serviceUrl = builder.serviceUrl; - this.authentication = builder.authentication; - this.topicNames = builder.topicNames; - this.topicsPattern = builder.topicsPattern; + + clientConfigurationData = new ClientConfigurationData(); + consumerConfigurationData = new ConsumerConfigurationData<>(); + + this.clientConfigurationData = builder.clientConfigurationData; + this.consumerConfigurationData = builder.consumerConfigurationData; this.deserializer = builder.deserializationSchema; - this.subscriptionName = builder.subscriptionName; this.acknowledgementBatchSize = builder.acknowledgementBatchSize; - this.initialPosition = builder.initialPosition; } @Override @@ -195,26 +186,10 @@ boolean isCheckpointingEnabled() { } PulsarClient createClient() throws PulsarClientException { - return PulsarClient.builder() - .serviceUrl(serviceUrl) - .authentication(authentication) - .build(); + return new PulsarClientImpl(clientConfigurationData); } Consumer createConsumer(PulsarClient client) throws PulsarClientException { - if (topicsPattern != null) { - return client.newConsumer().topicsPattern(topicsPattern) - .subscriptionName(subscriptionName) - .subscriptionType(SubscriptionType.Failover) - .subscriptionInitialPosition(initialPosition) - .subscribe(); - } else { - return client.newConsumer() - .topics(Lists.newArrayList(topicNames)) - .subscriptionName(subscriptionName) - .subscriptionType(SubscriptionType.Failover) - .subscriptionInitialPosition(initialPosition) - .subscribe(); - } + return ((PulsarClientImpl)client).subscribeAsync(consumerConfigurationData).join(); } } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java index c37250d5d581f..b6f82a5fd8d10 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java @@ -23,6 +23,8 @@ import org.apache.flink.formats.json.JsonRowSerializationSchema; import org.apache.flink.types.Row; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; /** * Base class for {@link PulsarTableSink} that serializes data in JSON format. @@ -45,6 +47,13 @@ public PulsarJsonTableSink( super(serviceUrl, topic, authentication, routingKeyFieldName); } + public PulsarJsonTableSink( + ClientConfigurationData clientConfigurationData, + ProducerConfigurationData producerConfigurationData, + String routingKeyFieldName) { + super(clientConfigurationData, producerConfigurationData, routingKeyFieldName); + } + @Override protected SerializationSchema createSerializationSchema(RowTypeInfo rowSchema) { return new JsonRowSerializationSchema(rowSchema); @@ -53,9 +62,8 @@ protected SerializationSchema createSerializationSchema(RowTypeInfo rowSche @Override protected PulsarTableSink createSink() { return new PulsarJsonTableSink( - serviceUrl, - topic, - authentication, + clientConfigurationData, + producerConfigurationData, routingKeyFieldName); } } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java index 4ca8361d27a11..4521add458b90 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java @@ -27,10 +27,11 @@ import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import java.util.Arrays; import java.util.List; -import java.util.Set; import java.util.TreeSet; import java.util.regex.Pattern; import java.util.Map; @@ -44,18 +45,24 @@ public class PulsarSourceBuilder { private static final String SERVICE_URL = "pulsar://localhost:6650"; private static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100; private static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000; + private static final String SUBSCRIPTION_NAME = "flink-sub"; final DeserializationSchema deserializationSchema; - String serviceUrl = SERVICE_URL; - final Set topicNames = new TreeSet<>(); - Authentication authentication; - Pattern topicsPattern; - String subscriptionName = "flink-sub"; + + ClientConfigurationData clientConfigurationData; + ConsumerConfigurationData consumerConfigurationData; + long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE; - SubscriptionInitialPosition initialPosition = SubscriptionInitialPosition.Latest; private PulsarSourceBuilder(DeserializationSchema deserializationSchema) { this.deserializationSchema = deserializationSchema; + + clientConfigurationData = new ClientConfigurationData(); + consumerConfigurationData = new ConsumerConfigurationData<>(); + clientConfigurationData.setServiceUrl(SERVICE_URL); + consumerConfigurationData.setTopicNames(new TreeSet<>()); + consumerConfigurationData.setSubscriptionName(SUBSCRIPTION_NAME); + consumerConfigurationData.setSubscriptionInitialPosition(SubscriptionInitialPosition.Latest); } /** @@ -66,7 +73,7 @@ private PulsarSourceBuilder(DeserializationSchema deserializationSchema) { */ public PulsarSourceBuilder serviceUrl(String serviceUrl) { Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank"); - this.serviceUrl = serviceUrl; + this.clientConfigurationData.setServiceUrl(serviceUrl); return this; } @@ -86,7 +93,7 @@ public PulsarSourceBuilder topic(String... topics) { for (String topic : topics) { Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic"); } - this.topicNames.addAll(Arrays.asList(topics)); + this.consumerConfigurationData.getTopicNames().addAll(Arrays.asList(topics)); return this; } @@ -104,7 +111,7 @@ public PulsarSourceBuilder topics(List topics) { Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank"); topics.forEach(topicName -> Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic")); - this.topicNames.addAll(topics); + this.consumerConfigurationData.getTopicNames().addAll(topics); return this; } @@ -120,8 +127,8 @@ public PulsarSourceBuilder topics(List topics) { */ public PulsarSourceBuilder topicsPattern(Pattern topicsPattern) { Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null"); - Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set."); - this.topicsPattern = topicsPattern; + Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null, "Pattern has already been set."); + this.consumerConfigurationData.setTopicsPattern(topicsPattern); return this; } @@ -137,8 +144,8 @@ public PulsarSourceBuilder topicsPattern(Pattern topicsPattern) { */ public PulsarSourceBuilder topicsPatternString(String topicsPattern) { Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank"); - Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set."); - this.topicsPattern = Pattern.compile(topicsPattern); + Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null, "Pattern has already been set."); + this.consumerConfigurationData.setTopicsPattern(Pattern.compile(topicsPattern)); return this; } @@ -151,7 +158,7 @@ public PulsarSourceBuilder topicsPatternString(String topicsPattern) { public PulsarSourceBuilder subscriptionName(String subscriptionName) { Preconditions.checkArgument(StringUtils.isNotBlank(subscriptionName), "subscriptionName cannot be blank"); - this.subscriptionName = subscriptionName; + this.consumerConfigurationData.setSubscriptionName(subscriptionName); return this; } @@ -163,7 +170,7 @@ public PulsarSourceBuilder subscriptionName(String subscriptionName) { */ public PulsarSourceBuilder subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) { Preconditions.checkNotNull(initialPosition,"subscription initial position cannot be null"); - this.initialPosition = initialPosition; + this.consumerConfigurationData.setSubscriptionInitialPosition(initialPosition); return this; } @@ -191,7 +198,7 @@ public PulsarSourceBuilder acknowledgementBatchSize(long size) { public PulsarSourceBuilder authentication(Authentication authentication) { Preconditions.checkArgument(authentication != null, "authentication instance can not be null, use new AuthenticationDisabled() to disable authentication"); - this.authentication = authentication; + this.clientConfigurationData.setAuthentication(authentication); return this; } @@ -212,7 +219,7 @@ public PulsarSourceBuilder authentication(String authPluginClassName, String "Authentication-Plugin class name can not be blank"); Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString), "Authentication-Plugin parameters can not be blank"); - this.authentication = AuthenticationFactory.create(authPluginClassName, authParamsString); + this.clientConfigurationData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString)); return this; } @@ -234,15 +241,42 @@ public PulsarSourceBuilder authentication(String authPluginClassName, Map pulsarAllClientConf(ClientConfigurationData clientConfigurationData) { + Preconditions.checkNotNull(clientConfigurationData, "client conf should not be null"); + this.clientConfigurationData = clientConfigurationData; return this; } + /** + * + * @param consumerConfigurationData All consumer conf wrapped in a POJO + * @return this builder + */ + public PulsarSourceBuilder pulsarAllConsumerConf(ConsumerConfigurationData consumerConfigurationData) { + Preconditions.checkNotNull(consumerConfigurationData, "consumer conf should not be null"); + this.consumerConfigurationData = consumerConfigurationData; + return this; + } + + public SourceFunction build() { - Preconditions.checkNotNull(serviceUrl, "a service url is required"); - Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null, + Preconditions.checkArgument(StringUtils.isNotBlank(this.clientConfigurationData.getServiceUrl()), + "a service url is required"); + Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null && + !this.consumerConfigurationData.getTopicNames().isEmpty()) || + this.consumerConfigurationData.getTopicsPattern() != null, "At least one topic or topics pattern is required"); - Preconditions.checkNotNull(subscriptionName, "a subscription name is required"); + Preconditions.checkArgument(StringUtils.isNotBlank(this.consumerConfigurationData.getSubscriptionName()), + "a subscription name is required"); return new PulsarConsumerSource<>(this); } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java index 5d20a1d0deaeb..274084ec90263 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java @@ -34,15 +34,16 @@ import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; /** * An append-only table sink to emit a streaming table as a Pulsar stream. */ public abstract class PulsarTableSink implements AppendStreamTableSink { - protected final String serviceUrl; - protected final String topic; - protected Authentication authentication; + protected ClientConfigurationData clientConfigurationData = new ClientConfigurationData(); + protected ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); protected SerializationSchema serializationSchema; protected PulsarKeyExtractor keyExtractor; protected String[] fieldNames; @@ -54,9 +55,20 @@ public PulsarTableSink( String topic, Authentication authentication, String routingKeyFieldName) { - this.serviceUrl = checkNotNull(serviceUrl, "Service url not set"); - this.topic = checkNotNull(topic, "Topic is null"); - this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead"); + checkNotNull(serviceUrl, "Service url not set"); + checkNotNull(topic, "Topic is null"); + this.clientConfigurationData.setServiceUrl(serviceUrl); + this.clientConfigurationData.setAuthentication(authentication); + this.producerConfigurationData.setTopicName(topic);; + this.routingKeyFieldName = routingKeyFieldName; + } + + public PulsarTableSink( + ClientConfigurationData clientConfigurationData, + ProducerConfigurationData producerConfigurationData, + String routingKeyFieldName) { + this.clientConfigurationData = checkNotNull(clientConfigurationData, "client config is null"); + this.producerConfigurationData = checkNotNull(producerConfigurationData, "producer config is null"); this.routingKeyFieldName = routingKeyFieldName; } @@ -80,9 +92,8 @@ public PulsarTableSink( */ protected FlinkPulsarProducer createFlinkPulsarProducer() { return new FlinkPulsarProducer( - serviceUrl, - topic, - authentication, + clientConfigurationData, + producerConfigurationData, serializationSchema, keyExtractor); } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java index bedcbda99cdf9..eb667b0af6c5b 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java @@ -18,7 +18,10 @@ */ package org.apache.flink.batch.connectors.pulsar; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.testng.annotations.Test; import static org.testng.Assert.assertNotNull; @@ -55,4 +58,70 @@ public void testPulsarAvroOutputFormatConstructor() { new PulsarAvroOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled()); assertNotNull(pulsarAvroOutputFormat); } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsNull() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(null) + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsNull() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(null) + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarAvroOutputFormatConstructorV2WhenTopicNameIsBlank() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(StringUtils.EMPTY) + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarAvroOutputFormatConstructorV2WhenServiceUrlIsBlank() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(StringUtils.EMPTY) + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(StringUtils.EMPTY) + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test + public void testPulsarAvroOutputFormatConstructorV2() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + PulsarAvroOutputFormat pulsarAvroOutputFormat = new PulsarAvroOutputFormat(clientConf, producerConf); + assertNotNull(pulsarAvroOutputFormat); + } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java index caccb6b90fddd..a374053e7d8da 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java @@ -18,7 +18,10 @@ */ package org.apache.flink.batch.connectors.pulsar; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.testng.annotations.Test; import static org.testng.Assert.assertNotNull; @@ -54,4 +57,70 @@ public void testPulsarCsvOutputFormatConstructor() { new PulsarCsvOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled()); assertNotNull(pulsarCsvOutputFormat); } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsNull() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(null) + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsNull() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(null) + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarCsvOutputFormatConstructorV2WhenTopicNameIsBlank() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(StringUtils.EMPTY) + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarCsvOutputFormatConstructorV2WhenServiceUrlIsBlank() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(StringUtils.EMPTY) + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test + public void testPulsarCsvOutputFormatConstructorV2() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf); + assertNotNull(pulsarCsvOutputFormat); + } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java index 4ab723275d0ec..01edc9a545d1c 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java @@ -18,7 +18,10 @@ */ package org.apache.flink.batch.connectors.pulsar; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.testng.annotations.Test; import static org.testng.Assert.assertNotNull; @@ -54,4 +57,70 @@ public void testPulsarJsonOutputFormatConstructor() { new PulsarJsonOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled()); assertNotNull(pulsarJsonOutputFormat); } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsNull() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(null) + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarJsonROutputFormatConstructorV2WhenTopicNameIsNull() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(null) + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarJsonOutputFormatConstructorV2WhenTopicNameIsBlank() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(StringUtils.EMPTY) + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarJsonOutputFormatConstructorV2WhenServiceUrlIsBlank() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(StringUtils.EMPTY) + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + new PulsarAvroOutputFormat(clientConf, producerConf); + } + + @Test + public void testPulsarJsonOutputFormatConstructorV2() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + PulsarJsonOutputFormat pulsarJsonOutputFormat = new PulsarJsonOutputFormat(clientConf, producerConf); + assertNotNull(pulsarJsonOutputFormat); + } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java index 238c49b716e13..a841067dfbd1d 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java @@ -19,8 +19,11 @@ package org.apache.flink.batch.connectors.pulsar; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.testng.annotations.Test; import java.io.IOException; @@ -59,6 +62,72 @@ public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() { new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), null); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsNull() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(null) + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + new PulsarOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarOutputFormatConstructorV2WhenTopicNameIsNull() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(null) + .build(); + + new PulsarOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarOutputFormatConstructorV2WhenTopicNameIsBlank() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(StringUtils.EMPTY) + .build(); + + new PulsarOutputFormat(clientConf, producerConf); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsBlank() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(StringUtils.EMPTY) + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + new PulsarOutputFormat(clientConf, producerConf); + } + + @Test + public void testPulsarOutputFormatConstructorV2() { + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl("testServiceUrl") + .build(); + + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName("testTopic") + .build(); + + PulsarCsvOutputFormat pulsarCsvOutputFormat = new PulsarCsvOutputFormat(clientConf, producerConf); + assertNotNull(pulsarCsvOutputFormat); + } + @Test public void testPulsarOutputFormatWithStringSerializationSchema() throws IOException { String input = "Wolfgang Amadeus Mozart"; diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java index 125ee4a4afbd8..024637bbf87cb 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java @@ -27,6 +27,8 @@ import org.apache.flink.types.Row; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; import org.powermock.api.mockito.PowerMockito; @@ -89,7 +91,15 @@ public void testEmitDataStream() throws Exception { private PulsarAvroTableSink spySink() throws Exception { - PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY, NasaMission.class); + ClientConfigurationData clientConf = ClientConfigurationData.builder() + .serviceUrl(SERVICE_URL) + .build(); + ProducerConfigurationData producerConf = ProducerConfigurationData.builder() + .topicName(TOPIC_NAME) + .build(); + + PulsarAvroTableSink sink = + new PulsarAvroTableSink(clientConf, producerConf, ROUTING_KEY, NasaMission.class); FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class); PowerMockito.whenNew( FlinkPulsarProducer.class diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java index c42ae6c2693a4..25755d24eb10a 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java @@ -26,6 +26,8 @@ import org.apache.flink.types.Row; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; import org.powermock.api.mockito.PowerMockito; @@ -83,7 +85,10 @@ public void testEmitDataStream() throws Exception { } private PulsarJsonTableSink spySink() throws Exception { - PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY); + PulsarJsonTableSink sink = new PulsarJsonTableSink( + ClientConfigurationData.builder().serviceUrl(SERVICE_URL).build(), + ProducerConfigurationData.builder().topicName(TOPIC_NAME).build(), + ROUTING_KEY); FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class); PowerMockito.whenNew( FlinkPulsarProducer.class diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java index 67a2433fc60ad..cc12ba42be66b 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java @@ -18,16 +18,20 @@ */ package org.apache.flink.streaming.connectors.pulsar; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; import java.util.regex.Pattern; /** @@ -53,6 +57,22 @@ public void testBuild() { Assert.assertNotNull(sourceFunction); } + + @Test + public void testBuildWithConfPojo() { + ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build(); + ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder() + .topicNames(new HashSet<>(Arrays.asList("testTopic"))) + .subscriptionName("testSubscriptionName") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .build(); + SourceFunction sourceFunction = pulsarSourceBuilder + .pulsarAllClientConf(clientConf) + .pulsarAllConsumerConf(consumerConf) + .build(); + Assert.assertNotNull(sourceFunction); + } + @Test(expectedExceptions = IllegalArgumentException.class) public void testBuildWithoutSettingRequiredProperties() { pulsarSourceBuilder.build(); @@ -136,4 +156,91 @@ public TypeInformation getProducedType() { return null; } } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testServiceUrlNullWithConfPojo() { + ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(null).build(); + ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder() + .topicNames(new HashSet(Arrays.asList("testServiceUrl"))) + .subscriptionName("testSubscriptionName") + .build(); + + pulsarSourceBuilder + .pulsarAllClientConf(clientConf) + .pulsarAllConsumerConf(consumerConf) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testServiceUrlWithBlankWithConfPojo() { + ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(StringUtils.EMPTY).build(); + ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder() + .topicNames(new HashSet(Arrays.asList("testTopic"))) + .subscriptionName("testSubscriptionName") + .build(); + + pulsarSourceBuilder + .pulsarAllClientConf(clientConf) + .pulsarAllConsumerConf(consumerConf) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testTopicPatternWithNullWithConfPojo() { + ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build(); + ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder() + .topicsPattern(null) + .subscriptionName("testSubscriptionName") + .build(); + + pulsarSourceBuilder + .pulsarAllClientConf(clientConf) + .pulsarAllConsumerConf(consumerConf) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSubscriptionNameWithNullWithConfPojo() { + ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build(); + ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder() + .topicNames(new HashSet(Arrays.asList("testTopic"))) + .subscriptionName(null) + .build(); + + pulsarSourceBuilder + .pulsarAllClientConf(clientConf) + .pulsarAllConsumerConf(consumerConf) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSubscriptionNameWithBlankWithConfPojo() { + pulsarSourceBuilder.topic(null); + ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build(); + ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder() + .topicNames(new HashSet(Arrays.asList("testTopic"))) + .subscriptionName(StringUtils.EMPTY) + .build(); + + pulsarSourceBuilder + .pulsarAllClientConf(clientConf) + .pulsarAllConsumerConf(consumerConf) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSubscriptionInitialPositionWithConfPojo() { + pulsarSourceBuilder.topic(null); + ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build(); + ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder() + .topicNames(new HashSet(Arrays.asList("testTopic"))) + .subscriptionName("testSubscriptionName") + .subscriptionInitialPosition(null) + .build(); + + pulsarSourceBuilder + .pulsarAllClientConf(clientConf) + .pulsarAllConsumerConf(consumerConf) + .build(); + } }