Skip to content

Commit

Permalink
propagate default resource values in Pulsar Functions (apache#3636)
Browse files Browse the repository at this point in the history
* propagate default resource values in Pulsar Functions

* fix unit tests
  • Loading branch information
jerrypeng authored and merlimat committed Feb 21, 2019
1 parent 9d97f1d commit 1bf7224
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,30 @@
@NoArgsConstructor
@Builder(toBuilder=true)
public class Resources {

private static final Resources DEFAULT = new Resources();

// Default cpu is 1 core
private Double cpu = 1d;
// Default memory is 1GB
private Long ram = 1073741824l;
// Default disk is 10GB
private Long disk = 10737418240l;

public static Resources getDefaultResources() {
return DEFAULT;
}

public static Resources mergeWithDefault(Resources resources) {

if (resources == null) {
return DEFAULT;
}

double cpu = resources.getCpu() == null ? Resources.getDefaultResources().getCpu() : resources.getCpu();
long ram = resources.getRam() == null ? Resources.getDefaultResources().getRam() : resources.getRam();
long disk = resources.getDisk() == null ? Resources.getDefaultResources().getDisk() : resources.getDisk();

return new Resources(cpu, ram, disk);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,16 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
} else {
functionDetailsBuilder.setParallelism(1);
}
if (functionConfig.getResources() != null) {
Function.Resources.Builder bldr = Function.Resources.newBuilder();
if (functionConfig.getResources().getCpu() != null) {
bldr.setCpu(functionConfig.getResources().getCpu());
}
if (functionConfig.getResources().getRam() != null) {
bldr.setRam(functionConfig.getResources().getRam());
}
if (functionConfig.getResources().getDisk() != null) {
bldr.setDisk(functionConfig.getResources().getDisk());
}
functionDetailsBuilder.setResources(bldr.build());
}

// use default resources if resources not set
Resources resources = Resources.mergeWithDefault(functionConfig.getResources());

Function.Resources.Builder bldr = Function.Resources.newBuilder();
bldr.setCpu(resources.getCpu());
bldr.setRam(resources.getRam());
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);

return functionDetailsBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,15 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
}
functionDetailsBuilder.setSink(sinkSpecBuilder);

if (sinkConfig.getResources() != null) {
Function.Resources.Builder bldr = Function.Resources.newBuilder();
if (sinkConfig.getResources().getCpu() != null) {
bldr.setCpu(sinkConfig.getResources().getCpu());
}
if (sinkConfig.getResources().getRam() != null) {
bldr.setRam(sinkConfig.getResources().getRam());
}
if (sinkConfig.getResources().getDisk() != null) {
bldr.setDisk(sinkConfig.getResources().getDisk());
}
functionDetailsBuilder.setResources(bldr.build());
}
// use default resources if resources not set
Resources resources = Resources.mergeWithDefault(sinkConfig.getResources());

Function.Resources.Builder bldr = Function.Resources.newBuilder();
bldr.setCpu(resources.getCpu());
bldr.setRam(resources.getRam());
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);

return functionDetailsBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,14 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource

functionDetailsBuilder.setSink(sinkSpecBuilder);

if (sourceConfig.getResources() != null) {
Function.Resources.Builder bldr = Function.Resources.newBuilder();
if (sourceConfig.getResources().getCpu() != null) {
bldr.setCpu(sourceConfig.getResources().getCpu());
}
if (sourceConfig.getResources().getRam() != null) {
bldr.setRam(sourceConfig.getResources().getRam());
}
if (sourceConfig.getResources().getDisk() != null) {
bldr.setDisk(sourceConfig.getResources().getDisk());
}
functionDetailsBuilder.setResources(bldr.build());
}
// use default resources if resources not set
Resources resources = Resources.mergeWithDefault(sourceConfig.getResources());

Function.Resources.Builder bldr = Function.Resources.newBuilder();
bldr.setCpu(resources.getCpu());
bldr.setRam(resources.getRam());
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);

return functionDetailsBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public void testConvertBackFidelity() {
functionConfig.setTimeoutMs(2000l);
Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);

// add default resources
functionConfig.setResources(Resources.getDefaultResources());
assertEquals(
new Gson().toJson(functionConfig),
new Gson().toJson(convertedConfig)
Expand Down Expand Up @@ -90,6 +93,9 @@ public void testConvertWindow() {
functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10));
Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);

// add default resources
functionConfig.setResources(Resources.getDefaultResources());
assertEquals(
new Gson().toJson(functionConfig),
new Gson().toJson(convertedConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public void testConvertBackFidelity() throws IOException {
sourceConfig.setConfigs(new HashMap<>());
Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails);

// add default resources
sourceConfig.setResources(Resources.getDefaultResources());
assertEquals(
new Gson().toJson(sourceConfig),
new Gson().toJson(convertedConfig)
Expand Down

0 comments on commit 1bf7224

Please sign in to comment.