diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 68858bc0ceac8..0c9dd49c1d16b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.types.Record; import org.apache.flink.util.MutableObjectIterator; import org.apache.flink.util.Preconditions; @@ -91,6 +90,8 @@ public class MockEnvironment implements Environment, AutoCloseable { private final JobVertexID jobVertexID; + private final TaskManagerRuntimeInfo taskManagerRuntimeInfo; + private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager(); private final AccumulatorRegistry accumulatorRegistry; @@ -127,7 +128,8 @@ protected MockEnvironment( int parallelism, int subtaskIndex, ClassLoader userCodeClassLoader, - TaskMetricGroup taskMetricGroup) { + TaskMetricGroup taskMetricGroup, + TaskManagerRuntimeInfo taskManagerRuntimeInfo) { this.jobID = jobID; this.jobVertexID = jobVertexID; @@ -140,6 +142,7 @@ protected MockEnvironment( this.memManager = new MemoryManager(memorySize, 1); this.ioManager = new IOManagerAsync(); + this.taskManagerRuntimeInfo = taskManagerRuntimeInfo; this.executionConfig = executionConfig; this.inputSplitProvider = inputSplitProvider; @@ -212,7 +215,7 @@ public Configuration getJobConfiguration() { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { - return new TestingTaskManagerRuntimeInfo(); + return this.taskManagerRuntimeInfo; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java index dfcc5f312e0f0..34a6ec492dc36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; public class MockEnvironmentBuilder { private String taskName = "mock-task"; @@ -43,6 +45,7 @@ public class MockEnvironmentBuilder { private JobID jobID = new JobID(); private JobVertexID jobVertexID = new JobVertexID(); private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); + private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); public MockEnvironmentBuilder setTaskName(String taskName) { this.taskName = taskName; @@ -79,6 +82,11 @@ public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig return this; } + public MockEnvironmentBuilder setTaskManagerRuntimeInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo){ + this.taskManagerRuntimeInfo = taskManagerRuntimeInfo; + return this; + } + public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) { this.maxParallelism = maxParallelism; return this; @@ -129,6 +137,7 @@ public MockEnvironment build() { parallelism, subtaskIndex, userCodeClassLoader, - taskMetricGroup); + taskMetricGroup, + taskManagerRuntimeInfo); } }