Skip to content

Commit

Permalink
[pulsar-functions] move metricsPort to InstanceConfig (apache#9610)
Browse files Browse the repository at this point in the history
Master Issue: apache#9177

### Motivation

As discussed in apache#9318, both @zymap and @wolfstudy suggested, to add `metricsPort` as a field of `InstanceConfig`.

### Modifications

- add metricsPort to InstanceConfig
- add hasValidMetricsPort to InstanceConfig to check if metrics port is valid
- applied changes to k8s runtime & process runtime
  • Loading branch information
freeznet authored Feb 22, 2021
1 parent 2a1828c commit 6497177
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class InstanceConfig {
// Whether the pulsar admin client exposed to function context, default is disabled.
@Getter
private boolean exposePulsarAdminClientEnabled = false;
private int metricsPort;

/**
* Get the string representation of {@link #getInstanceId()}.
Expand All @@ -56,4 +57,8 @@ public String getInstanceName() {
public FunctionDetails getFunctionDetails() {
return functionDetails;
}

public boolean hasValidMetricsPort() {
return metricsPort > 0 && metricsPort < 65536;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName("local");
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
Expand Down Expand Up @@ -499,6 +500,7 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName("local");
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
instanceConfig.setMetricsPort(metrics_port);

Map<String, String> secretsProviderConfigMap = null;
if (!StringUtils.isEmpty(secretsProviderConfig)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
int metricsPort,
String narExtractionDirectory,
String functionInstanceClassPath,
String pulsarWebServiceUrl) throws Exception {
Expand All @@ -82,7 +81,7 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
pythonExtraDependencyRepository, metricsPort, narExtractionDirectory,
pythonExtraDependencyRepository, narExtractionDirectory,
functionInstanceClassPath, false, pulsarWebServiceUrl));
return cmd;
}
Expand Down Expand Up @@ -120,8 +119,7 @@ public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, Strin
public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
String originalCodeFileName,
String pulsarServiceUrl,
boolean k8sRuntime,
int metricsPort) throws IOException {
boolean k8sRuntime) throws IOException {
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();

Expand Down Expand Up @@ -221,8 +219,8 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
goInstanceConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
}

if (metricsPort > 0 && metricsPort < 65536) {
goInstanceConfig.setMetricsPort(metricsPort);
if (instanceConfig.hasValidMetricsPort()) {
goInstanceConfig.setMetricsPort(instanceConfig.getMetricsPort());
}

goInstanceConfig.setKillAfterIdleMs(0);
Expand Down Expand Up @@ -260,15 +258,14 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
int metricsPort,
String narExtractionDirectory,
String functionInstanceClassPath,
boolean k8sRuntime,
String pulsarWebServiceUrl) throws Exception {
final List<String> args = new LinkedList<>();

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime, metricsPort);
return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime);
}

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
Expand Down Expand Up @@ -398,7 +395,7 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
args.add(String.valueOf(grpcPort));

args.add("--metrics_port");
args.add(String.valueOf(metricsPort));
args.add(String.valueOf(instanceConfig.getMetricsPort()));

// state storage configs
if (null != stateStorageServiceUrl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ public class KubernetesRuntime implements Runtime {
Optional<KubernetesFunctionAuthProvider> functionAuthDataCacheProvider,
boolean authenticationEnabled,
Integer grpcPort,
Integer metricsPort,
String narExtractionDirectory,
Optional<KubernetesManifestCustomizer> manifestCustomizer,
String functinoInstanceClassPath,
Expand Down Expand Up @@ -238,7 +237,7 @@ public class KubernetesRuntime implements Runtime {
this.functionAuthDataCacheProvider = functionAuthDataCacheProvider;

this.grpcPort = grpcPort;
this.metricsPort = metricsPort;
this.metricsPort = instanceConfig.hasValidMetricsPort() ? instanceConfig.getMetricsPort() : null;
this.narExtractionDirectory = narExtractionDirectory;

this.processArgs = new LinkedList<>();
Expand Down Expand Up @@ -275,7 +274,6 @@ public class KubernetesRuntime implements Runtime {
installUserCodeDependencies,
pythonDependencyRepository,
pythonExtraDependencyRepository,
metricsPort,
narExtractionDirectory,
functinoInstanceClassPath,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
authProvider,
authenticationEnabled,
grpcPort,
metricsPort,
narExtractionDirectory,
manifestCustomizer,
functionInstanceClassPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ProcessRuntime implements Runtime {
String pulsarWebServiceUrl) throws Exception {
this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort();
this.metricsPort = FunctionCommon.findAvailablePort();
this.metricsPort = instanceConfig.getMetricsPort();
this.expectedHealthCheckInterval = expectedHealthCheckInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.funcLogDir = RuntimeUtils.genFunctionLogFolder(logDirectory, instanceConfig);
Expand Down Expand Up @@ -134,7 +134,6 @@ class ProcessRuntime implements Runtime {
false,
null,
null,
this.metricsPort,
narExtractionDirectory,
null,
pulsarWebServiceUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
instanceConfig.setFunctionVersion("1.0.0");
instanceConfig.setMaxBufferedTuples(5);
instanceConfig.setPort(1337);
instanceConfig.setMetricsPort(60000);


JSONObject userConfig = new JSONObject();
Expand Down Expand Up @@ -108,7 +109,7 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {

instanceConfig.setFunctionDetails(functionDetails);

List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime, 60000);
List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime);
if (k8sRuntime) {
goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@ InstanceConfig createGolangInstanceConfig() {
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
config.setClusterName("standalone");
config.setMetricsPort(4331);

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName(clusterName);
instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
instanceConfig.setMaxPendingAsyncRequests(workerConfig.getMaxPendingAsyncRequests());
Expand Down

0 comments on commit 6497177

Please sign in to comment.