Skip to content

Commit

Permalink
apache#10882 use ObjectMapper to parse Sink/Source configs (apache#10883
Browse files Browse the repository at this point in the history
)

Fixes apache#10882

### Motivation

CmdSink and CmdSource uses `gson` to parse the JSON configs from pulsar-admin. But most of connectors are using ObjectMapper to serde the config into actual class. `gson` will also convert int/long value into float by default, which will lead ObjectMapper cannot parse float string into int/long correctlly.
 
### Modifications

use ObjectMapper to parse sink/source config.
  • Loading branch information
freeznet authored Jun 10, 2021
1 parent b1b3b3c commit 2c9ea81
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
Expand Down Expand Up @@ -55,6 +58,7 @@
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.util.ObjectMapperFactory;

@Getter
@Parameters(commandDescription = "Interface for managing Pulsar IO sinks (egress data from Pulsar)")
Expand Down Expand Up @@ -463,8 +467,12 @@ void processArguments() throws Exception {
sinkConfig.setResources(resources);
}

if (null != sinkConfigString) {
sinkConfig.setConfigs(parseConfigs(sinkConfigString));
try {
if (null != sinkConfigString) {
sinkConfig.setConfigs(parseConfigs(sinkConfigString));
}
} catch (Exception ex) {
throw new ParameterException("Cannot parse sink-config", ex);
}

if (autoAck != null) {
Expand All @@ -485,9 +493,12 @@ void processArguments() throws Exception {
validateSinkConfigs(sinkConfig);
}

protected Map<String, Object> parseConfigs(String str) {
Type type = new TypeToken<Map<String, Object>>(){}.getType();
return new Gson().fromJson(str, type);
protected Map<String, Object> parseConfigs(String str) throws JsonProcessingException {
ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
TypeReference<HashMap<String,Object>> typeRef
= new TypeReference<HashMap<String,Object>>() {};

return mapper.readValue(str, typeRef);
}

protected void validateSinkConfigs(SinkConfig sinkConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
Expand All @@ -35,6 +38,7 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -58,6 +62,7 @@
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.util.ObjectMapperFactory;

@Getter
@Parameters(commandDescription = "Interface for managing Pulsar IO Sources (ingress data into Pulsar)")
Expand Down Expand Up @@ -417,8 +422,12 @@ void processArguments() throws Exception {
sourceConfig.setResources(resources);
}

if (null != sourceConfigString) {
sourceConfig.setConfigs(parseConfigs(sourceConfigString));
try {
if (null != sourceConfigString) {
sourceConfig.setConfigs(parseConfigs(sourceConfigString));
}
} catch (Exception ex) {
throw new ParameterException("Cannot parse source-config", ex);
}

if (null != batchSourceConfigString) {
Expand All @@ -432,12 +441,15 @@ void processArguments() throws Exception {
validateSourceConfigs(sourceConfig);
}

protected Map<String, Object> parseConfigs(String str) {
Type type = new TypeToken<Map<String, Object>>(){}.getType();
return new Gson().fromJson(str, type);
protected Map<String, Object> parseConfigs(String str) throws JsonProcessingException {
ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
TypeReference<HashMap<String,Object>> typeRef
= new TypeReference<HashMap<String,Object>>() {};

return mapper.readValue(str, typeRef);
}
protected BatchSourceConfig parseBatchSourceConfigs(String str) {

protected BatchSourceConfig parseBatchSourceConfigs(String str) {
return new Gson().fromJson(str, BatchSourceConfig.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.powermock.api.mockito.PowerMockito.mockStatic;

import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import java.io.Closeable;
import java.io.File;
Expand Down Expand Up @@ -94,7 +95,7 @@ public IObjectFactory getObjectFactory() {
private static final Double CPU = 100.0;
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 0000 2018\"}";
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";

private PulsarAdmin pulsarAdmin;
private Sinks sink;
Expand Down Expand Up @@ -144,7 +145,7 @@ public void cleanup() throws IOException {
}
}

public SinkConfig getSinkConfig() {
public SinkConfig getSinkConfig() throws JsonProcessingException {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant(TENANT);
sinkConfig.setNamespace(NAMESPACE);
Expand Down Expand Up @@ -738,4 +739,15 @@ public void testUpdateSink() throws Exception {


}

@Test
public void testParseConfigs() throws Exception {
SinkConfig testSinkConfig = getSinkConfig();
Map<String, Object> config = testSinkConfig.getConfigs();
Assert.assertEquals(config.get("int"), 1000);
Assert.assertEquals(config.get("int_string"), "1000");
Assert.assertEquals(config.get("float"), 1000.0);
Assert.assertEquals(config.get("float_string"), "1000.0");
Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import static org.powermock.api.mockito.PowerMockito.mockStatic;

import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;

import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -75,7 +77,8 @@ public IObjectFactory getObjectFactory() {
private static final Double CPU = 100.0;
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
private static final String SINK_CONFIG_STRING =
"{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
private static final String BATCH_SOURCE_CONFIG_STRING = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+ "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";

Expand Down Expand Up @@ -122,7 +125,7 @@ public void cleanup() throws IOException {
}
}

public SourceConfig getSourceConfig() {
public SourceConfig getSourceConfig() throws JsonProcessingException {
SourceConfig sourceConfig = new SourceConfig();
sourceConfig.setTenant(TENANT);
sourceConfig.setNamespace(NAMESPACE);
Expand Down Expand Up @@ -690,4 +693,15 @@ public void testUpdateSource() throws Exception {


}

@Test
public void testParseConfigs() throws Exception {
SourceConfig testSourceConfig = getSourceConfig();
Map<String, Object> config = testSourceConfig.getConfigs();
Assert.assertEquals(config.get("int"), 1000);
Assert.assertEquals(config.get("int_string"), "1000");
Assert.assertEquals(config.get("float"), 1000.0);
Assert.assertEquals(config.get("float_string"), "1000.0");
Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
}
}

0 comments on commit 2c9ea81

Please sign in to comment.