Skip to content

Commit

Permalink
Removed Validation Annotations and do manual Validation (apache#2813)
Browse files Browse the repository at this point in the history
* Added Get and List source/sink functionality

* Fixed compile

* Removed test that doesnt make sense any more

* Fixed build

* Fixed logic

* Return error response

* Return response on error

* Fix unittest

* Fixed unittest

* Fixed unittest

* Fixed unittest

* Added get/list sinks tests

* Added get/list tests

* Add more unittests

* Added more unittests

* Added TODO

* Took feedback

* Fix unittest

* Fix unittest

* Fix unittest

* Fixed integration tests

* Fixed integration test

* Added restart/stop functionality to the sources/sinks

* Added getstatus method to sources/sink

* Fix integration tests

* Do Explicit Validation of Configs instead of annotations

* Remove all annotations from config files

* Removed validator related classes

* Removed unused imports

* Fix check logic

* fix compilation

* Fix unittest

* Fixed unittest

* Fixed unittest

* Validator tests should be done elsewhere

* Added unittest

* Fixed unittest
  • Loading branch information
srkukarni authored Oct 22, 2018
1 parent b73a967 commit 32d1daf
Show file tree
Hide file tree
Showing 33 changed files with 1,249 additions and 2,136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Maps;
import com.google.gson.Gson;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
Expand All @@ -47,11 +45,8 @@
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
Expand Down Expand Up @@ -229,7 +224,7 @@ protected static FunctionConfig createFunctionConfig(String jarFile, String tena

File file = new File(jarFile);
try {
Reflections.loadJar(file);
Utils.loadJar(file);
} catch (MalformedURLException e) {
throw new RuntimeException("Failed to load user jar " + file, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.FunctionConfig;
Expand Down Expand Up @@ -95,6 +96,7 @@ public IObjectFactory getObjectFactory() {
}

private static final String TEST_NAME = "test_name";
private static final String JAR_NAME = CmdFunctionsTest.class.getClassLoader().getResource("dummyexamples.jar").getFile();

private PulsarAdmin admin;
private Functions functions;
Expand Down Expand Up @@ -201,7 +203,7 @@ public void testCreateFunction() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--auto-ack", "false",
"--tenant", "sample",
"--namespace", "ns1",
Expand Down Expand Up @@ -273,7 +275,7 @@ public void stopFunctionInstances() throws Exception {

verify(functions, times(1)).stopFunction(tenant, namespace, fnName);
}

@Test
public void testCreateFunctionWithHttpUrl() throws Exception {
String fnName = TEST_NAME + "-function";
Expand All @@ -300,7 +302,7 @@ public void testCreateFunctionWithHttpUrl() throws Exception {
consoleOutputCapturer.stop();
String output = consoleOutputCapturer.getStderr();

assertTrue(output.contains("Failed to download jar"));
assertTrue(output.contains("Corrupted Jar File"));
assertEquals(fnName, creater.getFunctionName());
assertEquals(inputTopicName, creater.getInputs());
assertEquals(outputTopicName, creater.getOutput());
Expand All @@ -320,14 +322,14 @@ public void testGetFunctionStatus() throws Exception {

verify(functions, times(1)).getFunctionStatus(tenant, namespace, fnName, instanceId);
}

@Test
public void testCreateFunctionWithFileUrl() throws Exception {
String fnName = TEST_NAME + "-function";
String inputTopicName = TEST_NAME + "-input-topic";
String outputTopicName = TEST_NAME + "-output-topic";

final String url = "file:/usr/temp/myfile.jar";
final String url = "file:" + JAR_NAME;
cmd.run(new String[] {
"create",
"--name", fnName,
Expand Down Expand Up @@ -371,7 +373,7 @@ public void testCreateSink() throws Exception {
consoleOutputCapturer.stop();
String output = consoleOutputCapturer.getStderr();

assertTrue(output.contains("Failed to download archive"));
assertTrue(output.contains("Corrupt User PackageFile " + url));
assertEquals(url, creater.archive);
}

Expand All @@ -389,14 +391,15 @@ public void testCreateSource() throws Exception {
"--archive", url,
"--tenant", "sample",
"--namespace", "ns1",
"--destination-topic-name", "input",
});

CreateSource creater = cmdSources.getCreateSource();

consoleOutputCapturer.stop();
String output = consoleOutputCapturer.getStderr();

assertTrue(output.contains("Failed to download archive"));
assertTrue(output.contains("Corrupt User PackageFile " + url));
assertEquals(url, creater.archive);
}

Expand All @@ -410,7 +413,7 @@ public void testCreateFunctionWithTopicPatterns() throws Exception {
"--name", fnName,
"--topicsPattern", topicPatterns,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand All @@ -435,7 +438,7 @@ public void testCreateWithoutTenant() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
});
Expand All @@ -455,7 +458,7 @@ public void testCreateWithoutNamespace() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--className", DummyFunction.class.getName(),
});

Expand All @@ -479,7 +482,7 @@ public void testCreateUsingFullyQualifiedFunctionName() throws Exception {
"--inputs", inputTopicName,
"--output", outputTopicName,
"--fqfn", fqfn,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--className", DummyFunction.class.getName(),
});

Expand All @@ -498,7 +501,7 @@ public void testCreateWithoutFunctionName() throws Exception {
"create",
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand All @@ -515,7 +518,7 @@ public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
cmd.run(new String[] {
"create",
"--inputs", inputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand All @@ -527,7 +530,7 @@ public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {

}


@Test
public void testCreateWithoutOutputTopic() {

Expand All @@ -538,7 +541,7 @@ public void testCreateWithoutOutputTopic() {
cmd.run(new String[] {
"create",
"--inputs", inputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand Down Expand Up @@ -603,7 +606,7 @@ public void testUpdateFunction() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand Down Expand Up @@ -735,7 +738,7 @@ public void TestCreateFunctionParallelism() throws Exception{
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand All @@ -746,14 +749,14 @@ public void TestCreateFunctionParallelism() throws Exception{
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
"--parallelism", "-1"
};

testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Field 'parallelism' must be a Positive Number");
testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Function parallelism should positive number");

}

Expand All @@ -764,7 +767,7 @@ public void TestCreateTopicName() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand All @@ -775,13 +778,13 @@ public void TestCreateTopicName() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", wrongOutputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
};

testValidateFunctionsConfigs(correctArgs, incorrectArgs, "The topic name " + wrongOutputTopicName + " is invalid for field 'output'");
testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Output topic " + wrongOutputTopicName + " is invalid");
}

@Test
Expand All @@ -791,7 +794,7 @@ public void TestCreateClassName() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand All @@ -802,13 +805,13 @@ public void TestCreateClassName() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", DummyFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
"--tenant", "sample",
"--namespace", "ns1",
"--className", cannotLoadClass,
};

testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Cannot find/load class " + cannotLoadClass);
testValidateFunctionsConfigs(correctArgs, incorrectArgs, "User class must be in class path");
}

@Test
Expand All @@ -818,7 +821,7 @@ public void TestCreateSameInOutTopic() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand All @@ -828,7 +831,7 @@ public void TestCreateSameInOutTopic() throws Exception {
"--name", fnName,
"--inputs", inputTopicName,
"--output", inputTopicName,
"--jar", "SomeJar.jar",
"--jar", JAR_NAME,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.functions.utils.Utils.fileExists;
import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
Expand All @@ -46,7 +43,6 @@

import java.io.File;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -69,25 +65,12 @@
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
import org.apache.pulsar.functions.proto.Function.RetryDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.*;
import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator;
import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
import org.apache.pulsar.functions.windowing.WindowUtils;

@Slf4j
Expand Down Expand Up @@ -505,48 +488,19 @@ protected void validateFunctionConfigs(FunctionConfig functionConfig) {
+ " be specified for the function. Please specify one.");
}

boolean isJarPathUrl = isNotBlank(functionConfig.getJar()) && Utils.isFunctionPackageUrlSupported(functionConfig.getJar());
String jarFilePath = null;
if (isJarPathUrl) {
if (functionConfig.getJar().startsWith(Utils.HTTP)) {
// download jar file if url is http or file is downloadable
File tempPkgFile = null;
try {
tempPkgFile = downloadFromHttpUrl(functionConfig.getJar(), functionConfig.getName());
jarFilePath = tempPkgFile.getAbsolutePath();
} catch (Exception e) {
if (tempPkgFile != null) {
tempPkgFile.deleteOnExit();
}
throw new ParameterException("Failed to download jar from " + functionConfig.getJar()
+ ", due to =" + e.getMessage());
}
}
} else {
if (!fileExists(userCodeFile)) {
throw new ParameterException("File " + userCodeFile + " does not exist");
}
jarFilePath = userCodeFile;
if (!isBlank(functionConfig.getJar()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getJar()) &&
!new File(functionConfig.getJar()).exists()) {
throw new ParameterException("The specified jar file does not exist");
}

if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {

if (jarFilePath != null) {
File file = new File(jarFilePath);
try {
classLoader = Reflections.loadJar(file);
} catch (MalformedURLException e) {
throw new ParameterException(
"Failed to load user jar " + file + " with error " + e.getMessage());
}
(new ImplementsClassesValidator(Function.class, java.util.function.Function.class))
.validateField("className", functionConfig.getClassName(), classLoader);
}
if (!isBlank(functionConfig.getPy()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) &&
!new File(functionConfig.getPy()).exists()) {
throw new ParameterException("The specified jar file does not exist");
}

try {
// Need to load jar and set context class loader before calling
ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), classLoader);
String functionPkgUrl = Utils.isFunctionPackageUrlSupported(userCodeFile) ? userCodeFile : null;
classLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, null);
} catch (Exception e) {
throw new IllegalArgumentException(e.getMessage());
}
Expand Down
Loading

0 comments on commit 32d1daf

Please sign in to comment.