Skip to content

Commit

Permalink
[functions] Allow custom resources for k8s (apache#6411)
Browse files Browse the repository at this point in the history
* [functions] Allow custom resources for k8s

This commit adds additional arguments to the
BasicKubernetesManifestCustomizer for resource requirements. Currently
both the request and limit for cpu and memory are specified by resources
arguments, with no way to change the limit. This commit allows both the
limit and request to be set independently.

* Make constants private static final
  • Loading branch information
rivernate authored Apr 5, 2020
1 parent 2166003 commit 25d313f
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
*/
package org.apache.pulsar.functions.runtime.kubernetes;

import com.google.gson.Gson;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.*;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer;

import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +41,10 @@
*/
public class BasicKubernetesManifestCustomizer implements KubernetesManifestCustomizer {

private static final String RESOURCE_CPU = "cpu";
private static final String RESOURCE_MEMORY = "memory";
private static final String[] RESOURCES = {RESOURCE_CPU, RESOURCE_MEMORY};

@Getter
@Setter
@NoArgsConstructor
Expand All @@ -50,6 +53,7 @@ static private class RuntimeOpts {
private Map<String, String> extraLabels;
private Map<String, String> extraAnnotations;
private Map<String, String> nodeSelectorLabels;
private V1ResourceRequirements resourceRequirements;
private List<V1Toleration> tolerations;
}

Expand Down Expand Up @@ -87,9 +91,27 @@ public V1StatefulSet customizeStatefulSet(Function.FunctionDetails funcDetails,
if (opts.getTolerations() != null && opts.getTolerations().size() > 0) {
opts.getTolerations().forEach(ps::addTolerationsItem);
}
ps.getContainers().forEach(container -> updateContainerResources(container, opts));
return statefulSet;
}

private void updateContainerResources(V1Container container, RuntimeOpts opts) {
if (opts.getResourceRequirements() != null) {
V1ResourceRequirements resourceRequirements = opts.getResourceRequirements();
V1ResourceRequirements containerResources = container.getResources();
Map<String, Quantity> limits = resourceRequirements.getLimits();
Map<String, Quantity> requests = resourceRequirements.getRequests();
for (String resource : RESOURCES) {
if (limits.containsKey(resource)) {
containerResources.putLimitsItem(resource, limits.get(resource));
}
if (requests.containsKey(resource)) {
containerResources.putRequestsItem(resource, requests.get(resource));
}
}
}
}

private RuntimeOpts getOptsFromDetails(Function.FunctionDetails funcDetails) {
String customRuntimeOptions = funcDetails.getCustomRuntimeOptions();
RuntimeOpts opts = new Gson().fromJson(customRuntimeOptions, RuntimeOpts.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.protobuf.util.JsonFormat;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.*;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand All @@ -43,6 +44,7 @@
import org.testng.annotations.Test;

import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -630,6 +632,17 @@ public void testBasicKubernetesManifestCustomizer() throws Exception {
tolerations.add(toleration);
configObj.add("tolerations", tolerations);

JsonObject resourceRequirements = new JsonObject();
JsonObject requests = new JsonObject();
JsonObject limits = new JsonObject();
requests.addProperty("cpu", 1);
requests.addProperty("memory", "4G");
limits.addProperty("cpu", 2);
limits.addProperty("memory", "8G");
resourceRequirements.add("requests", requests);
resourceRequirements.add("limits", limits);
configObj.add("resourceRequirements", resourceRequirements);

return fb.setCustomRuntimeOptions(configObj.toString());
}));

Expand All @@ -653,6 +666,17 @@ public void testBasicKubernetesManifestCustomizer() throws Exception {
assertEquals(serviceSpec.getMetadata().getAnnotations().get("annotation"), "test");
assertEquals(serviceSpec.getMetadata().getLabels().get("label"), "test");

List<V1Container> containers = spec.getSpec().getTemplate().getSpec().getContainers();
containers.forEach(c -> {
V1ResourceRequirements resources = c.getResources();
Map<String, Quantity> limits = resources.getLimits();
Map<String, Quantity> requests = resources.getRequests();
assertEquals(requests.get("cpu").getNumber(), new BigDecimal(1) );
assertEquals(limits.get("cpu").getNumber(), new BigDecimal(2) );
assertEquals(requests.get("memory").getNumber(), new BigDecimal(4000000000L) );
assertEquals(limits.get("memory").getNumber(), new BigDecimal(8000000000L) );
});

}

@Test
Expand Down
12 changes: 11 additions & 1 deletion site2/docs/functions-runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ for the `runtimeCustomerClassName` property. This implementation takes the follo
"value": "value",
"effect": "NoSchedule"
}
]
],
"resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
"requests": {
"cpu": 1,
"memory": "4G"
},
"limits": {
"cpu": 2,
"memory": "8G"
}
}
}
```

0 comments on commit 25d313f

Please sign in to comment.