Skip to content

Commit

Permalink
improving config validation (apache#1859)
Browse files Browse the repository at this point in the history
* improving config validation

* removing unnecessary file

* removing unnecessary log

* fix bug

* fix potential NPE
  • Loading branch information
jerrypeng authored May 30, 2018
1 parent 92802e9 commit ecec933
Show file tree
Hide file tree
Showing 13 changed files with 1,352 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ public IObjectFactory getObjectFactory() {
private Functions functions;
private CmdFunctions cmd;

public class DummyFunction implements Function<String, String> {
public static class DummyFunction implements Function<String, String> {

public DummyFunction() {

}

@Override
public String process(String input, Context context) throws Exception {
return null;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.WindowConfig;
import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
Expand Down Expand Up @@ -93,7 +94,9 @@ private WindowConfig getWindowConfigs(Context context) {
(new Gson().toJson(context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY).get())),
WindowConfig.class);

WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);

WindowUtils.inferDefaultConfigs(windowConfig);
ValidatorImpls.WindowConfigValidator.validateWindowConfig(windowConfig);
return windowConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,7 @@ public static String getFullyQualifiedName(String tenant, String namespace, Stri
return String.format("%s/%s/%s", tenant, namespace, name);
}

public static void validateAndSetDefaultsWindowConfig(WindowConfig windowConfig) {
if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
throw new IllegalArgumentException("Window length is not specified");
}

if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
throw new IllegalArgumentException(
"Window length for time and count are set! Please set one or the other.");
}

if (windowConfig.getWindowLengthCount() != null) {
if (windowConfig.getWindowLengthCount() <= 0) {
throw new IllegalArgumentException(
"Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
}
}

if (windowConfig.getWindowLengthDurationMs() != null) {
if (windowConfig.getWindowLengthDurationMs() <= 0) {
throw new IllegalArgumentException(
"Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
}
}

if (windowConfig.getSlidingIntervalCount() != null) {
if (windowConfig.getSlidingIntervalCount() <= 0) {
throw new IllegalArgumentException(
"Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
}
}

if (windowConfig.getSlidingIntervalDurationMs() != null) {
if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
throw new IllegalArgumentException(
"Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
}
}

public static void inferDefaultConfigs(WindowConfig windowConfig) {
if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getSlidingIntervalDurationMs() == null) {
windowConfig.setSlidingIntervalDurationMs(windowConfig.getWindowLengthDurationMs());
}
Expand All @@ -72,20 +35,10 @@ public static void validateAndSetDefaultsWindowConfig(WindowConfig windowConfig)
}

if (windowConfig.getTimestampExtractorClassName() != null) {
if (windowConfig.getMaxLagMs() != null) {
if (windowConfig.getMaxLagMs() <= 0) {
throw new IllegalArgumentException(
"Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
}
} else {
if (windowConfig.getMaxLagMs() == null) {
windowConfig.setMaxLagMs(WindowFunctionExecutor.DEFAULT_MAX_LAG_MS);
}
if (windowConfig.getWatermarkEmitIntervalMs() != null) {
if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
throw new IllegalArgumentException(
"Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
}
} else {
if (windowConfig.getWatermarkEmitIntervalMs() == null) {
windowConfig.setWatermarkEmitIntervalMs(WindowFunctionExecutor.DEFAULT_WATERMARK_EVENT_INTERVAL_MS);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ public void testSettingLagTime() throws Exception {
if (arg0 == null) {
Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs(),
new Long(testWindowedPulsarFunction.DEFAULT_MAX_LAG_MS));
} else if((Long) arg0 <= 0) {
fail(String.format("Window lag cannot be zero or less -- lagTime: %s", arg0));
} else if((Long) arg0 < 0) {
fail(String.format("Window lag cannot be less than zero -- lagTime: %s", arg0));
} else {
Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs().longValue(),
maxLagMs.longValue());
Expand Down
5 changes: 5 additions & 0 deletions pulsar-functions/utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>typetools</artifactId>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,29 @@
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidFunctionConfig;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidResources;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidWindowConfig;
import org.apache.pulsar.functions.utils.validation.ValidatorImpls;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

@Getter
@Setter
@Data
@EqualsAndHashCode
@ToString
@isValidFunctionConfig
public class FunctionConfig {

public enum ProcessingGuarantees {
Expand Down Expand Up @@ -61,25 +73,37 @@ public enum Runtime {
PYTHON
}


@NotNull
private String tenant;
@NotNull
private String namespace;
@NotNull
private String name;
@NotNull
@isImplementationOfClasses(implementsClasses = {Function.class, java.util.function.Function.class})
private String className;

@isListEntryCustom(entryValidatorClasses = {ValidatorImpls.TopicNameValidator.class})
private Collection<String> inputs;
@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })
private Map<String, String> customSerdeInputs;

@isValidTopicName
private String output;
@isImplementationOfClass(implementsClass = SerDe.class)
private String outputSerdeClassName;

@isValidTopicName
private String logTopic;
private ProcessingGuarantees processingGuarantees;
private Map<String, Object> userConfig;
private SubscriptionType subscriptionType;
private Runtime runtime;
private boolean autoAck;
@isPositiveNumber
private int parallelism;
@isValidResources
private Resources resources;
private String fqfn;
@isValidWindowConfig
private WindowConfig windowConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,28 @@
import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.ServerSocket;
import java.util.Collection;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.functions.api.Function;

import net.jodah.typetools.TypeResolver;

/**
* Utils used for runtime.
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class Utils {

Expand Down Expand Up @@ -76,4 +87,66 @@ public static int findAvailablePort() {
throw new RuntimeException("No free port found", ex);
}
}

public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) {

Object userClass = createInstance(functionConfig.getClassName(), Thread.currentThread().getContextClassLoader());

Class<?>[] typeArgs;
// if window function
if (functionConfig.getWindowConfig() != null) {
java.util.function.Function function = (java.util.function.Function) userClass;
if (function == null) {
throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated",
functionConfig.getClassName()));
}
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
if (!typeArgs[0].equals(Collection.class)) {
throw new IllegalArgumentException("Window function must take a collection as input");
}
Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
typeArgs[0] = (Class<?>) actualInputType;
} else {
if (userClass instanceof Function) {
Function pulsarFunction = (Function) userClass;
typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
} else {
java.util.function.Function function = (java.util.function.Function) userClass;
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
}
}

return typeArgs;
}

public static Object createInstance(String userClassName, ClassLoader classLoader) {
Class<?> theCls;
try {
theCls = Class.forName(userClassName);
} catch (ClassNotFoundException cnfe) {
try {
theCls = Class.forName(userClassName, true, classLoader);
} catch (ClassNotFoundException e) {
throw new RuntimeException("User class must be in class path", cnfe);
}
}
Object result;
try {
Constructor<?> meth = theCls.getDeclaredConstructor();
meth.setAccessible(true);
result = meth.newInstance();
} catch (InstantiationException ie) {
throw new RuntimeException("User class must be concrete", ie);
} catch (NoSuchMethodException e) {
throw new RuntimeException("User class doesn't have such method", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("User class must have a no-arg constructor", e);
} catch (InvocationTargetException e) {
throw new RuntimeException("User class constructor throws exception", e);
}
return result;

}
}
Loading

0 comments on commit ecec933

Please sign in to comment.