Skip to content

Commit

Permalink
Fix single-quotes added to user-conf (apache#8780)
Browse files Browse the repository at this point in the history
Fixes apache#8757

### Motivation

For the config content of Go Function, different runtime types require different parameters. Under Process Runtime, we only need the json string of config content itself; for k8s runtime, since we need to use the config content object to splice the command line in the pod, we need to add single quotes to the config content for processing.

### Modifications

In this pull request, we introduced a `k8sRuntime` flag label, which is used to identify different runtime forms.
  • Loading branch information
wolfstudy authored Dec 4, 2020
1 parent 7853178 commit 03f5b54
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
pythonExtraDependencyRepository, metricsPort, narExtractionDirectory, functionInstanceClassPath));
pythonExtraDependencyRepository, metricsPort, narExtractionDirectory,
functionInstanceClassPath, false));
return cmd;
}

Expand Down Expand Up @@ -117,7 +118,8 @@ public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, Strin

public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
String originalCodeFileName,
String pulsarServiceUrl) throws IOException {
String pulsarServiceUrl,
boolean k8sRuntime) throws IOException {
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();

Expand Down Expand Up @@ -224,11 +226,13 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();
String configContent = objectMapper.writeValueAsString(goInstanceConfig);

// Nit: at present, the implementation of go function depends on pulsar-client-go,
// pulsar-client-go uses cgo, so the currently uploaded executable doesn't support cross-compilation.
args.add(originalCodeFileName);
args.add("-instance-conf");
args.add("'" + configContent + "'");
if (k8sRuntime) {
args.add("'" + configContent + "'");
} else {
args.add(configContent);
}
return args;
}

Expand All @@ -252,11 +256,12 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
String pythonExtraDependencyRepository,
int metricsPort,
String narExtractionDirectory,
String functionInstanceClassPath) throws Exception {
String functionInstanceClassPath,
boolean k8sRuntime) throws Exception {
final List<String> args = new LinkedList<>();

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl);
return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime);
}

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ public class KubernetesRuntime implements Runtime {
pythonExtraDependencyRepository,
metricsPort,
narExtractionDirectory,
functinoInstanceClassPath));
functinoInstanceClassPath,
true));

doChecks(instanceConfig.getFunctionDetails(), this.jobName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.pulsar.functions.runtime;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.jose4j.json.internal.json_simple.JSONObject;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
Expand Down Expand Up @@ -52,8 +54,10 @@ public void testSplitRuntimeArgs() {
Assert.assertEquals(result[2], "-Dfoo=\"bar foo\"");
}

@Test
public void getGoInstanceCmd() throws IOException {
@Test(dataProvider = "k8sRuntime")
public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
HashMap<String, String> goInstanceConfig;

InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setClusterName("kluster");
instanceConfig.setInstanceId(3000);
Expand Down Expand Up @@ -104,10 +108,12 @@ public void getGoInstanceCmd() throws IOException {

instanceConfig.setFunctionDetails(functionDetails);

List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650");

HashMap goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);

List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime);
if (k8sRuntime) {
goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
} else {
goInstanceConfig = new ObjectMapper().readValue(commands.get(2), HashMap.class);
}
Assert.assertEquals(commands.toArray().length, 3);
Assert.assertEquals(commands.get(0), "config");
Assert.assertEquals(commands.get(1), "-instance-conf");
Expand Down Expand Up @@ -146,4 +152,16 @@ public void getGoInstanceCmd() throws IOException {
Assert.assertEquals(goInstanceConfig.get("deadLetterTopic"), "go-func-deadletter");
Assert.assertEquals(goInstanceConfig.get("userConfig"), userConfig.toString());
}

@DataProvider(name = "k8sRuntime")
public static Object[][] k8sRuntimeFlag() {
return new Object[][] {
{
true
},
{
false
}
};
}
}

0 comments on commit 03f5b54

Please sign in to comment.