diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 34089191ddee8..b2f66085ee658 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -55,6 +55,7 @@
+
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 337847e975a12..9b7a206346e30 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -477,25 +477,6 @@ protected void validateBatchSourceConfigs(BatchSourceConfig batchSourceConfig) {
if (isBlank(batchSourceConfig.getDiscoveryTriggererClassName())) {
throw new IllegalArgumentException("Discovery Triggerer not specified");
}
-
- boolean isBatchSourceTriggerer = false;
-
- try {
- Class>[] interfaces = Class.forName(batchSourceConfig.getDiscoveryTriggererClassName()).getInterfaces();
- int idx = 0;
-
- while (idx < interfaces.length && !isBatchSourceTriggerer) {
- isBatchSourceTriggerer = interfaces[idx++].getName().equals("org.apache.pulsar.io.core.BatchSourceTriggerer");
- }
-
- if (!isBatchSourceTriggerer) {
- throw new IllegalArgumentException("Invalid Discovery Triggerer specified");
- }
-
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Invalid Discovery Triggerer specified");
- }
-
}
protected String validateSourceType(String sourceType) throws IOException {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 13e0ff0214f5f..5b4d906b81b75 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -426,36 +426,6 @@ public void testBatchSourceConfigMissingDiscoveryTriggererClassName() throws Exc
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}
- /*
- * Test where the class name does not implement the BatchSourceTriggerer interface
- */
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
- public void testBatchSourceConfigInvalidDiscoveryTriggererClassName() throws Exception {
- SourceConfig testSourceConfig = getSourceConfig();
- BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
- batchSourceConfig.setDiscoveryTriggererClassName("java.lang.String");
- testSourceConfig.setBatchSourceConfig(batchSourceConfig);
-
- SourceConfig expectedSourceConfig = getSourceConfig();
- expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
- testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
- }
-
- /*
- * Test where the class name provided doesn't exist
- */
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
- public void testBatchSourceConfigDiscoveryTriggererClassNotFound() throws Exception {
- SourceConfig testSourceConfig = getSourceConfig();
- BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
- batchSourceConfig.setDiscoveryTriggererClassName("com.foo.Bar");
- testSourceConfig.setBatchSourceConfig(batchSourceConfig);
-
- SourceConfig expectedSourceConfig = getSourceConfig();
- expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
- testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
- }
-
public void testCmdSourceConfigFile(SourceConfig testSourceConfig, SourceConfig expectedSourceConfig) throws Exception {
File file = Files.createTempFile("", "").toFile();
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile
index f0022cb83298e..720bcdf414a33 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -105,5 +105,6 @@ COPY --from=pulsar-all /pulsar/connectors/pulsar-io-hdfs*.nar /pulsar/connectors
COPY --from=pulsar-all /pulsar/connectors/pulsar-io-jdbc-postgres-*.nar /pulsar/connectors/
COPY --from=pulsar-all /pulsar/connectors/pulsar-io-kafka-*.nar /pulsar/connectors/
COPY --from=pulsar-all /pulsar/connectors/pulsar-io-rabbitmq-*.nar /pulsar/connectors/
+COPY --from=pulsar-all /pulsar/connectors/pulsar-io-batch-data-generator-*.nar /pulsar/connectors/
CMD bash
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/BatchSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/BatchSourceTest.java
new file mode 100644
index 0000000000000..8ed9e5445420c
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/BatchSourceTest.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sources;
+
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * This tests verifies that a batch source can be successfully submitted and run via the pulsar-admin CLI
+ */
+@Slf4j
+public class BatchSourceTest extends PulsarStandaloneTestSuite {
+
+ private static final String BATCH_CONFIG = "{\"discoveryTriggererConfig\": {\"__CRON__\": \"* * * * * *\"}, " +
+ "\"discoveryTriggererClassName\": \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\"}";
+
+ @Test(groups = {"source"})
+ public void testGenericRecordSource() throws Exception {
+ String outputTopicName = "test-state-source-output-" + randomName(8);
+ String sourceName = "test-state-source-" + randomName(8);
+ int numMessages = 10;
+ try {
+ submitSourceConnector(
+ sourceName,
+ outputTopicName,
+ "builtin://batch-data-generator");
+
+ // get source info
+ getSourceInfoSuccess(container, sourceName);
+
+ // get source status
+ getSourceStatus(container, sourceName);
+
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+ retryStrategically((test) -> {
+ try {
+ SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+ return status.getInstances().size() > 0
+ && status.getInstances().get(0).getStatus().numWritten >= 10;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 10, 200);
+
+ SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+ assertEquals(status.getInstances().size(), 1);
+ assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10);
+ }
+
+ consumeMessages(container, outputTopicName, numMessages);
+
+ // delete source
+ deleteSource(container, sourceName);
+
+ getSourceInfoNotFound(container, sourceName);
+ } finally {
+ dumpFunctionLogs(sourceName);
+ }
+
+ }
+
+ private void submitSourceConnector(String sourceName,
+ String outputTopicName,
+ String archive) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources", "create",
+ "--name", sourceName,
+ "--destinationTopicName", outputTopicName,
+ "--archive", archive,
+ "--batch-source-config", BATCH_CONFIG
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ContainerExecResult result = container.execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ private static void getSourceInfoSuccess(StandaloneContainer container, String sourceName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName
+ );
+ assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""));
+ }
+
+ private static void getSourceStatus(StandaloneContainer container,String sourceName) throws Exception {
+ retryStrategically((test) -> {
+ try {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+
+ if (result.getStdout().contains("\"running\" : true")) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ log.error("Encountered error when getting source status", e);
+ return false;
+ }
+ }, 10, 200);
+
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+
+ Assert.assertTrue(result.getStdout().contains("\"running\" : true"));
+ }
+
+ private static void consumeMessages(StandaloneContainer container, String outputTopic,
+ int numMessages) throws Exception {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(container.getPlainTextServiceUrl())
+ .build();
+
+ // read using Pulsar GenericRecord abstraction
+ @Cleanup
+ Consumer consumer = client.newConsumer(Schema.AUTO_CONSUME())
+ .topic(outputTopic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .startMessageIdInclusive()
+ .subscribe();
+
+ for (int i = 0; i < numMessages; i++) {
+ Message msg = consumer.receive(10, TimeUnit.SECONDS);
+ if (msg == null) {
+ fail("message "+i+" not received in time");
+ return;
+ }
+ log.info("received {}", msg.getValue());
+ msg.getValue().getFields().forEach( f -> {
+ log.info("field {} {}", f, msg.getValue().getField(f));
+ });
+ }
+ }
+
+ private static void deleteSource(StandaloneContainer container, String sourceName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "delete",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName
+ );
+ assertTrue(result.getStdout().contains("Delete source successfully"));
+ assertTrue(result.getStderr().isEmpty());
+ }
+
+ private static void getSourceInfoNotFound(StandaloneContainer container, String sourceName) throws Exception {
+ try {
+ container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+ fail("Command should have exited with non-zero");
+ } catch (ContainerExecException e) {
+ assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
+ }
+ }
+
+}
\ No newline at end of file