Skip to content

Commit

Permalink
[FLINK-14495][core] Limit ResourceSpec to always specify cpu cores an…
Browse files Browse the repository at this point in the history
…d task heap memory size, unless it UNKNOWN.
  • Loading branch information
xintongsong authored and azagrebin committed Nov 12, 2019
1 parent b6c2943 commit 001733b
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,22 +338,31 @@ private Object readResolve() {
// builder
// ------------------------------------------------------------------------

public static Builder newBuilder() {
return new Builder();
public static Builder newBuilder(double cpuCores, MemorySize taskHeapMemory) {
return new Builder(cpuCores, taskHeapMemory);
}

public static Builder newBuilder(double cpuCores, int taskHeapMemoryMB) {
return newBuilder(cpuCores, MemorySize.parse(taskHeapMemoryMB + "m"));
}

/**
* Builder for the {@link ResourceSpec}.
*/
public static class Builder {

private double cpuCores = 0.0;
private MemorySize taskHeapMemory = MemorySize.ZERO;
private double cpuCores;
private MemorySize taskHeapMemory;
private MemorySize taskOffHeapMemory = MemorySize.ZERO;
private MemorySize onHeapManagedMemory = MemorySize.ZERO;
private MemorySize offHeapManagedMemory = MemorySize.ZERO;
private GPUResource gpuResource;

private Builder(double cpuCores, MemorySize taskHeapMemory) {
this.cpuCores = cpuCores;
this.taskHeapMemory = taskHeapMemory;
}

public Builder setCpuCores(double cpuCores) {
this.cpuCores = cpuCores;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,42 +36,34 @@ public class ResourceSpecTest extends TestLogger {

@Test
public void testIsValid() throws Exception {
ResourceSpec rs = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs = ResourceSpec.newBuilder(1.0, 100).build();
assertTrue(rs.isValid());

rs = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
rs = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1).
build();
assertTrue(rs.isValid());

rs = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
rs = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(-1).
build();
assertFalse(rs.isValid());
}

@Test
public void testLessThanOrEqual() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).build();
assertTrue(rs1.lessThanOrEqual(rs2));
assertTrue(rs2.lessThanOrEqual(rs1));

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs3 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1.1).
build();
assertTrue(rs1.lessThanOrEqual(rs3));
assertFalse(rs3.lessThanOrEqual(rs1));

ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs4 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
assertFalse(rs4.lessThanOrEqual(rs3));
Expand All @@ -80,65 +72,51 @@ public void testLessThanOrEqual() throws Exception {

@Test
public void testEquals() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).build();
assertEquals(rs1, rs2);
assertEquals(rs2, rs1);

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs3 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs4 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1).
build();
assertNotEquals(rs3, rs4);

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs5 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
assertEquals(rs3, rs5);
}

@Test
public void testHashCode() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).build();
assertEquals(rs1.hashCode(), rs2.hashCode());

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs3 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs4 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1).
build();
assertNotEquals(rs3.hashCode(), rs4.hashCode());

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs5 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
assertEquals(rs3.hashCode(), rs5.hashCode());
}

@Test
public void testMerge() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1.1).
build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).build();

ResourceSpec rs3 = rs1.merge(rs2);
assertEquals(2.0, rs3.getCpuCores(), 0.000001);
Expand All @@ -151,9 +129,7 @@ public void testMerge() throws Exception {

@Test
public void testSerializable() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1.1).
build();

