Skip to content

Commit

Permalink
[FLINK-4684] [checkpoints] Remove redundant class loader from Checkpo…
Browse files Browse the repository at this point in the history
…intCoordinator
  • Loading branch information
StephanEwen committed Sep 26, 2016
1 parent 8fa313c commit 70e71c1
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,6 @@ public class CheckpointCoordinator {
* need to be ascending across job managers. */
private final CheckpointIDCounter checkpointIdCounter;

/** Class loader used to deserialize the state handles (as they may be user-defined) */
private final ClassLoader userClassLoader;

/** The base checkpoint interval. Actual trigger time may be affected by the
* max concurrent checkpoints and minimum-pause values */
private final long baseInterval;
Expand Down Expand Up @@ -167,7 +164,6 @@ public CheckpointCoordinator(
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
ClassLoader userClassLoader,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
SavepointStore savepointStore,
Expand Down Expand Up @@ -198,7 +194,6 @@ public CheckpointCoordinator(
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.savepointStore = checkNotNull(savepointStore);
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.userClassLoader = checkNotNull(userClassLoader);
this.statsTracker = checkNotNull(statsTracker);

this.timer = new Timer("Checkpoint Timer", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism) {
"Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + ".");

this.jobVertexID = jobVertexID;
//preallocate lists of the required size, so that we can randomly set values to indexes

this.subtaskStates = new HashMap<>(parallelism);
this.keyGroupsStateHandles = new HashMap<>(parallelism);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ public void enableSnapshotCheckpointing(
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
userClassLoader,
checkpointIDCounter,
checkpointStore,
savepointStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static void bestEffortDiscardAllStateObjects(
* occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception.
*
* @param handlesToDiscard State handles to discard. Passed iterable is allowed to deliver null values.
* @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration
* @throws IOException exception that is a collection of all suppressed exceptions that were caught during iteration
*/
public static void bestEffortCloseAllStateObjects(
Iterable<? extends StateObject> handlesToDiscard) throws IOException {
Expand Down
Loading

0 comments on commit 70e71c1

Please sign in to comment.