Skip to content

Commit

Permalink
Classloader choice for validating Source/Sink (apache#3865)
Browse files Browse the repository at this point in the history
* Try both regular classloader as well as nar class loader for validating source/sinks

* Fixed test

* Fix unittest

* Added more comments to the code

* rename variables

* Wait for the create to succeed before updating. Otherwise there might be some reamnant producers
  • Loading branch information
srkukarni authored Mar 29, 2019
1 parent 5740699 commit f3095d8
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,14 @@ public void testPulsarSourceStats() throws Exception {
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic);
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);

retryStrategically((test) -> {
try {
return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
} catch (PulsarAdminException e) {
return false;
}
}, 10, 150);

admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);

retryStrategically((test) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand All @@ -46,6 +47,7 @@
import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.Utils.getSinkType;

@Slf4j
public class SinkConfigUtils {

@Getter
Expand Down Expand Up @@ -296,14 +298,37 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP
}

String sinkClassName;
ClassLoader classLoader;
final Class<?> typeArg;
final ClassLoader classLoader;
if (!isEmpty(sinkConfig.getClassName())) {
sinkClassName = sinkConfig.getClassName();
// We really don't know if we should use nar class loader or regular classloader
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
try {
classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid Sink Jar");
}
try {
narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
}
if (jarClassLoader == null && narClassLoader == null) {
throw new IllegalArgumentException("Invalid Sink Package");
}
// We use typeArg and classLoader as arguments for lambda functions that require them to be final
// Thus we use these tmp vars
Class<?> tmptypeArg;
ClassLoader tmpclassLoader;
try {
tmptypeArg = getSinkType(sinkClassName, narClassLoader);
tmpclassLoader = narClassLoader;
} catch (Exception e) {
tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
tmpclassLoader = jarClassLoader;
}
typeArg = tmptypeArg;
classLoader = tmpclassLoader;
} else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
Expand All @@ -316,10 +341,9 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
}
typeArg = getSinkType(sinkClassName, classLoader);
}

Class<?> typeArg = getSinkType(sinkClassName, classLoader);

if (sinkConfig.getTopicToSerdeClassName() != null) {
sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,37 @@ public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path ar
}

String sourceClassName;
ClassLoader classLoader;
final Class<?> typeArg;
final ClassLoader classLoader;
if (!isEmpty(sourceConfig.getClassName())) {
sourceClassName = sourceConfig.getClassName();
// We really don't know if we should use nar class loader or regular classloader
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
try {
classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid Source Jar");
}
try {
narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
}
if (jarClassLoader == null && narClassLoader == null) {
throw new IllegalArgumentException("Invalid Source Package");
}
// We use typeArg and classLoader as arguments for lambda functions that require them to be final
// Thus we use these tmp vars
Class<?> tmptypeArg;
ClassLoader tmpclassLoader;
try {
tmptypeArg = getSourceType(sourceClassName, narClassLoader);
tmpclassLoader = narClassLoader;
} catch (Exception e) {
tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
tmpclassLoader = jarClassLoader;
}
typeArg = tmptypeArg;
classLoader = tmpclassLoader;
} else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
Expand All @@ -229,10 +252,9 @@ public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path ar
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract source class from archive", e1);
}
typeArg = getSourceType(sourceClassName, classLoader);
}

Class<?> typeArg = getSourceType(sourceClassName, classLoader);

// Only one of serdeClassName or schemaType should be set
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ public void updateFunction(final String tenant,
} else if (uploadedInputStream != null) {
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType);
} else if (existingComponent.getPackageLocation().getPackagePath().startsWith("builtin://")) {
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, null,
null, functionDetailsJson, mergedComponentConfigJson, componentType);
} else {
functionDetails = validateUpdateRequestParamsWithExistingMetadata(
tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public void testRegisterSinkZeroParallelism() {
}
}

@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Jar")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Package")
public void testRegisterSinkHttpUrl() {
try {
testRegisterSinkMissingArguments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public void testRegisterSourceNoOutputTopic() throws IOException {
}
}

@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Jar")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Package")
public void testRegisterSourceHttpUrl() {
try {
testRegisterSourceMissingArguments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ private void runSinkTester(SinkTester tester, boolean builtin) throws Exception
// validate the sink result
tester.validateSinkResult(kvs);

// update the sink
updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);

// delete the sink
deleteSink(tenant, namespace, sinkName);

Expand Down Expand Up @@ -220,6 +223,45 @@ protected void submitSinkConnector(SinkTester tester,
result.getStdout());
}

protected void updateSinkConnector(SinkTester tester,
String tenant,
String namespace,
String sinkName,
String inputTopicName) throws Exception {
String[] commands;
if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
commands = new String[] {
PulsarCluster.ADMIN_SCRIPT,
"sink", "update",
"--tenant", tenant,
"--namespace", namespace,
"--name", sinkName,
"--sink-type", tester.sinkType().name().toLowerCase(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName,
"--parallelism", "2"
};
} else {
commands = new String[] {
PulsarCluster.ADMIN_SCRIPT,
"sink", "create",
"--tenant", tenant,
"--namespace", namespace,
"--name", sinkName,
"--archive", tester.getSinkArchive(),
"--classname", tester.getSinkClassName(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName,
"--parallelism", "2"
};
}
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
result.getStdout().contains("\"Updated successfully\""),
result.getStdout());
}

protected void getSinkInfoSuccess(SinkTester tester,
String tenant,
String namespace,
Expand Down Expand Up @@ -422,6 +464,9 @@ private void testSource(SourceTester tester) throws Exception {
// validate the source result
validateSourceResult(consumer, kvs);

// update the source connector
updateSourceConnector(tester, tenant, namespace, sourceName, outputTopicName);

// delete the source
deleteSource(tenant, namespace, sourceName);

Expand Down Expand Up @@ -455,6 +500,29 @@ protected void submitSourceConnector(SourceTester tester,
result.getStdout());
}

protected void updateSourceConnector(SourceTester tester,
String tenant,
String namespace,
String sourceName,
String outputTopicName) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source", "update",
"--tenant", tenant,
"--namespace", namespace,
"--name", sourceName,
"--source-type", tester.sourceType(),
"--sourceConfig", new Gson().toJson(tester.sourceConfig()),
"--destinationTopicName", outputTopicName,
"--parallelism", "2"
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
result.getStdout().contains("\"Updated successfully\""),
result.getStdout());
}

protected void getSourceInfoSuccess(SourceTester tester,
String tenant,
String namespace,
Expand Down

0 comments on commit f3095d8

Please sign in to comment.