Skip to content

Commit

Permalink
Allow resource overcommitting when running functions in Kubernetes (a…
Browse files Browse the repository at this point in the history
…pache#4829)

### Motivation

Currently, when running Pulsar Functions, Sources, and Sinks in Kubernetes, the resource requests and resource limits are set to the same values. While this is OK and everything will run as it should, actual resource utilization in the cluster might be low. To increase actual resource utilization, we need to be able to overcommit resources by a certain amount in our clusters.
  • Loading branch information
jerrypeng authored and sijie committed Aug 5, 2019
1 parent f6fee1c commit c7b2cb3
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 12 deletions.
6 changes: 6 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ processContainerFactory:
# extraFunctionDependenciesDir:
# # Additional memory padding added on top of the memory requested by the function on a per instance basis
# percentMemoryPadding: 10
# # The ratio of the cpu limit to the cpu request to be set for a function/source/sink.
# # The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio
# cpuOverCommitRatio: 1.0
# # The ratio of the memory limit to the memory request to be set for a function/source/sink.
# # The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio
# memoryOverCommitRatio: 1.0

## A set of the minimum amount of resources functions must request.
## Support for this depends on function runtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;

/**
* Kubernetes based runtime for running functions.
Expand Down Expand Up @@ -135,6 +136,8 @@ public class KubernetesRuntime implements Runtime {
private final String pulsarAdminUrl;
private final SecretsProviderConfigurator secretsProviderConfigurator;
private int percentMemoryPadding;
private double cpuOverCommitRatio;
private double memoryOverCommitRatio;
private final KubernetesFunctionAuthProvider functionAuthDataCacheProvider;
private final AuthenticationConfig authConfig;

Expand All @@ -161,6 +164,8 @@ public class KubernetesRuntime implements Runtime {
SecretsProviderConfigurator secretsProviderConfigurator,
Integer expectedMetricsCollectionInterval,
int percentMemoryPadding,
double cpuOverCommitRatio,
double memoryOverCommitRatio,
KubernetesFunctionAuthProvider functionAuthDataCacheProvider,
boolean authenticationEnabled) throws Exception {
this.appsClient = appsClient;
Expand All @@ -176,6 +181,8 @@ public class KubernetesRuntime implements Runtime {
this.pulsarAdminUrl = pulsarAdminUrl;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.percentMemoryPadding = percentMemoryPadding;
this.cpuOverCommitRatio = cpuOverCommitRatio;
this.memoryOverCommitRatio = memoryOverCommitRatio;
this.authenticationEnabled = authenticationEnabled;
String logConfigFile = null;
String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
Expand Down Expand Up @@ -945,18 +952,33 @@ V1Container getFunctionContainer(List<String> instanceCommand, Function.Resource

// set container resources
final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
final Map<String, Quantity> requests = new HashMap<>();
final Map<String, Quantity> resourceLimit = new HashMap<>();
final Map<String, Quantity> resourceRequest = new HashMap<>();


long ram = resource != null && resource.getRam() != 0 ? resource.getRam() : 1073741824;

// add memory padding
long padding = Math.round(ram * (percentMemoryPadding / 100.0));
long ramWithPadding = ram + padding;
long ramRequest = (long) (ramWithPadding / memoryOverCommitRatio);

// set resource limits
double cpuLimit = resource != null && resource.getCpu() != 0 ? resource.getCpu() : 1;
// for cpu overcommitting
double cpuRequest = cpuLimit / cpuOverCommitRatio;

// round cpu to 3 decimal places as it is the finest cpu precision allowed
resourceLimit.put("cpu", Quantity.fromString(Double.toString(roundDecimal(cpuLimit, 3))));
resourceLimit.put("memory", Quantity.fromString(Long.toString(ramWithPadding)));

// set resource requests
// round cpu to 3 decimal places as it is the finest cpu precision allowed
resourceRequest.put("cpu", Quantity.fromString(Double.toString(roundDecimal(cpuRequest, 3))));
resourceRequest.put("memory", Quantity.fromString(Long.toString(ramRequest)));

requests.put("memory", Quantity.fromString(Long.toString(ramWithPadding)));
requests.put("cpu", Quantity.fromString(Double.toString(resource != null && resource.getCpu() != 0 ? resource.getCpu() : 1)));
resourceRequirements.setRequests(requests);
resourceRequirements.setLimits(requests);
resourceRequirements.setRequests(resourceRequest);
resourceRequirements.setLimits(resourceLimit);
container.setResources(resourceRequirements);

// set container ports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class KubernetesInfo {
private String changeConfigMap;
private String changeConfigMapNamespace;
private int percentMemoryPadding;
private double cpuOverCommitRatio;
private double memoryOverCommitRatio;
}
private final KubernetesInfo kubernetesInfo;
private final Boolean submittingInsidePod;
Expand Down Expand Up @@ -108,6 +110,8 @@ public KubernetesRuntimeFactory(String k8Uri,
String extraDependenciesDir,
Map<String, String> customLabels,
int percentMemoryPadding,
double cpuOverCommitRatio,
double memoryOverCommitRatio,
String pulsarServiceUri,
String pulsarAdminUri,
String stateStorageServiceUri,
Expand Down Expand Up @@ -158,6 +162,8 @@ public KubernetesRuntimeFactory(String k8Uri,
this.kubernetesInfo.setChangeConfigMap(changeConfigMap);
this.kubernetesInfo.setChangeConfigMapNamespace(changeConfigMapNamespace);
this.kubernetesInfo.setPercentMemoryPadding(percentMemoryPadding);
this.kubernetesInfo.setCpuOverCommitRatio(cpuOverCommitRatio);
this.kubernetesInfo.setMemoryOverCommitRatio(memoryOverCommitRatio);
this.submittingInsidePod = submittingInsidePod;
this.installUserCodeDependencies = installUserCodeDependencies;
this.customLabels = customLabels;
Expand Down Expand Up @@ -230,6 +236,8 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
secretsProviderConfigurator,
expectedMetricsCollectionInterval,
this.kubernetesInfo.getPercentMemoryPadding(),
this.kubernetesInfo.getCpuOverCommitRatio(),
this.kubernetesInfo.getMemoryOverCommitRatio(),
getAuthProvider(),
authenticationEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, Res
extraDepsDir,
null,
0,
1.0,
1.0,
pulsarServiceUrl,
pulsarAdminUrl,
stateStorageServiceUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.stream.Collectors;

import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -149,7 +150,8 @@ public void tearDown() {
}
}

KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding) throws Exception {
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory(
null,
null,
Expand All @@ -163,6 +165,8 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int
extraDepsDir,
null,
percentMemoryPadding,
cpuOverCommitRatio,
memoryOverCommitRatio,
pulsarServiceUrl,
pulsarAdminUrl,
stateStorageServiceUrl,
Expand Down Expand Up @@ -222,7 +226,7 @@ public void testRamPadding() throws Exception {
}

private void verifyRamPadding(int percentMemoryPadding, long ram, long expectedRamWithPadding) throws Exception {
factory = createKubernetesRuntimeFactory(null, percentMemoryPadding);
factory = createKubernetesRuntimeFactory(null, percentMemoryPadding, 1.0, 1.0);
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);

KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
Expand All @@ -240,7 +244,7 @@ private void verifyRamPadding(int percentMemoryPadding, long ram, long expectedR
public void testJavaConstructor() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);

factory = createKubernetesRuntimeFactory(null, 10);
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);

verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
}
Expand All @@ -249,7 +253,7 @@ public void testJavaConstructor() throws Exception {
public void testJavaConstructorWithSecrets() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);

factory = createKubernetesRuntimeFactory(null, 10);
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);

verifyJavaInstance(config, pulsarRootDir + "/instances/deps", true);
}
Expand All @@ -260,11 +264,45 @@ public void testJavaConstructorWithDeps() throws Exception {

String extraDepsDir = "/path/to/deps/dir";

factory = createKubernetesRuntimeFactory(extraDepsDir, 10);
factory = createKubernetesRuntimeFactory(extraDepsDir, 10, 1.0, 1.0);

verifyJavaInstance(config, extraDepsDir, false);
}

/**
 * Exercises the resource request/limit computation for several
 * cpu/memory over-commit ratio combinations, then once more with a cpu
 * value that needs rounding.
 */
@Test
public void testResources() throws Exception {

    // test overcommit; each row is {cpuOverCommitRatio, memoryOverCommitRatio}
    final double[][] overCommitRatios = {
        {1.0, 1.0},
        {2.0, 1.0},
        {1.0, 2.0},
        {1.5, 1.5},
        {1.3, 1.0},
    };
    for (double[] ratios : overCommitRatios) {
        testResouces(1, 1000, ratios[0], ratios[1]);
    }

    // test cpu rounding
    testResouces(1.0 / 1.5, 1000, 1.3, 1.0);
}

/**
 * Builds a runtime with the given over-commit ratios (and 10 percent memory
 * padding) and checks the memory and cpu requests/limits on the resulting
 * function container.
 *
 * @param userCpuRequest        cpu value placed in the function resources
 * @param userMemoryRequest     ram value placed in the function resources
 * @param cpuOverCommitRatio    cpu over-commit ratio handed to the factory
 * @param memoryOverCommitRatio memory over-commit ratio handed to the factory
 */
public void testResouces(double userCpuRequest, long userMemoryRequest, double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {

    Function.Resources.Builder resourcesBuilder = Function.Resources.newBuilder();
    resourcesBuilder.setRam(userMemoryRequest);
    resourcesBuilder.setCpu(userCpuRequest);
    resourcesBuilder.setDisk(10000L);
    Function.Resources resources = resourcesBuilder.build();

    InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
    factory = createKubernetesRuntimeFactory(null, 10, cpuOverCommitRatio, memoryOverCommitRatio);
    KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
    List<String> args = container.getProcessArgs();

    // check padding and xmx
    List<String> xmxArgs = args.stream()
            .filter(arg -> arg.startsWith("-Xmx"))
            .collect(Collectors.toList());
    long heap = Long.parseLong(xmxArgs.get(0).replace("-Xmx", ""));
    // heap plus the 10 percent padding configured on the factory above
    double paddedHeap = heap + (heap * 0.1);

    V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), resources);

    long actualMemoryLimit = containerSpec.getResources().getLimits().get("memory").getNumber().longValue();
    assertEquals(actualMemoryLimit, Math.round(paddedHeap));
    long actualMemoryRequest = containerSpec.getResources().getRequests().get("memory").getNumber().longValue();
    assertEquals(actualMemoryRequest, Math.round(paddedHeap / memoryOverCommitRatio));

    // check cpu
    double actualCpuRequest = containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue();
    assertEquals(actualCpuRequest, roundDecimal(resources.getCpu() / cpuOverCommitRatio, 3));
    double actualCpuLimit = containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue();
    assertEquals(actualCpuLimit, roundDecimal(resources.getCpu(), 3));
}

