Skip to content

Commit

Permalink
Perform Checkstyle analysis in the pulsar-flink module (apache#4832)
Browse files Browse the repository at this point in the history
* Add maven plugin for style checking. Fix some style violations.

* Fix issues shown by the style checker in the pulsar-flink module
  • Loading branch information
vzhikserg authored and sijie committed Jul 29, 2019
1 parent 91c4254 commit b0793a1
Show file tree
Hide file tree
Showing 17 changed files with 190 additions and 74 deletions.
19 changes: 19 additions & 0 deletions pulsar-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,25 @@
</execution>
</executions>
</plugin>

<!-- Enforce the shared Pulsar Checkstyle ruleset on this module.
     Bound to the 'verify' phase; the 'check' goal reports violations and,
     with the plugin's defaults, is expected to fail the build on them
     (NOTE(review): failOnViolation is not set here - confirm default applies).
     configLocation/suppressionsLocation point at the common rules shipped
     in the buildtools module, so all modules share one style definition. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>check-style</id>
<phase>verify</phase>
<configuration>
<configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
<suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,24 @@
*/
package org.apache.flink.batch.connectors.pulsar;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/**
* Base Pulsar Output Format to write Flink DataSets into a Pulsar topic.
*/
Expand All @@ -54,7 +53,8 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
private ProducerConfigurationData producerConf;


protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) {
protected BasePulsarOutputFormat(final String serviceUrl, final String topicName,
final Authentication authentication) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank.");

Expand All @@ -65,7 +65,8 @@ protected BasePulsarOutputFormat(final String serviceUrl, final String topicName
this.clientConf.setAuthentication(authentication);
this.producerConf.setTopicName(topicName);

LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
this.producerConf.getTopicName());
}

