Skip to content

Commit

Permalink
Added ability to add annotations to Connector Configs (apache#6983)
Browse files Browse the repository at this point in the history
* Added sourceConfigClass and sinkConfigClass

* Add Validator annotation helpers to validate class parameters

* Fix build errors

* Take feedback into account

* Connected with validation

* Fix bugs

* Added tests

* Fix class name

* Address feedback

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored May 22, 2020
1 parent add2eae commit c8170b7
Show file tree
Hide file tree
Showing 29 changed files with 229 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,26 @@ public class ConnectorDefinition {
* <p>If not defined, it will be assumed this connector cannot act as a data sink.
*/
private String sinkClass;

/**
* The class name for the source config implementation.
* Most of the sources are using a config class for managing their config
* and directly convert the supplied Map object at open to this object.
* These connector can declare their config class in this variable that will allow
* the framework to check for config parameter checking at submission time.
*
* <p>If not defined, the framework will not be able to do any submission time checks.
*/
private String sourceConfigClass;

/**
* The class name for the sink config implementation.
* Most of the sink are using a config class for managing their config
* and directly convert the supplied Map object at open to this object.
* These connector can declare their config class in this variable that will allow
* the framework to check for config parameter checking at submission time.
*
* <p>If not defined, the framework will not be able to do any submission time checks.
*/
private String sinkConfigClass;
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,14 @@ public void start(boolean blocking) throws Exception {

if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory, true));

} else {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist");
}
functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory));
functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory, true));
}
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
Expand All @@ -285,13 +285,13 @@ public void start(boolean blocking) throws Exception {

if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory, true));
} else {
File file = new File(userCodeFile);
if (!file.exists()) {
throw new RuntimeException("Sink archive does not exist");
}
functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory));
functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory, true));
}
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The directory where nar packages are extractors"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Should we validate connector config during submission"
)
private Boolean validateConnectorConfig = false;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.pulsar.functions.utils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
Expand All @@ -31,9 +32,13 @@
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.validator.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
Expand Down Expand Up @@ -302,7 +307,8 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
}

public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
File sinkPackageFile, String narExtractionDirectory) {
File sinkPackageFile, String narExtractionDirectory,
boolean validateConnectorConfig) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
}
Expand Down Expand Up @@ -374,6 +380,9 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract Sink class from archive", e);
}
if (validateConnectorConfig) {
validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
}
try {
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
Expand All @@ -398,12 +407,18 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e1);
}
if (validateConnectorConfig) {
validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
}
} else {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e);
}
}
} else if (narClassLoader != null) {
if (validateConnectorConfig) {
validateConnectorConfig(sinkConfig, (NarClassLoader) narClassLoader);
}
try {
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
Expand Down Expand Up @@ -580,4 +595,24 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
}
return mergedConfig;
}