private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached) throws Exception {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
Expand Down Expand Up @@ -333,7 +371,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
public void testPythonConstructor() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON, false);

factory = createKubernetesRuntimeFactory(null, 10);
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);

verifyPythonInstance(config, pulsarRootDir + "/instances/deps", false);
}
Expand All @@ -344,7 +382,7 @@ public void testPythonConstructorWithDeps() throws Exception {

String extraDepsDir = "/path/to/deps/dir";

factory = createKubernetesRuntimeFactory(extraDepsDir, 10);
factory = createKubernetesRuntimeFactory(extraDepsDir, 10, 1.0, 1.0);

verifyPythonInstance(config, extraDepsDir, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,9 @@ public static Class<?> getTypeArg(String className, Class<?> funClass, ClassLoad
}
return TypeResolver.resolveRawArgument(funClass, loadedClass);
}

/**
 * Rounds {@code value} to {@code places} decimal places, rounding ties
 * towards positive infinity (the behavior of {@link Math#round(double)}).
 *
 * <p>Values that cannot be rounded meaningfully — {@code NaN}, the
 * infinities, and values whose scaled magnitude does not fit in a
 * {@code long} — are returned unchanged instead of being turned into an
 * unrelated finite number.
 *
 * @param value  the number to round
 * @param places the number of digits to keep after the decimal point
 * @return {@code value} rounded to {@code places} decimal places, or
 *         {@code value} itself when it is non-finite or too large to scale
 */
public static double roundDecimal(double value, int places) {
    double scale = Math.pow(10, places);
    double scaled = value * scale;
    // Math.round maps NaN to 0 and clamps out-of-range inputs to
    // Long.MIN_VALUE / Long.MAX_VALUE, which would silently corrupt the result.
    if (Double.isNaN(scaled) || Math.abs(scaled) >= Long.MAX_VALUE) {
        return value;
    }
    return Math.round(scaled) / scale;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
workerConfig.getKubernetesContainerFactory().getExtraFunctionDependenciesDir(),
workerConfig.getKubernetesContainerFactory().getCustomLabels(),
workerConfig.getKubernetesContainerFactory().getPercentMemoryPadding(),
workerConfig.getKubernetesContainerFactory().getCpuOverCommitRatio(),
workerConfig.getKubernetesContainerFactory().getMemoryOverCommitRatio(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
workerConfig.getStateStorageServiceUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,18 @@ public static class KubernetesContainerFactory {
doc = "Additional memory padding added on top of the memory requested by the function per on a per instance basis"
)
private int percentMemoryPadding;

@FieldContext(
doc = "The ratio cpu request and cpu limit to be set for a function/source/sink." +
" The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio"
)
private double cpuOverCommitRatio = 1.0;

@FieldContext(
doc = "The ratio memory request and memory limit to be set for a function/source/sink." +
" The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio"
)
private double memoryOverCommitRatio = 1.0;
}
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
Expand Down

0 comments on commit c7b2cb3

Please sign in to comment.