Expand All @@ -164,9 +140,7 @@ public void testSerializable() throws Exception {
@Test
public void testMergeThisUnknown() throws Exception {
final ResourceSpec spec1 = ResourceSpec.UNKNOWN;
final ResourceSpec spec2 = ResourceSpec.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
final ResourceSpec spec2 = ResourceSpec.newBuilder(1.0, 100)
.setGPUResource(1.1)
.build();

Expand All @@ -177,9 +151,7 @@ public void testMergeThisUnknown() throws Exception {

@Test
public void testMergeOtherUnknown() throws Exception {
final ResourceSpec spec1 = ResourceSpec.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
final ResourceSpec spec1 = ResourceSpec.newBuilder(1.0, 100)
.setGPUResource(1.1)
.build();
final ResourceSpec spec2 = ResourceSpec.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public void testConfigurationOfResource() throws Exception{
opMethod.setAccessible(true);

// verify explicit change in resources
ResourceSpec minResources = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec preferredResources = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(200).build();
ResourceSpec minResources = ResourceSpec.newBuilder(1.0, 100).build();
ResourceSpec preferredResources = ResourceSpec.newBuilder(2.0, 200).build();
opMethod.invoke(operator, minResources, preferredResources);

assertEquals(minResources, operator.getMinResources());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ public class JobGraphGeneratorTest {
*/
@Test
public void testResourcesForChainedOperators() throws Exception {
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setTaskHeapMemoryMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setTaskHeapMemoryMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setTaskHeapMemoryMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setTaskHeapMemoryMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setTaskHeapMemoryMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setTaskHeapMemoryMB(600).build();
ResourceSpec resource7 = ResourceSpec.newBuilder().setCpuCores(0.7).setTaskHeapMemoryMB(700).build();
ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder(0.4, 400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder(0.5, 500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder(0.6, 600).build();
ResourceSpec resource7 = ResourceSpec.newBuilder(0.7, 700).build();

Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
opMethod.setAccessible(true);
Expand Down Expand Up @@ -147,12 +147,12 @@ public boolean filter(Long value) throws Exception {
*/
@Test
public void testResourcesForDeltaIteration() throws Exception{
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setTaskHeapMemoryMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setTaskHeapMemoryMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setTaskHeapMemoryMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setTaskHeapMemoryMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setTaskHeapMemoryMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setTaskHeapMemoryMB(600).build();
ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder(0.4, 400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder(0.5, 500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder(0.6, 600).build();

Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
opMethod.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,10 @@ public void testMatchRequirement() {
ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, 100, null);
assertFalse(rp4.isMatching(rp5));

ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
ResourceSpec rs2 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1.1).
build();

Expand All @@ -88,25 +84,19 @@ public void testUnknownMatchesUnknown() {

@Test
public void testEquals() {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).build();
assertEquals(ResourceProfile.fromResourceSpec(rs1), ResourceProfile.fromResourceSpec(rs2));

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs3 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs4 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1.1).
build();
assertNotEquals(ResourceProfile.fromResourceSpec(rs3), ResourceProfile.fromResourceSpec(rs4));

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs5 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
MemorySize networkMemory = MemorySize.parse(100 + "m");
Expand All @@ -132,39 +122,31 @@ public void testEquals() {

@Test
public void testCompareTo() {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).build();
assertEquals(0, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs2)));

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs3 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
assertEquals(-1, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs3)));
assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs1)));

ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs4 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1.1).
build();
assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs4)));
assertEquals(-1, ResourceProfile.fromResourceSpec(rs4).compareTo(ResourceProfile.fromResourceSpec(rs3)));

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs5 = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(2.2).
build();
assertEquals(0, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs5)));
}

@Test
public void testGet() {
ResourceSpec rs = ResourceSpec.newBuilder().
setCpuCores(1.0).
setTaskHeapMemoryMB(100).
ResourceSpec rs = ResourceSpec.newBuilder(1.0, 100).
setGPUResource(1.6).
build();
ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, MemorySize.parse(50 + "m"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public void testJobSubmissionWithPartialResourceConfigured() throws Exception {

DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

ResourceSpec resourceSpec = ResourceSpec.newBuilder().setCpuCores(2).setTaskHeapMemoryMB(0).build();
ResourceSpec resourceSpec = ResourceSpec.newBuilder(2.0, 0).build();

final JobVertex firstVertex = new JobVertex("firstVertex");
firstVertex.setInvokableClass(NoOpInvokable.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,26 +547,26 @@ public void invoke(Long value) throws Exception {
public void testResources() throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ResourceSpec minResource1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec preferredResource1 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(200).build();
ResourceSpec minResource1 = ResourceSpec.newBuilder(1.0, 100).build();
ResourceSpec preferredResource1 = ResourceSpec.newBuilder(2.0, 200).build();

ResourceSpec minResource2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(200).build();
ResourceSpec preferredResource2 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(300).build();
ResourceSpec minResource2 = ResourceSpec.newBuilder(1.0, 200).build();
ResourceSpec preferredResource2 = ResourceSpec.newBuilder(2.0, 300).build();

ResourceSpec minResource3 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(300).build();
ResourceSpec preferredResource3 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(400).build();
ResourceSpec minResource3 = ResourceSpec.newBuilder(1.0, 300).build();
ResourceSpec preferredResource3 = ResourceSpec.newBuilder(2.0, 400).build();

ResourceSpec minResource4 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(400).build();
ResourceSpec preferredResource4 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(500).build();
ResourceSpec minResource4 = ResourceSpec.newBuilder(1.0, 400).build();
ResourceSpec preferredResource4 = ResourceSpec.newBuilder(2.0, 500).build();

ResourceSpec minResource5 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(500).build();
ResourceSpec preferredResource5 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(600).build();
ResourceSpec minResource5 = ResourceSpec.newBuilder(1.0, 500).build();
ResourceSpec preferredResource5 = ResourceSpec.newBuilder(2.0, 600).build();

ResourceSpec minResource6 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(600).build();
ResourceSpec preferredResource6 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(700).build();
ResourceSpec minResource6 = ResourceSpec.newBuilder(1.0, 600).build();
ResourceSpec preferredResource6 = ResourceSpec.newBuilder(2.0, 700).build();

ResourceSpec minResource7 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(700).build();
ResourceSpec preferredResource7 = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(800).build();
ResourceSpec minResource7 = ResourceSpec.newBuilder(1.0, 700).build();
ResourceSpec preferredResource7 = ResourceSpec.newBuilder(2.0, 800).build();

Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class);
opMethod.setAccessible(true);
Expand Down
Loading

0 comments on commit 001733b

Please sign in to comment.