Skip to content

Commit

Permalink
[FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Nov 27, 2024
1 parent c6a026e commit 09ad451
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 13 deletions.
12 changes: 10 additions & 2 deletions docs/content.zh/docs/ops/traces.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
</thead>
<tbody>
<tr>
<th rowspan="18">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
<th rowspan="6"><strong>Checkpoint</strong></th>
<th rowspan="20">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
<th rowspan="8"><strong>Checkpoint</strong></th>
<td>startTs</td>
<td>Timestamp when the checkpoint has started.</td>
</tr>
Expand All @@ -122,6 +122,14 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
<td>checkpointStatus</td>
<td>What was the state of this checkpoint: FAILED or COMPLETED.</td>
</tr>
<tr>
<td>checkpointType</td>
<td>Type of the checkpoint. For example: "Checkpoint", "Full Checkpoint" or "Terminate Savepoint" ...</td>
</tr>
<tr>
<td>isUnaligned</td>
<td>Whether the checkpoint was unaligned ("true") or aligned ("false").</td>
</tr>
<tr>
<th rowspan="12"><strong>JobInitialization</strong></th>
<td>startTs</td>
Expand Down
12 changes: 10 additions & 2 deletions docs/content/docs/ops/traces.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
</thead>
<tbody>
<tr>
<th rowspan="18">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
<th rowspan="6"><strong>Checkpoint</strong></th>
<th rowspan="20">org.apache.flink.</br>runtime.checkpoint.</br>CheckpointStatsTracker</th>
<th rowspan="8"><strong>Checkpoint</strong></th>
<td>startTs</td>
<td>Timestamp when the checkpoint has started.</td>
</tr>
Expand All @@ -122,6 +122,14 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
<td>checkpointStatus</td>
<td>What was the state of this checkpoint: FAILED or COMPLETED.</td>
</tr>
<tr>
<td>checkpointType</td>
<td>Type of the checkpoint. For example: "Checkpoint", "Full Checkpoint" or "Terminate Savepoint" ...</td>
</tr>
<tr>
<td>isUnaligned</td>
<td>Whether the checkpoint was unaligned ("true") or aligned ("false").</td>
</tr>
<tr>
<th rowspan="12"><strong>JobInitialization</strong></th>
<td>startTs</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,13 @@ private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
.setAttribute("checkpointId", checkpointStats.getCheckpointId())
.setAttribute("fullSize", checkpointStats.getStateSize())
.setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
.setAttribute("checkpointStatus", checkpointStats.getStatus().name()));
.setAttribute("checkpointStatus", checkpointStats.getStatus().name())
.setAttribute(
"isUnaligned",
Boolean.toString(checkpointStats.isUnalignedCheckpoint()))
.setAttribute(
"checkpointType",
checkpointStats.getProperties().getCheckpointType().getName()));
if (LOG.isDebugEnabled()) {
StringWriter sw = new StringWriter();
MAPPER.writeValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -379,17 +378,37 @@ public void addSpan(SpanBuilder spanBuilder) {
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
singletonMap(jobVertexID, 1));

pending.reportSubtaskStats(jobVertexID, createSubtaskStats(0));
pending.reportSubtaskStats(jobVertexID, createSubtaskStats(0, false));

// Complete checkpoint => new snapshot
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));

assertThat(reportedSpans.size()).isEqualTo(1);
Span reportedSpan = Iterables.getOnlyElement(reportedSpans);
assertThat(reportedSpan.getAttributes().get("checkpointId")).isEqualTo(42L);
assertThat(reportedSpan.getAttributes().get("checkpointType")).isEqualTo("Checkpoint");
assertThat(reportedSpan.getAttributes().get("isUnaligned")).isEqualTo("false");

reportedSpans.clear();

pending =
tracker.reportPendingCheckpoint(
43,
1,
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
singletonMap(jobVertexID, 1));

pending.reportSubtaskStats(jobVertexID, createSubtaskStats(0, true));

// Complete checkpoint => new snapshot
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));

assertThat(reportedSpans.size()).isEqualTo(1);
assertThat(
reportedSpans.stream()
.map(span -> span.getAttributes().get("checkpointId"))
.collect(Collectors.toList()))
.containsExactly(42L);
reportedSpan = Iterables.getOnlyElement(reportedSpans);
assertThat(reportedSpan.getAttributes().get("checkpointId")).isEqualTo(43L);
assertThat(reportedSpan.getAttributes().get("checkpointType")).isEqualTo("Checkpoint");
assertThat(reportedSpan.getAttributes().get("isUnaligned")).isEqualTo("true");
}

@Test
Expand Down Expand Up @@ -763,7 +782,11 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
// ------------------------------------------------------------------------

private SubtaskStateStats createSubtaskStats(int index) {
return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true);
return createSubtaskStats(index, false);
}

private SubtaskStateStats createSubtaskStats(int index, boolean unaligned) {
return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true);
}

private void reportRestoredCheckpoint(
Expand Down

0 comments on commit 09ad451

Please sign in to comment.