Skip to content

Commit

Permalink
[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker
Browse files Browse the repository at this point in the history

This closes apache#3215.
  • Loading branch information
uce committed Jan 30, 2017
1 parent 126fb17 commit dcfa3fb
Show file tree
Hide file tree
Showing 30 changed files with 490 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
Expand All @@ -42,10 +41,13 @@ public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
@Override
public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();

JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();

CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
if (settings == null) {
return "{}";
}

gen.writeStartObject();
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
Expand Down Expand Up @@ -54,8 +53,10 @@ public String handleRequest(AccessExecutionGraph graph, Map<String, String> para
return "{}";
}

CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
return "{}";
}

AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
Expand Down Expand Up @@ -72,8 +71,10 @@ public String handleRequest(AccessExecutionGraph graph, Map<String, String> para
return "{}";
}

CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
return "{}";
}

AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
Expand Down Expand Up @@ -54,8 +53,10 @@ public String handleRequest(AccessExecutionGraph graph, Map<String, String> para
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
return "{}";
}

gen.writeStartObject();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
Expand Down Expand Up @@ -60,9 +59,7 @@ public void testSimpleConfig() throws Exception {
true);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.getSnapshottingSettings()).thenReturn(settings);
when(graph.getJobSnapshottingSettings()).thenReturn(settings);

CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
Expand Down Expand Up @@ -98,9 +95,7 @@ public void testAtLeastOnce() throws Exception {
false); // at least once

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.getSnapshottingSettings()).thenReturn(settings);
when(graph.getJobSnapshottingSettings()).thenReturn(settings);

CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
Expand Down Expand Up @@ -130,9 +125,7 @@ public void testEnabledExternalizedCheckpointSettings() throws Exception {
false); // at least once

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.getSnapshottingSettings()).thenReturn(settings);
when(graph.getJobSnapshottingSettings()).thenReturn(settings);

CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
Expand Down Expand Up @@ -89,9 +88,7 @@ public void testCheckpointNotFound() throws Exception {
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -238,16 +235,14 @@ public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {

// ------------------------------------------------------------------------

static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand All @@ -258,7 +253,7 @@ static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Except
return mapper.readTree(json);
}

static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
long duration = ThreadLocalRandom.current().nextInt(128);

JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
Expand Down Expand Up @@ -179,9 +178,7 @@ public void testCheckpointStatsRequest() throws Exception {
when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
Expand Down Expand Up @@ -129,9 +128,7 @@ public void testCheckpointNotFound() throws Exception {
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -186,9 +183,7 @@ public void testJobVertexNotFound() throws Exception {
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand All @@ -209,9 +204,7 @@ private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throw
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

Expand All @@ -30,7 +31,9 @@
/**
* Base class for checkpoint statistics.
*/
public abstract class AbstractCheckpointStats {
public abstract class AbstractCheckpointStats implements Serializable {

private static final long serialVersionUID = 1041218202028265151L;

/** ID of this checkpoint. */
final long checkpointId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ boolean replacePendingCheckpointById(AbstractCheckpointStats completedOrFailed)
*
* <p>The iteration order is in reverse insertion order.
*/
private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats> {
private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable {

private static final long serialVersionUID = 726376482426055490L;

/** Copy of the checkpointsArray array at the point when this iterable was created. */
private final AbstractCheckpointStats[] checkpointsArray;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -52,9 +51,7 @@
* <p>The statistics are accessed via {@link #createSnapshot()} and exposed via
* both the web frontend and the {@link Metric} system.
*/
public class CheckpointStatsTracker implements Serializable {

private static final long serialVersionUID = 1694085244807339288L;
public class CheckpointStatsTracker {

/**
* Lock used for updating stats and creating snapshots. Updates always happen
Expand All @@ -67,9 +64,6 @@ public class CheckpointStatsTracker implements Serializable {
*/
private final ReentrantLock statsReadWriteLock = new ReentrantLock();

/** The job vertices taking part in the checkpoints. */
private final List<ExecutionJobVertex> jobVertices;

/** Total number of subtasks to checkpoint. */
private final int totalSubtaskCount;

Expand All @@ -85,6 +79,9 @@ public class CheckpointStatsTracker implements Serializable {
/** History of checkpoints. */
private final CheckpointStatsHistory history;

/** The job vertices taking part in the checkpoints. */
private final transient List<ExecutionJobVertex> jobVertices;

/** The latest restored checkpoint. */
@Nullable
private RestoredCheckpointStats latestRestoredCheckpoint;
Expand Down Expand Up @@ -217,6 +214,11 @@ PendingCheckpointStats reportPendingCheckpoint(
return pending;
}

/**
* Callback when a checkpoint is restored.
*
* @param restored The restored checkpoint stats.
*/
void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
checkNotNull(restored, "Restored checkpoint");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
*/
public class CompletedCheckpointStats extends AbstractCheckpointStats {

/** Callback for the {@link CompletedCheckpoint} instance to notify about discard. */
private final DiscardCallback discardCallback;
private static final long serialVersionUID = 138833868551861343L;

/** Total checkpoint state size over all subtasks. */
private final long stateSize;
Expand Down Expand Up @@ -69,16 +68,16 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
* @param externalPath Optional external path if persisted externally.
*/
CompletedCheckpointStats(
long checkpointId,
long triggerTimestamp,
CheckpointProperties props,
int totalSubtaskCount,
Map<JobVertexID, TaskStateStats> taskStats,
int numAcknowledgedSubtasks,
long stateSize,
long alignmentBuffered,
SubtaskStateStats latestAcknowledgedSubtask,
@Nullable String externalPath) {
long checkpointId,
long triggerTimestamp,
CheckpointProperties props,
int totalSubtaskCount,
Map<JobVertexID, TaskStateStats> taskStats,
int numAcknowledgedSubtasks,
long stateSize,
long alignmentBuffered,
SubtaskStateStats latestAcknowledgedSubtask,
@Nullable String externalPath) {

super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
checkArgument(numAcknowledgedSubtasks == totalSubtaskCount, "Did not acknowledge all subtasks.");
Expand All @@ -87,7 +86,6 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
this.alignmentBuffered = alignmentBuffered;
this.latestAcknowledgedSubtask = checkNotNull(latestAcknowledgedSubtask);
this.externalPath = externalPath;
this.discardCallback = new DiscardCallback();
}

@Override
Expand Down Expand Up @@ -145,7 +143,7 @@ public boolean isDiscarded() {
* @return Callback for the {@link CompletedCheckpoint}.
*/
DiscardCallback getDiscardCallback() {
return discardCallback;
return new DiscardCallback();
}

/**
Expand Down
Loading

0 comments on commit dcfa3fb

Please sign in to comment.