Skip to content

Commit

Permalink
fix: batch source able to be submitted (apache#7659)
Browse files Browse the repository at this point in the history
* fix: batch source able to be submitted

* fix logic

Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Jul 25, 2020
1 parent 396eceb commit ee39e40
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.annotations.FieldDoc;
Expand All @@ -65,9 +66,9 @@ public static String getIOSourceClass(NarClassLoader ncl) throws IOException {
try {
// Try to load source class and check it implements Source interface
Class sourceClass = ncl.loadClass(conf.getSourceClass());
if (!(Source.class.isAssignableFrom(sourceClass))) {
throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
+ Source.class.getName());
if (!(Source.class.isAssignableFrom(sourceClass) || BatchSource.class.isAssignableFrom(sourceClass))) {
throw new IOException(String.format("Class %s does not implement interface %s or %s",
conf.getSourceClass(), Source.class.getName(), BatchSource.class.getName()));
}
} catch (Throwable t) {
Exceptions.rethrowIOException(t);
Expand Down

0 comments on commit ee39e40

Please sign in to comment.