Skip to content

Commit

Permalink
[Issue-10109] [admin client] Add --batch-source-config switch to the …
Browse files Browse the repository at this point in the history
…Pulsar Admin Source API (apache#10593)

Co-authored-by: David Kjerrumgaard <[email protected]>
  • Loading branch information
david-streamlio and David Kjerrumgaard authored May 25, 2021
1 parent a244ed3 commit 85effc4
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pulsar-client-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SourceConfig;
Expand Down Expand Up @@ -315,6 +316,8 @@ abstract class SourceDetailsCommand extends BaseCommand {
protected String DEPRECATED_sourceConfigString;
@Parameter(names = "--source-config", description = "Source config key/values")
protected String sourceConfigString;
@Parameter(names = "--batch-source-config", description = "Batch source config key/values")
protected String batchSourceConfigString;
@Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
protected String customRuntimeOptions;

Expand Down Expand Up @@ -417,6 +420,10 @@ void processArguments() throws Exception {
if (null != sourceConfigString) {
sourceConfig.setConfigs(parseConfigs(sourceConfigString));
}

if (null != batchSourceConfigString) {
sourceConfig.setBatchSourceConfig(parseBatchSourceConfigs(batchSourceConfigString));
}

if (customRuntimeOptions != null) {
sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
Expand All @@ -427,7 +434,11 @@ void processArguments() throws Exception {

protected Map<String, Object> parseConfigs(String str) {
Type type = new TypeToken<Map<String, Object>>(){}.getType();
return new Gson().fromJson(str, type);
return new Gson().fromJson(str, type);
}

protected BatchSourceConfig parseBatchSourceConfigs(String str) {
return new Gson().fromJson(str, BatchSourceConfig.class);
}

protected void validateSourceConfigs(SourceConfig sourceConfig) {
Expand All @@ -444,6 +455,35 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
if (isBlank(sourceConfig.getName())) {
throw new IllegalArgumentException("Source name not specified");
}

if (sourceConfig.getBatchSourceConfig() != null) {
validateBatchSourceConfigs(sourceConfig.getBatchSourceConfig());
}
}

protected void validateBatchSourceConfigs(BatchSourceConfig batchSourceConfig) {
if (isBlank(batchSourceConfig.getDiscoveryTriggererClassName())) {
throw new IllegalArgumentException("Discovery Triggerer not specified");
}

boolean isBatchSourceTriggerer = false;

try {
Class<?>[] interfaces = Class.forName(batchSourceConfig.getDiscoveryTriggererClassName()).getInterfaces();
int idx = 0;

while (idx < interfaces.length && !isBatchSourceTriggerer) {
isBatchSourceTriggerer = interfaces[idx++].getName().equals("org.apache.pulsar.io.core.BatchSourceTriggerer");
}

if (!isBatchSourceTriggerer) {
throw new IllegalArgumentException("Invalid Discovery Triggerer specified");
}

} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Invalid Discovery Triggerer specified");
}

}

protected String validateSourceType(String sourceType) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.powermock.api.mockito.PowerMockito;
Expand Down Expand Up @@ -74,6 +78,8 @@ public IObjectFactory getObjectFactory() {
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
private static final String BATCH_SOURCE_CONFIG_STRING = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+ "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";

private PulsarAdmin pulsarAdmin;
private Sources source;
Expand Down Expand Up @@ -133,6 +139,10 @@ public SourceConfig getSourceConfig() {
sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING));
return sourceConfig;
}

public BatchSourceConfig getBatchSourceConfig() {
return createSource.parseBatchSourceConfigs(BATCH_SOURCE_CONFIG_STRING);
}

@Test
public void testCliCorrect() throws Exception {
Expand Down Expand Up @@ -390,6 +400,61 @@ public void testCmdSourceConfigFileInvalidJar() throws Exception {
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}

@Test
public void testBatchSourceConfigCorrect() throws Exception {
SourceConfig testSourceConfig = getSourceConfig();
testSourceConfig.setBatchSourceConfig(getBatchSourceConfig());

SourceConfig expectedSourceConfig = getSourceConfig();
expectedSourceConfig.setBatchSourceConfig(getBatchSourceConfig());
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}

/*
* Test where the DiscoveryTriggererClassName is null
*/
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Discovery Triggerer not specified")
public void testBatchSourceConfigMissingDiscoveryTriggererClassName() throws Exception {
SourceConfig testSourceConfig = getSourceConfig();
BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
batchSourceConfig.setDiscoveryTriggererClassName(null);
testSourceConfig.setBatchSourceConfig(batchSourceConfig);

SourceConfig expectedSourceConfig = getSourceConfig();
expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}

/*
* Test where the class name does not implement the BatchSourceTriggerer interface
*/
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
public void testBatchSourceConfigInvalidDiscoveryTriggererClassName() throws Exception {
SourceConfig testSourceConfig = getSourceConfig();
BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
batchSourceConfig.setDiscoveryTriggererClassName("java.lang.String");
testSourceConfig.setBatchSourceConfig(batchSourceConfig);

SourceConfig expectedSourceConfig = getSourceConfig();
expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}

/*
* Test where the class name provided doesn't exist
*/
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
public void testBatchSourceConfigDiscoveryTriggererClassNotFound() throws Exception {
SourceConfig testSourceConfig = getSourceConfig();
BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
batchSourceConfig.setDiscoveryTriggererClassName("com.foo.Bar");
testSourceConfig.setBatchSourceConfig(batchSourceConfig);

SourceConfig expectedSourceConfig = getSourceConfig();
expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}

public void testCmdSourceConfigFile(SourceConfig testSourceConfig, SourceConfig expectedSourceConfig) throws Exception {

File file = Files.createTempFile("", "").toFile();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.io;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import org.testng.annotations.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BatchSourceConfigParseTest {

private ObjectMapper objectMapper = new ObjectMapper();

@Test
public final void ImmediateTriggererTest() throws JsonMappingException, JsonProcessingException {
String json = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer\" }";
BatchSourceConfig config = objectMapper.readValue(json, BatchSourceConfig.class);
assertNotNull(config);
assertEquals(config.getDiscoveryTriggererClassName(), "org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer");
}

@Test
public final void CronTriggererTest() throws JsonMappingException, JsonProcessingException {
String json = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+ "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";
BatchSourceConfig config = objectMapper.readValue(json, BatchSourceConfig.class);
assertNotNull(config);
assertEquals(config.getDiscoveryTriggererClassName(), "org.apache.pulsar.io.batchdiscovery.CronTriggerer");
assertNotNull(config.getDiscoveryTriggererConfig());
assertEquals(config.getDiscoveryTriggererConfig().size(), 1);
assertEquals(config.getDiscoveryTriggererConfig().get("cron"), "5 0 0 0 0 *");
}
}
2 changes: 2 additions & 0 deletions site2/website/versioned_docs/version-2.7.2/io-cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ $ pulsar-admin sources create options
|Flag|Description|
|----|---|
| `-a`, `--archive` | The path to the NAR archive for the source. <br> It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
| `--batch-source-config` | BatchSource configuration key/values pairs provided as a JSON string, e.g., { "discoveryTriggererClassName" : "org.apache.pulsar.io.batchdiscovery.CronTriggerer", "discoveryTriggererConfig": {"cron": "*/5 * * * *"} }
| `--classname` | The source's class name if `archive` is file-url-path (file://).
| `--cpu` | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
| `--deserialization-classname` | The SerDe classname for the source.
Expand Down Expand Up @@ -89,6 +90,7 @@ $ pulsar-admin sources update options
|Flag|Description|
|----|---|
| `-a`, `--archive` | The path to the NAR archive for the source. <br> It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
| `--batch-source-config` | BatchSource configuration key/values pairs provided as a JSON string, e.g., { "discoveryTriggererClassName" : "org.apache.pulsar.io.batchdiscovery.CronTriggerer", "discoveryTriggererConfig": {"cron": "*/5 * * * *"} }
| `--classname` | The source's class name if `archive` is file-url-path (file://).
| `--cpu` | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
| `--deserialization-classname` | The SerDe classname for the source.
Expand Down

0 comments on commit 85effc4

Please sign in to comment.