public static void validateConnectorConfig(SinkConfig sinkConfig, ClassLoader classLoader) {
try {
ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(classLoader);
if (defn.getSinkConfigClass() != null) {
Class configClass = Class.forName(defn.getSinkConfigClass(), true, classLoader);
Object configObject =
ObjectMapperFactory.getThreadLocal().convertValue(sinkConfig.getConfigs(), configClass);
if (configObject != null) {
ConfigValidation.validateConfig(configObject);
}
}
} catch (IOException e) {
throw new IllegalArgumentException("Error validating sink config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find sink config class", e);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not validate sink config: " + e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.pulsar.functions.utils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
Expand All @@ -28,10 +29,12 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.validator.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
Expand Down Expand Up @@ -209,7 +212,8 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
}

public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath,
File sourcePackageFile, String narExtractionDirectory) {
File sourcePackageFile, String narExtractionDirectory,
boolean validateConnectorConfig) {
if (isEmpty(sourceConfig.getTenant())) {
throw new IllegalArgumentException("Source tenant cannot be null");
}
Expand Down Expand Up @@ -268,6 +272,9 @@ public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path ar
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract source class from archive", e);
}
if (validateConnectorConfig) {
validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
}
try {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
Expand All @@ -292,12 +299,18 @@ public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path ar
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e1);
}
if (validateConnectorConfig) {
validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
}
} else {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e);
}
}
} else if (narClassLoader != null) {
if (validateConnectorConfig) {
validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
}
try {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
Expand Down Expand Up @@ -386,4 +399,23 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon
return mergedConfig;
}

public static void validateConnectorConfig(SourceConfig sourceConfig, ClassLoader classLoader) {
try {
ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(classLoader);
if (defn.getSourceConfigClass() != null) {
Class configClass = Class.forName(defn.getSourceConfigClass(), true, classLoader);
Object configObject = ObjectMapperFactory.getThreadLocal().convertValue(sourceConfig.getConfigs(), configClass);
if (configObject != null) {
ConfigValidation.validateConfig(configObject);
}
}
} catch (IOException e) {
throw new IllegalArgumentException("Error validating source config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find source config class");
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not validate source config: " + e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,8 @@ public static String getIOSourceClass(NarClassLoader ncl) throws IOException {
* Extract the Pulsar IO Sink class from a connector archive.
*/
public static String getIOSinkClass(ClassLoader classLoader) throws IOException {
ConnectorDefinition conf = getConnectorDefinition(classLoader);
NarClassLoader ncl = (NarClassLoader) classLoader;
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
ConnectorDefinition.class);
if (StringUtils.isEmpty(conf.getSinkClass())) {
throw new IOException(
String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
Expand All @@ -100,12 +97,17 @@ public static String getIOSinkClass(ClassLoader classLoader) throws IOException

public static ConnectorDefinition getConnectorDefinition(String narPath, String narExtractionDirectory) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
return getConnectorDefinition(ncl);
}
}

public static ConnectorDefinition getConnectorDefinition(ClassLoader classLoader) throws IOException {
NarClassLoader narClassLoader = (NarClassLoader) classLoader;
String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
}

public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@
package org.apache.pulsar.functions.utils;

import com.google.gson.Gson;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockTestCase;
import org.testng.annotations.Test;

import java.io.IOException;
Expand All @@ -33,12 +43,26 @@
import java.util.Map;

import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
import static org.testng.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.*;

/**
* Unit test of {@link Reflections}.
*/
public class SinkConfigUtilsTest {
@PrepareForTest(ConnectorUtils.class)
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "javax.xml.*", "org.xml.*", "org.w3c.dom.*", "org.springframework.context.*", "org.apache.log4j.*", "com.sun.org.apache.xerces.*", "javax.management.*" })
public class SinkConfigUtilsTest extends PowerMockTestCase {

private ConnectorDefinition defn;

@Data
@Accessors(chain = true)
@NoArgsConstructor
public static class TestSinkConfig {
@ConfigValidationAnnotations.NotNull
private String configParameter;
}

@Test
public void testConvertBackFidelity() throws IOException {
Expand Down Expand Up @@ -279,6 +303,25 @@ public void testMergeRuntimeFlags() {
);
}

@Test
public void testValidateConfig() throws IOException {
mockStatic(ConnectorUtils.class);
defn = new ConnectorDefinition();
defn.setSinkConfigClass(TestSinkConfig.class.getName());
PowerMockito.when(ConnectorUtils.getConnectorDefinition(any())).thenReturn(defn);

SinkConfig sinkConfig = createSinkConfig();

// Good config
sinkConfig.getConfigs().put("configParameter", "Test");
SinkConfigUtils.validateConnectorConfig(sinkConfig, Thread.currentThread().getContextClassLoader());

// Bad config
sinkConfig.getConfigs().put("configParameter", null);
Exception e = expectThrows(IllegalArgumentException.class, () -> SinkConfigUtils.validateConnectorConfig(sinkConfig, Thread.currentThread().getContextClassLoader()));
assertTrue(e.getMessage().contains("Could not validate sink config: Field 'configParameter' cannot be null!"));
}

private SinkConfig createSinkConfig() {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant("test-tenant");
Expand Down
Loading

0 comments on commit c8170b7

Please sign in to comment.