Skip to content

Commit

Permalink
[functions][Issue:5350]Fix pulsar can't load the customized SerDe (ap…
Browse files Browse the repository at this point in the history
…ache#5357)

Fixes apache#5350

### Motivation

When using the `--output-serde-classname` option, `functionClassLoader` is not set correctly.
  • Loading branch information
wolfstudy authored and sijie committed Oct 28, 2019
1 parent 28b0c3a commit 18712eb
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ Schema<T> initializeSchema() throws ClassNotFoundException {
pulsarSinkConfig.getSchemaType(), false);
} else {
return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
pulsarSinkConfig.getSerdeClassName(), false);
pulsarSinkConfig.getSerdeClassName(), false, functionClassLoader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public Schema<?> getSchema(String topic, Class<?> clazz, SchemaType schemaType)
return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(clazz, schemaType));
}

public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName, boolean input, ClassLoader classLoader) {
return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName, input, classLoader));
}


/**
* If the topic is already created, we should be able to fetch the schema type (avro, json, ...)
*/
Expand Down Expand Up @@ -164,7 +169,7 @@ private static boolean isProtobufClass(Class<?> pojoClazz) {
}

@SuppressWarnings("unchecked")
private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input, ClassLoader classLoader) {
// The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de
// class name.

Expand All @@ -191,12 +196,17 @@ private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String sch
// First try with Schema
try {
return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
Thread.currentThread().getContextClassLoader(), clazz, input);
classLoader, clazz, input);
} catch (Throwable t) {
// Now try with Serde or just fail
SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName,
Thread.currentThread().getContextClassLoader(), clazz, input);
classLoader, clazz, input);
return new SerDeSchema<>(serDe);
}
}

@SuppressWarnings("unchecked")
private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
return newSchemaInstance(topic, clazz, schemaTypeOrClassName, input, Thread.currentThread().getContextClassLoader());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
Expand Down Expand Up @@ -1392,6 +1393,48 @@ private void testPublishFunction(Runtime runtime) throws Exception {
checkSubscriptionsCleanup(inputTopicName);
}

@Test
public void testSerdeFunction() throws Exception {
testCustomSerdeFunction();
}

private void testCustomSerdeFunction() throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD) {
return;
}

String inputTopicName = "persistent://public/default/test-serde-java-input-" + randomName(8);
String outputTopicName = "test-publish-serde-output-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
admin.topics().createNonPartitionedTopic(inputTopicName);
admin.topics().createNonPartitionedTopic(outputTopicName);
}

String functionName = "test-serde-fn-" + randomName(8);
submitFunction(
Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, Serde_JAVA_CLASS,
Serde_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
);

// get function info
getFunctionInfoSuccess(functionName);
// get function stats
getFunctionStatsEmpty(functionName);

ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);

FunctionStatus functionStatus = FunctionStatus.decode(result.getStdout());
assertEquals(functionStatus.getNumInstances(), 1);
assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true);
}

@Test
public void testPythonExclamationFunction() throws Exception {
testExclamationFunction(Runtime.PYTHON, false, false, false);
Expand Down Expand Up @@ -1600,6 +1643,49 @@ private static <T> void submitFunction(Runtime runtime,
ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
}

private static <T> void submitFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
String functionName,
String functionFile,
String functionClass,
String outputSerdeClassName,
Map<String, String> userConfigs) throws Exception {

CommandGenerator generator;
log.info("------- INPUT TOPIC: '{}'", inputTopicName);
if (inputTopicName.endsWith(".*")) {
log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
} else {
log.info("----- CREATING REGULAR FUNCTION --- ");
generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
}
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
generator.setOutputSerDe(outputSerdeClassName);
if (userConfigs != null) {
generator.setUserConfig(userConfigs);
}
String command;
if (Runtime.JAVA == runtime) {
command = generator.generateCreateFunctionCommand();
} else if (Runtime.PYTHON == runtime) {
generator.setRuntime(runtime);
command = generator.generateCreateFunctionCommand(functionFile);
} else {
throw new IllegalArgumentException("Unsupported runtime : " + runtime);
}

log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));
}

private static <T> void ensureSubscriptionCreated(String inputTopicName,
String subscriptionName,
Schema<T> inputTopicSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public void teardownFunctionWorkers() {
public static final String EXCEPTION_JAVA_CLASS =
"org.apache.pulsar.tests.integration.functions.ExceptionFunction";

public static final String Serde_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";

public static final String Serde_OUTPUT_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseSerde";

public static final String EXCLAMATION_PYTHON_CLASS =
"exclamation_function.ExclamationFunction";
Expand Down

0 comments on commit 18712eb

Please sign in to comment.