diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java index fcddf5cfb25f8..bdcab9da570a3 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java @@ -39,6 +39,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.utils.Exceptions; +import org.apache.pulsar.io.core.BatchSource; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.annotations.FieldDoc; @@ -65,9 +66,9 @@ public static String getIOSourceClass(NarClassLoader ncl) throws IOException { try { // Try to load source class and check it implements Source interface Class sourceClass = ncl.loadClass(conf.getSourceClass()); - if (!(Source.class.isAssignableFrom(sourceClass))) { - throw new IOException("Class " + conf.getSourceClass() + " does not implement interface " - + Source.class.getName()); + if (!(Source.class.isAssignableFrom(sourceClass) || BatchSource.class.isAssignableFrom(sourceClass))) { + throw new IOException(String.format("Class %s does not implement interface %s or %s", + conf.getSourceClass(), Source.class.getName(), BatchSource.class.getName())); } } catch (Throwable t) { Exceptions.rethrowIOException(t);