Skip to content

Commit

Permalink
[CheckStyle]Enable CheckStyle of rest of modules of pulsar-functions (a…
Browse files Browse the repository at this point in the history
…pache#14352)

### Motivation

Enable CheckStyle plugin in 

pulsar-functions-api-example 
pulsar-functions-local-runner-origin 
pulsar-functions-runtime
pulsar-functions-runtime-all
pulsar-functions-utils
pulsar-functions-secrets

### Modifications

Fix the wrong code style
  • Loading branch information
gaozhangmin authored Feb 25, 2022
1 parent 7a58aeb commit 9c5bc38
Show file tree
Hide file tree
Showing 88 changed files with 1,317 additions and 679 deletions.
13 changes: 13 additions & 0 deletions pulsar-functions/java-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>checkstyle</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
*/
package org.apache.pulsar.functions.api.examples;

import lombok.extern.slf4j.Slf4j;

import java.util.Collection;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

/**
* Example Function that acts on a window of tuples at a time rather than per tuple basis.
*/
@Slf4j
public class AddWindowFunction implements Function <Collection<Integer>, Integer> {
public class AddWindowFunction implements Function<Collection<Integer>, Integer> {
@Override
public Integer apply(Collection<Integer> integers) {
return integers.stream().reduce(0, (x, y) -> x + y);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,24 @@
public class AsyncContextFunction implements Function<String, CompletableFuture<Void>> {
@Override
public CompletableFuture<Void> process(String input, Context context) {
Logger LOG = context.getLogger();
Logger log = context.getLogger();
CompletableFuture<Void> future = new CompletableFuture();

// this method only delay a function execute.
Executors.newCachedThreadPool().submit(() -> {
try {
Thread.sleep(500);
} catch (Exception e) {
LOG.error("Exception when Thread.sleep", e);
log.error("Exception when Thread.sleep", e);
future.completeExceptionally(e);
}

String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
String funcName = context.getFunctionName();

String logMessage = String.format("A message with value of \"%s\" has arrived on " +
"one of the following topics: %s %n", input, inputTopics);
LOG.info(logMessage);
String logMessage = String.format("A message with value of \"%s\" has arrived on "
+ "one of the following topics: %s %n", input, inputTopics);
log.info(logMessage);

String metricName = String.format("function-%s-messages-received", funcName);
context.recordMetric(metricName, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.functions.api.SerDe;

import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.SerDe;

/**
* Simple ByteBuffer Serializer and Deserializer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.pulsar.functions.api.examples;

import java.util.Optional;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Optional;

/**
* Function that appends something to incoming input based on config supplied.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@
*/
package org.apache.pulsar.functions.api.examples;

import java.util.stream.Collectors;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

import java.util.stream.Collectors;

public class ContextFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
Logger LOG = context.getLogger();
Logger log = context.getLogger();
String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
String functionName = context.getFunctionName();

String logMessage = String.format("A message with a value of \"%s\" has arrived on one of " +
"the following topics: %s %n", input, inputTopics);
String logMessage = String.format("A message with a value of \"%s\" has arrived on one of "
+ "the following topics: %s %n", input, inputTopics);

LOG.info(logMessage);
log.info(logMessage);

String metricName = String.format("function-%s-messages-received", functionName);
context.recordMetric(metricName, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class CursorManagementFunction implements Function<String, String> {
public String process(String input, Context context) throws Exception {
PulsarAdmin adminClient = context.getPulsarAdmin();
if (adminClient != null) {
String topic = context.getCurrentRecord().getTopicName().isPresent() ?
context.getCurrentRecord().getTopicName().get() : null;
String topic = context.getCurrentRecord().getTopicName().isPresent()
? context.getCurrentRecord().getTopicName().get() : null;
String subName = context.getTenant() + "/" + context.getNamespace() + "/" + context.getFunctionName();
if (topic != null) {
// 1578188166 below is a random-pick timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.functions.api.SerDe;

import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.SerDe;

/**
* Example of using a byte buffer serialization for Custom object.
*/
public class CustomBaseSerde implements SerDe<CustomBaseObject> {
@Override
public CustomBaseObject deserialize(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
return new CustomBaseObject(buffer.getLong());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class CustomBaseToDerivedFunction implements Function<CustomBaseObject, C

@Override
public CustomDerivedObject process(CustomBaseObject input, Context context) {
return new CustomDerivedObject(input.getBaseValue() + 100, (int)input.getBaseValue() + 50);
return new CustomDerivedObject(input.getBaseValue() + 100,
(int) input.getBaseValue() + 50);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.functions.api.SerDe;

import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.SerDe;

/**
* Example to derived object serialization.
*/
public class CustomDerivedSerde implements SerDe<CustomDerivedObject> {
@Override
public CustomDerivedObject deserialize(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
return new CustomDerivedObject(buffer.getLong(), buffer.getInt());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

/**
* Function that appends the host name to the payload message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.api.examples;

import lombok.Data;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import lombok.Data;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;

/**
* Example function that provide a constructor with map<string, object> argument
* to initialize the CryptoKeyReader class used by pulsar function
* to initialize the CryptoKeyReader class used by pulsar function.
*/
@Data
public class RawFileKeyReader implements CryptoKeyReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
*/
package org.apache.pulsar.functions.api.examples;

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.HashMap;
import java.util.Map;

/**
* Example function that uses the built in publish function in the context
* to publish to a desired topic based on config and setting various message configurations to be passed along.
Expand All @@ -45,7 +44,7 @@ public Void process(String input, Context context) {
try {
TypedMessageBuilder messageBuilder = context.newOutputMessage(publishTopic, Schema.STRING).
value(output).properties(properties);
if (context.getCurrentRecord().getKey().isPresent()){
if (context.getCurrentRecord().getKey().isPresent()) {
messageBuilder.key(context.getCurrentRecord().getKey().get());
}
messageBuilder.eventTime(System.currentTimeMillis()).sendAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.pulsar.functions.api.examples;

import java.util.Optional;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Optional;

/**
* An example demonstrate retrieving user config value from Context.
*/
Expand All @@ -32,7 +31,7 @@ public class UserConfigFunction implements Function<String, String> {
public String process(String input, Context context) {
Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite");
if (whatToWrite.get() != null) {
return (String)whatToWrite.get();
return (String) whatToWrite.get();
} else {
return "Not a nice way";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.functions.api.examples;

import java.util.Optional;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.pulsar.functions.api.Function;

/**
* Example function that does not return any value
* Example function that does not return any value.
*/
public class VoidFunction implements Function<String, Void> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.pulsar.functions.api.examples;

import java.util.Arrays;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Arrays;

/**
* The classic word count example done using pulsar functions
* Each input message is a sentence that split into words and each word counted.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.api.examples;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.api.examples.pojo;
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
*/
package org.apache.pulsar.functions.api.examples.serde;

import org.apache.pulsar.functions.api.SerDe;

import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.SerDe;

/**
* This class takes care of serializing/deserializing CustomObject.
*/
public class CustomObjectSerde implements SerDe<CustomObject> {
@Override
public CustomObject deserialize(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
return new CustomObject(buffer.getLong());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.api.examples.serde;
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
*/
package org.apache.pulsar.functions.api.examples.window;

import java.util.Collection;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;

import java.util.Collection;

/**
* Example Function that acts on a window of tuples at a time rather than per tuple basis.
*/
Expand Down
Loading

0 comments on commit 9c5bc38

Please sign in to comment.