Skip to content

Commit

Permalink
[FLINK-32091][checkpoint] Add file size metrics for file-merging (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia authored Jun 14, 2024
1 parent bc5c4b8 commit 5497868
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 13 deletions.
21 changes: 21 additions & 0 deletions docs/content.zh/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,27 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
<td>The time in nanoseconds that elapsed between the creation of the last checkpoint and the time when the checkpointing process has started by this Task. This delay shows how long it takes for the first checkpoint barrier to reach the task. A high value indicates back-pressure. If only a specific task has a long start delay, the most likely reason is data skew.</td>
<td>Gauge</td>
</tr>
<tr>
<th rowspan="4"><strong>Job (only available on TaskManager)</strong></th>
<td>fileMerging.logicalFileCount</td>
<td>The number of logical files of the file merging mechanism.</td>
<td>Gauge</td>
</tr>
<tr>
<td>fileMerging.logicalFileSize</td>
<td>The total size of logical files of the file merging mechanism on one task manager for one job.</td>
<td>Gauge</td>
</tr>
<tr>
<td>fileMerging.physicalFileCount</td>
<td>The number of physical files of the file merging mechanism.</td>
<td>Gauge</td>
</tr>
<tr>
<td>fileMerging.physicalFileSize</td>
<td>The total size of physical files of the file merging mechanism on one task manager for one job, usually larger than <samp>fileMerging.logicalFileSize</samp>.</td>
<td>Gauge</td>
</tr>
</tbody>
</table>

Expand Down
21 changes: 21 additions & 0 deletions docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,27 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
<td>The time in nanoseconds that elapsed between the creation of the last checkpoint and the time when the checkpointing process has started by this Task. This delay shows how long it takes for the first checkpoint barrier to reach the task. A high value indicates back-pressure. If only a specific task has a long start delay, the most likely reason is data skew.</td>
<td>Gauge</td>
</tr>
<tr>
<th rowspan="4"><strong>Job (only available on TaskManager)</strong></th>
<td>fileMerging.logicalFileCount</td>
<td>The number of logical files of the file merging mechanism.</td>
<td>Gauge</td>
</tr>
<tr>
<td>fileMerging.logicalFileSize</td>
<td>The total size of logical files of the file merging mechanism on one task manager for one job.</td>
<td>Gauge</td>
</tr>
<tr>
<td>fileMerging.physicalFileCount</td>
<td>The number of physical files of the file merging mechanism.</td>
<td>Gauge</td>
</tr>
<tr>
<td>fileMerging.physicalFileSize</td>
<td>The total size of physical files of the file merging mechanism on one task manager for one job, usually larger than <samp>fileMerging.logicalFileSize</samp>.</td>
<td>Gauge</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.CheckpointedStateScope;

import javax.annotation.Nonnull;
Expand All @@ -35,8 +36,9 @@ public AcrossCheckpointFileMergingSnapshotManager(
long maxFileSize,
PhysicalFilePool.Type filePoolType,
float maxSpaceAmplification,
Executor ioExecutor) {
super(id, maxFileSize, filePoolType, maxSpaceAmplification, ioExecutor);
Executor ioExecutor,
MetricGroup metricGroup) {
super(id, maxFileSize, filePoolType, maxSpaceAmplification, ioExecutor, metricGroup);
filePool = createPhysicalPool();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;

import net.jcip.annotations.ThreadSafe;

/**
 * Metric group exposing the space statistics of the file merging snapshot manager as gauges.
 *
 * <p>Four gauges are registered on construction, named by {@link #LOGICAL_FILE_COUNT}, {@link
 * #LOGICAL_FILE_SIZE}, {@link #PHYSICAL_FILE_COUNT} and {@link #PHYSICAL_FILE_SIZE}. Each gauge
 * reads the corresponding counter of the {@link FileMergingSnapshotManager.SpaceStat} that was
 * passed in.
 *
 * <p>Thread-safety is required because it is used by multiple task threads.
 */
@ThreadSafe
public class FileMergingMetricGroup extends ProxyMetricGroup<MetricGroup> {

    /** Common prefix of all metric names registered by this group. */
    private static final String PREFIX = "fileMerging";

    /** Name of the gauge reporting the logical file count. */
    @VisibleForTesting public static final String LOGICAL_FILE_COUNT = PREFIX + ".logicalFileCount";

    /** Name of the gauge reporting the logical file size. */
    @VisibleForTesting public static final String LOGICAL_FILE_SIZE = PREFIX + ".logicalFileSize";

    /** Name of the gauge reporting the physical file count. */
    @VisibleForTesting
    public static final String PHYSICAL_FILE_COUNT = PREFIX + ".physicalFileCount";

    /** Name of the gauge reporting the physical file size. */
    @VisibleForTesting public static final String PHYSICAL_FILE_SIZE = PREFIX + ".physicalFileSize";

    /**
     * Creates the group on top of the given parent and registers the file-merging gauges.
     *
     * @param parentMetricGroup the metric group this proxy wraps.
     * @param spaceStat the space statistics whose counters back the gauges.
     */
    public FileMergingMetricGroup(
            MetricGroup parentMetricGroup, FileMergingSnapshotManager.SpaceStat spaceStat) {
        super(parentMetricGroup);
        // Use the private helper rather than the public, overridable registerMetrics(): an
        // override would otherwise run before the subclass has finished initializing.
        registerGauges(spaceStat);
    }

    /**
     * Registers the four file-merging gauges backed by the given space statistics.
     *
     * <p>The constructor already performs this registration for the statistics it receives.
     * NOTE(review): calling this again on the same group registers the same gauge names a second
     * time; how the wrapped group treats duplicate names is not visible here - confirm before
     * relying on it.
     *
     * @param spaceStat the space statistics whose counters back the gauges.
     */
    public void registerMetrics(FileMergingSnapshotManager.SpaceStat spaceStat) {
        registerGauges(spaceStat);
    }

    /** Registers one gauge per counter of {@code spaceStat} under the names defined above. */
    private void registerGauges(FileMergingSnapshotManager.SpaceStat spaceStat) {
        gauge(LOGICAL_FILE_COUNT, spaceStat.logicalFileCount::get);
        gauge(LOGICAL_FILE_SIZE, spaceStat.logicalFileSize::get);
        gauge(PHYSICAL_FILE_COUNT, spaceStat.physicalFileCount::get);
        gauge(PHYSICAL_FILE_SIZE, spaceStat.physicalFileSize::get);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
Expand Down Expand Up @@ -153,19 +154,24 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
/** The current space statistic, updated on file creation/deletion. */
protected SpaceStat spaceStat;

/** The metric group for file merging snapshot manager. */
protected FileMergingMetricGroup metricGroup;

public FileMergingSnapshotManagerBase(
String id,
long maxFileSize,
PhysicalFilePool.Type filePoolType,
float maxSpaceAmplification,
Executor ioExecutor) {
Executor ioExecutor,
MetricGroup parentMetricGroup) {
this.id = id;
this.maxPhysicalFileSize = maxFileSize;
this.filePoolType = filePoolType;
this.maxSpaceAmplification =
maxSpaceAmplification < 1f ? Float.MAX_VALUE : maxSpaceAmplification;
this.ioExecutor = ioExecutor;
this.spaceStat = new SpaceStat();
this.metricGroup = new FileMergingMetricGroup(parentMetricGroup, spaceStat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;
Expand All @@ -43,6 +45,8 @@ public class FileMergingSnapshotManagerBuilder {

@Nullable private Executor ioExecutor = null;

@Nullable private TaskManagerJobMetricGroup metricGroup;

/**
* Initialize the builder.
*
Expand Down Expand Up @@ -86,6 +90,11 @@ public FileMergingSnapshotManagerBuilder setIOExecutor(@Nullable Executor ioExec
return this;
}

/**
 * Set the metric group that the file-merging metrics are attached to.
 *
 * @param metricGroup the metric group to use; may be {@code null}, in which case {@link
 *     #build()} falls back to an unregistered metric group.
 * @return this builder.
 */
public FileMergingSnapshotManagerBuilder setMetricGroup(
        @Nullable TaskManagerJobMetricGroup metricGroup) {
    this.metricGroup = metricGroup;
    return this;
}

/**
* Create file-merging snapshot manager based on configuration.
*
Expand All @@ -99,14 +108,22 @@ public FileMergingSnapshotManager build() {
maxFileSize,
filePoolType,
maxSpaceAmplification,
ioExecutor == null ? Runnable::run : ioExecutor);
ioExecutor == null ? Runnable::run : ioExecutor,
metricGroup == null
? new UnregisteredMetricGroups
.UnregisteredTaskManagerJobMetricGroup()
: metricGroup);
case MERGE_ACROSS_CHECKPOINT:
return new AcrossCheckpointFileMergingSnapshotManager(
id,
maxFileSize,
filePoolType,
maxSpaceAmplification,
ioExecutor == null ? Runnable::run : ioExecutor);
ioExecutor == null ? Runnable::run : ioExecutor,
metricGroup == null
? new UnregisteredMetricGroups
.UnregisteredTaskManagerJobMetricGroup()
: metricGroup);
default:
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.CheckpointedStateScope;

import javax.annotation.Nonnull;
Expand All @@ -43,9 +44,10 @@ public WithinCheckpointFileMergingSnapshotManager(
long maxFileSize,
PhysicalFilePool.Type filePoolType,
float maxSpaceAmplification,
Executor ioExecutor) {
Executor ioExecutor,
MetricGroup metricGroup) {
// currently there is no file size limit for WITHIN_BOUNDARY mode
super(id, maxFileSize, filePoolType, maxSpaceAmplification, ioExecutor);
super(id, maxFileSize, filePoolType, maxSpaceAmplification, ioExecutor, metricGroup);
writablePhysicalFilePool = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
Expand Down Expand Up @@ -84,7 +85,8 @@ public TaskExecutorFileMergingManager() {
@Nonnull JobID jobId,
@Nonnull ExecutionAttemptID executionAttemptID,
Configuration clusterConfiguration,
Configuration jobConfiguration) {
Configuration jobConfiguration,
TaskManagerJobMetricGroup metricGroup) {
boolean mergingEnabled =
jobConfiguration
.getOptional(FILE_MERGING_ENABLED)
Expand Down Expand Up @@ -136,6 +138,7 @@ public TaskExecutorFileMergingManager() {
? PhysicalFilePool.Type.BLOCKING
: PhysicalFilePool.Type.NON_BLOCKING)
.setMaxSpaceAmplification(spaceAmplification)
.setMetricGroup(metricGroup)
.build(),
new HashSet<>());
fileMergingSnapshotManagerByJobId.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ public CompletableFuture<Acknowledge> submitTask(
jobId,
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getConfiguration(),
jobInformation.getJobConfiguration());
jobInformation.getJobConfiguration(),
jobGroup);

final FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManagerClosable =
fileMergingSnapshotManager == null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link FileMergingMetricGroup}. */
class FileMergingMetricsTest {

    /** Checks that constructing the group registers exactly the four file-merging gauges. */
    @Test
    void testMetricsRegistration() {
        final Collection<String> registeredGaugeNames = new ArrayList<>();

        // Parent group that records the name of every non-null gauge registered through it.
        MetricGroup metricGroup =
                new UnregisteredMetricsGroup() {
                    @Override
                    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
                        if (gauge != null) {
                            registeredGaugeNames.add(name);
                        }
                        return gauge;
                    }
                };
        FileMergingSnapshotManager.SpaceStat spaceStat = new FileMergingSnapshotManager.SpaceStat();
        // Instantiated only for its side effect: the constructor registers the gauges.
        new FileMergingMetricGroup(metricGroup, spaceStat);

        // A single assertion covering both membership and count, with a descriptive failure
        // message if a name is missing, duplicated or unexpected.
        assertThat(registeredGaugeNames)
                .containsExactlyInAnyOrderElementsOf(
                        Arrays.asList(
                                FileMergingMetricGroup.LOGICAL_FILE_COUNT,
                                FileMergingMetricGroup.LOGICAL_FILE_SIZE,
                                FileMergingMetricGroup.PHYSICAL_FILE_COUNT,
                                FileMergingMetricGroup.PHYSICAL_FILE_SIZE));
    }

    /** Checks that the gauges start at zero and follow later updates of the space statistics. */
    @Test
    @SuppressWarnings("unchecked")
    void testMetricsAreUpdated() {
        final Map<String, Gauge<?>> registeredGauges = new HashMap<>();

        // Parent group that keeps every registered gauge so the test can read it back by name.
        MetricGroup metricGroup =
                new UnregisteredMetricsGroup() {
                    @Override
                    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
                        registeredGauges.put(name, gauge);
                        return gauge;
                    }
                };
        FileMergingSnapshotManager.SpaceStat spaceStat = new FileMergingSnapshotManager.SpaceStat();
        // Instantiated only for its side effect: the constructor registers the gauges.
        new FileMergingMetricGroup(metricGroup, spaceStat);

        Gauge<Long> logicalFileCountGauge =
                (Gauge<Long>) registeredGauges.get(FileMergingMetricGroup.LOGICAL_FILE_COUNT);
        Gauge<Long> logicalFileSizeGauge =
                (Gauge<Long>) registeredGauges.get(FileMergingMetricGroup.LOGICAL_FILE_SIZE);
        Gauge<Long> physicalFileCountGauge =
                (Gauge<Long>) registeredGauges.get(FileMergingMetricGroup.PHYSICAL_FILE_COUNT);
        Gauge<Long> physicalFileSizeGauge =
                (Gauge<Long>) registeredGauges.get(FileMergingMetricGroup.PHYSICAL_FILE_SIZE);

        // Freshly created statistics: every gauge reports zero.
        assertThat(logicalFileCountGauge.getValue()).isEqualTo(0L);
        assertThat(logicalFileSizeGauge.getValue()).isEqualTo(0L);
        assertThat(physicalFileCountGauge.getValue()).isEqualTo(0L);
        assertThat(physicalFileSizeGauge.getValue()).isEqualTo(0L);

        // update space stat
        spaceStat.onLogicalFileCreate(100L);
        spaceStat.onPhysicalFileCreate();
        spaceStat.onPhysicalFileUpdate(100L);

        // The gauges read the statistics live, so the updates are visible without re-registering.
        assertThat(logicalFileCountGauge.getValue()).isEqualTo(1L);
        assertThat(logicalFileSizeGauge.getValue()).isEqualTo(100L);
        assertThat(physicalFileCountGauge.getValue()).isEqualTo(1L);
        assertThat(physicalFileSizeGauge.getValue()).isEqualTo(100L);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SpaceStat;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
Expand Down Expand Up @@ -594,6 +595,9 @@ FileMergingSnapshotManager createFileMergingSnapshotManager(
.setMaxFileSize(maxFileSize)
.setFilePoolType(filePoolType)
.setMaxSpaceAmplification(spaceAmplification)
.setMetricGroup(
new UnregisteredMetricGroups
.UnregisteredTaskManagerJobMetricGroup())
.build();
fmsm.initFileSystem(
LocalFileSystem.getSharedInstance(),
Expand Down
Loading

0 comments on commit 5497868

Please sign in to comment.