protected BasePulsarOutputFormat(ClientConfigurationData clientConf, ProducerConfigurationData producerConf) {
Expand All @@ -75,7 +76,8 @@ protected BasePulsarOutputFormat(ClientConfigurationData clientConf, ProducerCon
Preconditions.checkArgument(StringUtils.isNotBlank(clientConf.getServiceUrl()), "serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(producerConf.getTopicName()), "topicName cannot be blank.");

LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
this.producerConf.getTopicName());
}

@Override
Expand Down Expand Up @@ -107,9 +109,9 @@ public void close() throws IOException {

private Producer<byte[]> getProducerInstance()
throws PulsarClientException {
if(producer == null){
if (producer == null){
synchronized (PulsarOutputFormat.class) {
if(producer == null){
if (producer == null){
producer = Preconditions.checkNotNull(createPulsarProducer(),
"Pulsar producer cannot be null.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authenticatio
this.serializationSchema = new AvroSerializationSchema();
}

public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData,
ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new AvroSerializationSchema();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication
this.serializationSchema = new CsvSerializationSchema<>();
}

public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData,
ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new CsvSerializationSchema<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authenticatio
this.serializationSchema = new JsonSerializationSchema();
}

public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData,
ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new JsonSerializationSchema();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {

private static final long serialVersionUID = 2997027580167793000L;

public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication, final SerializationSchema<T> serializationSchema) {
public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication,
final SerializationSchema<T> serializationSchema) {
super(serviceUrl, topicName, authentication);
Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
this.serializationSchema = serializationSchema;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/**
 * Output formats for writing Flink batch {@code DataSet} records into Pulsar topics.
 *
 * <p>Contains {@code BasePulsarOutputFormat} plus concrete variants for Avro, CSV,
 * JSON and generic serialization-schema based output ({@code PulsarOutputFormat}).
 */
package org.apache.flink.batch.connectors.pulsar;
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
*/
package org.apache.flink.batch.connectors.pulsar.serialization;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
* Avro Serialization Schema to serialize Dataset records to Avro.
*/
Expand All @@ -49,7 +48,7 @@ public byte[] serialize(T t) {
Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
arrayOutputStream.reset();
try {
writer.write(t,encoder);
writer.write(t, encoder);
encoder.flush();
} catch (IOException e) {
throw new RuntimeException("Error while serializing the record to Avro", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
*/
package org.apache.flink.batch.connectors.pulsar.serialization;

import java.io.IOException;
import java.io.StringWriter;
import org.apache.commons.csv.CSVFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple;

import java.io.IOException;
import java.io.StringWriter;

/**
* Csv Serialization Schema to serialize Tuples to Csv.
*/
Expand All @@ -38,7 +37,7 @@ public byte[] serialize(T t) {
StringWriter stringWriter;
try {
Object[] fieldsValues = new Object[t.getArity()];
for(int index = 0; index < t.getArity(); index++) {
for (int index = 0; index < t.getArity(); index++) {
fieldsValues[index] = (t.getField(index));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/**
 * Serialization schemas used by the Pulsar batch output formats to turn
 * records into {@code byte[]} payloads (Avro, CSV and JSON implementations).
 */
package org.apache.flink.batch.connectors.pulsar.serialization;
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

import java.util.function.Function;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
Expand All @@ -35,10 +34,10 @@
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand All @@ -48,8 +47,8 @@
/**
* Flink Sink to produce data into a Pulsar topic.
*/
public class FlinkPulsarProducer<IN>
extends RichSinkFunction<IN>
public class FlinkPulsarProducer<T>
extends RichSinkFunction<T>
implements CheckpointedFunction {

private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
Expand All @@ -61,12 +60,12 @@ public class FlinkPulsarProducer<IN>
* (Serializable) SerializationSchema for turning objects used with Flink into.
* byte[] for Pulsar.
*/
protected final SerializationSchema<IN> schema;
protected final SerializationSchema<T> schema;

/**
* User-provided key extractor for assigning a key to a pulsar message.
*/
protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;

/**
* Produce Mode.
Expand Down Expand Up @@ -110,8 +109,8 @@ public class FlinkPulsarProducer<IN>
public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
Authentication authentication,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor) {
SerializationSchema<T> serializationSchema,
PulsarKeyExtractor<T> keyExtractor) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
Expand All @@ -129,8 +128,8 @@ public FlinkPulsarProducer(String serviceUrl,

public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
ProducerConfigurationData producerConfigurationData,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor) {
SerializationSchema<T> serializationSchema,
PulsarKeyExtractor<T> keyExtractor) {
this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
Expand All @@ -144,7 +143,7 @@ public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
/**
* @return pulsar key extractor.
*/
public PulsarKeyExtractor<IN> getKeyExtractor() {
public PulsarKeyExtractor<T> getKeyExtractor() {
return flinkPulsarKeyExtractor;
}

Expand Down Expand Up @@ -178,7 +177,7 @@ public void setFlushOnCheckpoint(boolean flush) {
// ----------------------------------- Sink Methods --------------------------

@SuppressWarnings("unchecked")
private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
private static <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
if (null == extractor) {
return PulsarKeyExtractor.NULL;
} else {
Expand Down Expand Up @@ -238,7 +237,7 @@ public void open(Configuration parameters) throws Exception {
}

@Override
public void invoke(IN value, Context context) throws Exception {
public void invoke(T value, Context context) throws Exception {
checkErroneous();

byte[] serializedValue = schema.serialize(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,30 @@
*/
package org.apache.flink.streaming.connectors.pulsar;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.IOUtils;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
* When checkpointing is enabled, it guarantees at least once processing semantics.
Expand Down Expand Up @@ -190,6 +193,6 @@ PulsarClient createClient() throws PulsarClientException {
}

Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
return ((PulsarClientImpl)client).subscribeAsync(consumerConfigurationData).join();
return ((PulsarClientImpl) client).subscribeAsync(consumerConfigurationData).join();
}
}
Loading

0 comments on commit b0793a1

Please sign in to comment.