diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 842da22f176d5..43e034682a57f 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -369,6 +369,20 @@ public static void inferMissingArguments(FunctionConfig functionConfig) { private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) { + try { + Class functionClass = clsLoader.loadClass(functionConfig.getClassName()); + + if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass) + && !java.util.function.Function.class.isAssignableFrom(functionClass)) { + throw new IllegalArgumentException( + String.format("Function class %s does not implement the correct interface", + functionClass.getName())); + } + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException( + String.format("Function class %s must be in class path", functionConfig.getClassName()), e); + } + Class[] typeArgs; try { typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index e5df566a5a18e..5f90945983e7f 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; @@ -109,6 +110,13 @@ public String process(String input, Context context) { } } + private static final class WrongFunction implements Consumer { + @Override + public void accept(String s) { + + } + } + private static final String tenant = "test-tenant"; private static final String namespace = "test-namespace"; private static final String function = "test-function"; @@ -438,6 +446,27 @@ public void testRegisterFunctionHttpUrl() { } } + @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function class .*. does not implement the correct interface") + public void testRegisterFunctionImplementWrongInterface() { + try { + testRegisterFunctionMissingArguments( + tenant, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + outputTopic, + outputSerdeClassName, + WrongFunction.class.getName(), + parallelism, + null); + } catch (RestException re){ + assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); + throw re; + } + } + private void testRegisterFunctionMissingArguments( String tenant, String namespace,