Skip to content

Commit

Permalink
[hotfix][tests] Close TaskExecutor/TMServices
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 8, 2022
1 parent 637d08e commit 7d22a0c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;

Expand Down Expand Up @@ -184,7 +185,8 @@ public void testNonHeapMetricUsageNotStatic() throws Exception {
}

@Test
public void testManagedMemoryMetricsInitialization() throws MemoryAllocationException {
public void testManagedMemoryMetricsInitialization()
throws MemoryAllocationException, FlinkException {
final int maxMemorySize = 16284;
final int numberOfAllocatedPages = 2;
final int pageSize = 4096;
Expand All @@ -202,34 +204,38 @@ public void testManagedMemoryMetricsInitialization() throws MemoryAllocationExce
.build())
.setManagedMemorySize(maxMemorySize)
.build();
try {

List<String> actualSubGroupPath = new ArrayList<>();
final InterceptingOperatorMetricGroup metricGroup =
new InterceptingOperatorMetricGroup() {
@Override
public MetricGroup addGroup(String name) {
actualSubGroupPath.add(name);
return this;
}
};
MetricUtils.instantiateFlinkMemoryMetricGroup(
metricGroup,
taskManagerServices.getTaskSlotTable(),
taskManagerServices::getManagedMemorySize);

Gauge<Number> usedMetric = (Gauge<Number>) metricGroup.get("Used");
Gauge<Number> maxMetric = (Gauge<Number>) metricGroup.get("Total");

assertThat(usedMetric.getValue().intValue(), is(numberOfAllocatedPages * pageSize));
assertThat(maxMetric.getValue().intValue(), is(maxMemorySize));

assertThat(
actualSubGroupPath,
is(
Arrays.asList(
METRIC_GROUP_FLINK,
METRIC_GROUP_MEMORY,
METRIC_GROUP_MANAGED_MEMORY)));
List<String> actualSubGroupPath = new ArrayList<>();
final InterceptingOperatorMetricGroup metricGroup =
new InterceptingOperatorMetricGroup() {
@Override
public MetricGroup addGroup(String name) {
actualSubGroupPath.add(name);
return this;
}
};
MetricUtils.instantiateFlinkMemoryMetricGroup(
metricGroup,
taskManagerServices.getTaskSlotTable(),
taskManagerServices::getManagedMemorySize);

Gauge<Number> usedMetric = (Gauge<Number>) metricGroup.get("Used");
Gauge<Number> maxMetric = (Gauge<Number>) metricGroup.get("Total");

assertThat(usedMetric.getValue().intValue(), is(numberOfAllocatedPages * pageSize));
assertThat(maxMetric.getValue().intValue(), is(maxMemorySize));

assertThat(
actualSubGroupPath,
is(
Arrays.asList(
METRIC_GROUP_FLINK,
METRIC_GROUP_MEMORY,
METRIC_GROUP_MANAGED_MEMORY)));
} finally {
taskManagerServices.shutDown();
}
}

// --------------- utility methods and classes ---------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ public void close() throws Exception {
temporaryFolder.delete();

testingFatalErrorHandler.rethrowError();

taskExecutor.close();
}

public static final class Builder {
Expand Down

0 comments on commit 7d22a0c

Please sign in to comment.