Skip to content

Commit

Permalink
[pulsar-functions] fix words error and remove not use import java cla…
Browse files Browse the repository at this point in the history
…ss (apache#3791)

### Motivation
   when I study and read this module code , I fix some error like:
1. words error, such as JaveInstance -> JavaInstance, Recieved -> Received,  Unknwon -> Unknown, alterted -> alterted, Updare -> Update, etc.
2. remove not use import java class.
3. duplicated code in this  java class I extract method. 
4. javadoc  styles are consistent (not required)

### Modifications

minor fix up in pulsar-functions module
  • Loading branch information
ambition119 authored and sijie committed Mar 9, 2019
1 parent 18e3ab1 commit 4f03548
Show file tree
Hide file tree
Showing 86 changed files with 253 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,50 @@
*/
public interface Context {
/**
* Access the record associated with the current input value
* Access the record associated with the current input value.
*
* @return
*/
Record<?> getCurrentRecord();

/**
* Get a list of all input topics
* Get a list of all input topics.
*
* @return a list of all input topics
*/
Collection<String> getInputTopics();

/**
* Get the output topic of the function
* Get the output topic of the function.
*
* @return output topic name
*/
String getOutputTopic();

/**
* Get output schema builtin type or custom class name
* Get output schema builtin type or custom class name.
*
* @return output schema builtin type or custom class name
*/
String getOutputSchemaType();

/**
* The tenant this function belongs to
* The tenant this function belongs to.
*
* @return the tenant this function belongs to
*/
String getTenant();

/**
* The namespace this function belongs to
* The namespace this function belongs to.
*
* @return the namespace this function belongs to
*/
String getNamespace();

/**
* The name of the function that we are executing
* The name of the function that we are executing.
*
* @return The Function name
*/
String getFunctionName();
Expand All @@ -96,19 +103,22 @@ public interface Context {
int getNumInstances();

/**
* The version of the function that we are executing
* The version of the function that we are executing.
*
* @return The version id
*/
String getFunctionVersion();

/**
* The logger object that can be used to log in a function
* The logger object that can be used to log in a function.
*
* @return the logger object
*/
Logger getLogger();

/**
* Increment the builtin distributed counter refered by key
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
Expand All @@ -123,7 +133,7 @@ public interface Context {
long getCounter(String key);

/**
* Updare the state value for the key.
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
Expand All @@ -139,42 +149,47 @@ public interface Context {
ByteBuffer getState(String key);

/**
* Get a map of all user-defined key/value configs for the function
* Get a map of all user-defined key/value configs for the function.
*
* @return The full map of user-defined config values
*/
Map<String, Object> getUserConfigMap();

/**
* Get any user-defined key/value
* Get any user-defined key/value.
*
* @param key The key
* @return The Optional value specified by the user for that key.
*/
Optional<Object> getUserConfigValue(String key);

/**
* Get any user-defined key/value or a default value if none is present
* Get any user-defined key/value or a default value if none is present.
*
* @param key
* @param defaultValue
* @return Either the user config value associated with a given key or a supplied default value
*/
Object getUserConfigValueOrDefault(String key, Object defaultValue);

/**
* Get the secret associated with this key
* Get the secret associated with this key.
*
* @param secretName The name of the secret
* @return The secret if anything was found or null
*/
String getSecret(String secretName);

/**
* Record a user defined metric
* Record a user defined metric.
*
* @param metricName The name of the metric
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);

/**
* Publish an object using serDe for serializing to the topic
* Publish an object using serDe for serializing to the topic.
*
* @param topicName
* The name of the topic for publishing
Expand All @@ -187,7 +202,8 @@ public interface Context {
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);

/**
* Publish an object to the topic using default schemas
* Publish an object to the topic using default schemas.
*
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @return A future that completes when the framework is done publishing the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public interface Function<I, O> {
/**
* Process the input.
*
* @return the output
*/
O process(I input, Context context) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@
public interface Record<T> {

/**
* If the record originated from a topic, report the topic name
* If the record originated from a topic, report the topic name.
*/
default Optional<String> getTopicName() {
return Optional.empty();
}

/**
* Return a key if the key has one associated
* Return a key if the key has one associated.
*/
default Optional<String> getKey() {
return Optional.empty();
}

/**
* Retrieves the actual data of the record
* Retrieves the actual data of the record.
*
* @return The record data
*/
Expand Down Expand Up @@ -85,19 +85,19 @@ default Map<String, String> getProperties() {
}

/**
* Acknowledge that this record is fully processed
* Acknowledge that this record is fully processed.
*/
default void ack() {
}

/**
* To indicate that this record has failed to be processed
* To indicate that this record has failed to be processed.
*/
default void fail() {
}

/**
* To support message routing on a per message basis
* To support message routing on a per message basis.
*
* @return The topic this message should be written to
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,29 @@
public interface WindowContext {

/**
* The tenant this function belongs to
* The tenant this function belongs to.
*
* @return the tenant this function belongs to
*/
String getTenant();

/**
* The namespace this function belongs to
* The namespace this function belongs to.
*
* @return the namespace this function belongs to
*/
String getNamespace();

/**
* The name of the function that we are executing
* The name of the function that we are executing.
*
* @return The Function name
*/
String getFunctionName();

/**
* The id of the function that we are executing
* The id of the function that we are executing.
*
* @return The function id
*/
String getFunctionId();
Expand All @@ -67,37 +71,43 @@ public interface WindowContext {
int getNumInstances();

/**
* The version of the function that we are executing
* The version of the function that we are executing.
*
* @return The version id
*/
String getFunctionVersion();

/**
* Get a list of all input topics
* Get a list of all input topics.
*
* @return a list of all input topics
*/
Collection<String> getInputTopics();

/**
* Get the output topic of the function
* Get the output topic of the function.
*
* @return output topic name
*/
String getOutputTopic();

/**
* Get output schema builtin type or custom class name
* Get output schema builtin type or custom class name.
*
* @return output schema builtin type or custom class name
*/
String getOutputSchemaType();

/**
* The logger object that can be used to log in a function
* The logger object that can be used to log in a function.
*
* @return the logger object
*/
Logger getLogger();

/**
* Increment the builtin distributed counter refered by key
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
Expand All @@ -112,7 +122,7 @@ public interface WindowContext {
long getCounter(String key);

/**
* Updare the state value for the key.
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
Expand All @@ -128,35 +138,39 @@ public interface WindowContext {
ByteBuffer getState(String key);

/**
* Get a map of all user-defined key/value configs for the function
* Get a map of all user-defined key/value configs for the function.
*
* @return The full map of user-defined config values
*/
Map<String, Object> getUserConfigMap();

/**
* Get any user-defined key/value
* Get any user-defined key/value.
*
* @param key The key
* @return The Optional value specified by the user for that key.
*/
Optional<Object> getUserConfigValue(String key);

/**
* Get any user-defined key/value or a default value if none is present
* Get any user-defined key/value or a default value if none is present.
*
* @param key
* @param defaultValue
* @return Either the user config value associated with a given key or a supplied default value
*/
Object getUserConfigValueOrDefault(String key, Object defaultValue);

/**
* Record a user defined metric
* Record a user defined metric.
*
* @param metricName The name of the metric
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);

/**
* Publish an object using serDe for serializing to the topic
* Publish an object using serDe for serializing to the topic.
*
* @param topicName
* The name of the topic for publishing
Expand All @@ -169,7 +183,8 @@ public interface WindowContext {
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);

/**
* Publish an object to the topic using default schemas
* Publish an object to the topic using default schemas.
*
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @return A future that completes when the framework is done publishing the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public interface WindowFunction<I, O> {
/**
* Process the input.
*
* @return the output
*/
O process(Collection<Record<I>> input, WindowContext context) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.pulsar.functions.api.utils.JavaSerDe;
import org.testng.annotations.Test;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
/**
* This class implements the Context interface exposed to the user.
*/

class ContextImpl implements Context, SinkContext, SourceContext {
private InstanceConfig config;
private Logger logger;
Expand Down Expand Up @@ -215,7 +214,7 @@ public String getFunctionName() {

@Override
public String getFunctionId() {
return config.getFunctionId().toString();
return config.getFunctionId();
}

@Override
Expand Down
Loading

0 comments on commit 4f03548

Please sign in to comment.