diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java index fde3d38801775..06513d657a478 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java @@ -29,10 +29,30 @@ @NoArgsConstructor @Builder(toBuilder=true) public class Resources { + + private static final Resources DEFAULT = new Resources(); + // Default cpu is 1 core private Double cpu = 1d; // Default memory is 1GB private Long ram = 1073741824l; // Default disk is 10GB private Long disk = 10737418240l; + + public static Resources getDefaultResources() { + return DEFAULT; + } + + public static Resources mergeWithDefault(Resources resources) { + + if (resources == null) { + return DEFAULT; + } + + double cpu = resources.getCpu() == null ? Resources.getDefaultResources().getCpu() : resources.getCpu(); + long ram = resources.getRam() == null ? Resources.getDefaultResources().getRam() : resources.getRam(); + long disk = resources.getDisk() == null ? Resources.getDefaultResources().getDisk() : resources.getDisk(); + + return new Resources(cpu, ram, disk); + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 3da83c14ace70..aef096a704dc9 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -211,19 +211,16 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader } else { functionDetailsBuilder.setParallelism(1); } - if (functionConfig.getResources() != null) { - Function.Resources.Builder bldr = Function.Resources.newBuilder(); - if (functionConfig.getResources().getCpu() != null) { - bldr.setCpu(functionConfig.getResources().getCpu()); - } - if (functionConfig.getResources().getRam() != null) { - bldr.setRam(functionConfig.getResources().getRam()); - } - if (functionConfig.getResources().getDisk() != null) { - bldr.setDisk(functionConfig.getResources().getDisk()); - } - functionDetailsBuilder.setResources(bldr.build()); - } + + // use default resources if resources not set + Resources resources = Resources.mergeWithDefault(functionConfig.getResources()); + + Function.Resources.Builder bldr = Function.Resources.newBuilder(); + bldr.setCpu(resources.getCpu()); + bldr.setRam(resources.getRam()); + bldr.setDisk(resources.getDisk()); + functionDetailsBuilder.setResources(bldr); + return functionDetailsBuilder.build(); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index a972947198a59..e7c53c5883e50 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -181,19 +181,15 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail } functionDetailsBuilder.setSink(sinkSpecBuilder); - if (sinkConfig.getResources() != null) { - Function.Resources.Builder bldr = Function.Resources.newBuilder(); - if (sinkConfig.getResources().getCpu() != null) { - bldr.setCpu(sinkConfig.getResources().getCpu()); - } - if (sinkConfig.getResources().getRam() != null) { - bldr.setRam(sinkConfig.getResources().getRam()); - } - if (sinkConfig.getResources().getDisk() != null) { - bldr.setDisk(sinkConfig.getResources().getDisk()); - } - functionDetailsBuilder.setResources(bldr.build()); - } + // use default resources if resources not set + Resources resources = Resources.mergeWithDefault(sinkConfig.getResources()); + + Function.Resources.Builder bldr = Function.Resources.newBuilder(); + bldr.setCpu(resources.getCpu()); + bldr.setRam(resources.getRam()); + bldr.setDisk(resources.getDisk()); + functionDetailsBuilder.setResources(bldr); + return functionDetailsBuilder.build(); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 94274cf86050c..59b05bd4a7fa0 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -124,19 +124,14 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource functionDetailsBuilder.setSink(sinkSpecBuilder); - if (sourceConfig.getResources() != null) { - Function.Resources.Builder bldr = Function.Resources.newBuilder(); - if (sourceConfig.getResources().getCpu() != null) { - bldr.setCpu(sourceConfig.getResources().getCpu()); - } - if (sourceConfig.getResources().getRam() != null) { - bldr.setRam(sourceConfig.getResources().getRam()); - } - if (sourceConfig.getResources().getDisk() != null) { - bldr.setDisk(sourceConfig.getResources().getDisk()); - } - functionDetailsBuilder.setResources(bldr.build()); - } + // use default resources if resources not set + Resources resources = Resources.mergeWithDefault(sourceConfig.getResources()); + + Function.Resources.Builder bldr = Function.Resources.newBuilder(); + bldr.setCpu(resources.getCpu()); + bldr.setRam(resources.getRam()); + bldr.setDisk(resources.getDisk()); + functionDetailsBuilder.setResources(bldr); return functionDetailsBuilder.build(); } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index 0f4f40054304f..b758c72e97293 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -62,6 +62,9 @@ public void testConvertBackFidelity() { functionConfig.setTimeoutMs(2000l); Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null); FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails); + + // add default resources + functionConfig.setResources(Resources.getDefaultResources()); assertEquals( new Gson().toJson(functionConfig), new Gson().toJson(convertedConfig) @@ -90,6 +93,9 @@ public void testConvertWindow() { functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10)); Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null); FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails); + + // add default resources + functionConfig.setResources(Resources.getDefaultResources()); assertEquals( new Gson().toJson(functionConfig), new Gson().toJson(convertedConfig) diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index a3aad9158dddc..0f4b1cb5f09c7 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -53,6 +53,9 @@ public void testConvertBackFidelity() throws IOException { sourceConfig.setConfigs(new HashMap<>()); Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null)); SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails); + + // add default resources + sourceConfig.setResources(Resources.getDefaultResources()); assertEquals( new Gson().toJson(sourceConfig), new Gson().toJson(convertedConfig)