Skip to content

Commit

Permalink
[FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
Browse files Browse the repository at this point in the history
This introduces a new KeySelector that assigns keys to key groups and
also adds the max parallelism parameter throughout all API levels.

This also adds tests for the newly introduced features.
  • Loading branch information
tillrohrmann authored and aljoscha committed Aug 31, 2016
1 parent 7cd9bb5 commit ec975aa
Show file tree
Hide file tree
Showing 57 changed files with 1,962 additions and 390 deletions.
2 changes: 2 additions & 0 deletions docs/dev/api_concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,8 @@ With the closure cleaner disabled, it might happen that an anonymous user functi

- `getParallelism()` / `setParallelism(int parallelism)` Set the default parallelism for the job.

- `getMaxParallelism()` / `setMaxParallelism(int maxParallelism)` Set the default maximum parallelism for the job. This setting determines the maximum degree of parallelism and specifies the upper limit for dynamic scaling. It also defines the number of key groups used for partitioned state.

- `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.

- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully stopped on the TaskManagers, and once the delay has passed, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution retries is one or more.
Expand Down
7 changes: 7 additions & 0 deletions flink-contrib/flink-statebackend-rocksdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ log4j.rootLogger=OFF, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target = System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ public class ExecutionConfig implements Serializable {
*/
public static final int PARALLELISM_DEFAULT = -1;

/**
* The flag value indicating an unknown or unset parallelism. This value is
* not a valid parallelism and indicates that the parallelism should remain
* unchanged.
*/
public static final int PARALLELISM_UNKNOWN = -2;

private static final long DEFAULT_RESTART_DELAY = 10000L;

// --------------------------------------------------------------------------------------------
Expand All @@ -85,6 +92,13 @@ public class ExecutionConfig implements Serializable {

private int parallelism = PARALLELISM_DEFAULT;

/**
* The program wide maximum parallelism used for operators which haven't specified a maximum
* parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the
* number of key groups used for partitioned state.
*/
private int maxParallelism = -1;

/**
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
*/
Expand Down Expand Up @@ -219,12 +233,41 @@ public int getParallelism() {
* @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
"The parallelism of an operator must be at least 1.");
if (parallelism != PARALLELISM_UNKNOWN) {
if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
throw new IllegalArgumentException(
"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
}
this.parallelism = parallelism;
}
return this;
}

this.parallelism = parallelism;
/**
 * Returns the program-wide maximum degree of parallelism.
 *
 * <p>This value is the upper limit for dynamic scaling and also determines the number of
 * key groups used for partitioned state.
 *
 * @return the maximum degree of parallelism configured for the program
 */
@PublicEvolving
public int getMaxParallelism() {
	return this.maxParallelism;
}

return this;
/**
 * Configures the program-wide maximum degree of parallelism.
 *
 * <p>This value is the upper limit for dynamic scaling and also determines the number of
 * key groups used for partitioned state.
 *
 * @param maxParallelism Maximum degree of parallelism to be used for the program; must be
 *                       greater than 0.
 */
@PublicEvolving
public void setMaxParallelism(int maxParallelism) {
	// Only strictly positive values are accepted; anything else is rejected by the precondition.
	final boolean positive = maxParallelism > 0;
	Preconditions.checkArgument(positive, "The maximum parallelism must be greater than 0.");
	this.maxParallelism = maxParallelism;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state;

import org.apache.flink.annotation.Internal;

import java.io.Serializable;

/**
* Assigns a key to a key group index. A key group is the smallest unit of partitioned state
* which is assigned to an operator. An operator can be assigned multiple key groups.
*
* <p>Implementations are {@link Serializable} because this interface extends it.
*
* @param <K> Type of the key
*/
@Internal
public interface KeyGroupAssigner<K> extends Serializable {
/**
* Calculates the key group index for the given key.
*
* @param key Key to be used
* @return Key group index for the given key
*/
int getKeyGroupIndex(K key);

/**
* Sets up the key group assigner with the maximum parallelism (= number of key groups).
*
* <p>NOTE(review): presumably this has to be called before {@link #getKeyGroupIndex(Object)}
* is used; confirm against the callers.
*
* @param maxParallelism Maximum parallelism (= number of key groups)
*/
void setup(int maxParallelism);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
*
* @param <S> The type of the State objects created from this {@code StateDescriptor}.
* @param <T> The type of the value of the state object described by this state descriptor.
*/
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,6 @@ public class CheckpointCoordinator {
/** Helper for tracking checkpoint statistics */
private final CheckpointStatsTracker statsTracker;

private final int numberKeyGroups;

// --------------------------------------------------------------------------------------------

public CheckpointCoordinator(
Expand All @@ -165,7 +163,6 @@ public CheckpointCoordinator(
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpointAttempts,
int numberKeyGroups,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
Expand Down Expand Up @@ -202,7 +199,6 @@ public CheckpointCoordinator(
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.userClassLoader = checkNotNull(userClassLoader);
this.statsTracker = checkNotNull(statsTracker);
this.numberKeyGroups = numberKeyGroups;

this.timer = new Timer("Checkpoint Timer", true);

Expand Down Expand Up @@ -797,7 +793,7 @@ public boolean restoreLatestCheckpointedState(

int counter = 0;

List<Set<Integer>> keyGroupPartitions = createKeyGroupPartitions(numberKeyGroups, executionJobVertex.getParallelism());
List<Set<Integer>> keyGroupPartitions = createKeyGroupPartitions(executionJobVertex.getMaxParallelism(), executionJobVertex.getParallelism());

for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
SubtaskState subtaskState = taskState.getState(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
*/
public class InputChannelDeploymentDescriptor implements Serializable {

private static final long serialVersionUID = 373711381640454080L;
private static Logger LOG = LoggerFactory.getLogger(InputChannelDeploymentDescriptor.class);

/** The ID of the partition the input channel is going to consume. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
*/
public class InputGateDeploymentDescriptor implements Serializable {

private static final long serialVersionUID = -7143441863165366704L;
/**
* The ID of the consumed intermediate result. Each input gate consumes partitions of the
* intermediate result specified by this ID. This ID also identifies the input gate at the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
*/
public class ResultPartitionLocation implements Serializable {

private static final long serialVersionUID = -6354238166937194463L;
/** The type of location for the result partition. */
private final LocationType locationType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ public void enableSnapshotCheckpointing(
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
int numberKeyGroups,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
Expand Down Expand Up @@ -373,7 +372,6 @@ public void enableSnapshotCheckpointing(
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
numberKeyGroups,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.util.SerializableObject;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

import scala.concurrent.duration.FiniteDuration;
Expand Down Expand Up @@ -69,6 +69,8 @@ public class ExecutionJobVertex {
private final List<IntermediateResult> inputs;

private final int parallelism;

private final int maxParallelism;

private final boolean[] finishedSubtasks;

Expand All @@ -81,16 +83,23 @@ public class ExecutionJobVertex {
private final InputSplit[] inputSplits;

private InputSplitAssigner splitAssigner;

public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
FiniteDuration timeout) throws JobException {

public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
int defaultParallelism, FiniteDuration timeout) throws JobException {
this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
}

public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
int defaultParallelism, FiniteDuration timeout, long createTimestamp)
throws JobException
{
public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
FiniteDuration timeout,
long createTimestamp) throws JobException {

if (graph == null || jobVertex == null) {
throw new NullPointerException();
}
Expand All @@ -102,6 +111,14 @@ public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

this.parallelism = numTaskVertices;

int maxParallelism = jobVertex.getMaxParallelism();

Preconditions.checkArgument(maxParallelism >= parallelism, "The maximum parallelism (" +
maxParallelism + ") must be greater or equal than the parallelism (" + parallelism +
").");
this.maxParallelism = maxParallelism;

this.taskVertices = new ExecutionVertex[numTaskVertices];

this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());
Expand Down Expand Up @@ -177,6 +194,10 @@ public int getParallelism() {
return parallelism;
}

/**
 * Returns the maximum parallelism of this job vertex.
 *
 * @return the maximum parallelism stored for this job vertex
 */
public int getMaxParallelism() {
	return this.maxParallelism;
}

public JobID getJobId() {
return graph.getJobID();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ public int getParallelSubtaskIndex() {
return this.subTaskIndex;
}

/**
 * Returns the maximum parallelism reported by the job vertex this execution vertex refers to.
 *
 * @return the maximum parallelism of the referenced job vertex
 */
public int getMaxParallelism() {
	return jobVertex.getMaxParallelism();
}

public int getNumberOfInputs() {
return this.inputEdges.length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

package org.apache.flink.runtime.jobgraph;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitSource;
Expand All @@ -31,6 +28,9 @@
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.List;

/**
* The base class for job vertexes.
*/
Expand All @@ -57,6 +57,9 @@ public class JobVertex implements java.io.Serializable {
/** Number of subtasks to split this task into at runtime.*/
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

/** Maximum number of subtasks to split this task into at runtime. */
private int maxParallelism = Short.MAX_VALUE;

/** Custom configuration passed to the assigned task at runtime. */
private Configuration configuration;

Expand Down Expand Up @@ -234,6 +237,27 @@ public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}

/**
 * Returns the maximum parallelism configured for the task.
 *
 * @return the maximum parallelism of the task
 */
public int getMaxParallelism() {
	return this.maxParallelism;
}

/**
 * Sets the maximum parallelism for the task.
 *
 * <p>Values outside the accepted range are rejected by {@code Preconditions.checkArgument}.
 *
 * @param maxParallelism The maximum parallelism to be set; must be between 1 and
 *                       {@link Short#MAX_VALUE} (inclusive).
 */
public void setMaxParallelism(int maxParallelism) {
	// The message names both bounds and the offending value, because the check also
	// rejects values above Short.MAX_VALUE and not only values below 1.
	org.apache.flink.util.Preconditions.checkArgument(
		maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE,
		"The max parallelism must be between 1 and " + Short.MAX_VALUE +
			", but was " + maxParallelism + ".");

	this.maxParallelism = maxParallelism;
}

public InputSplitSource<?> getInputSplitSource() {
return inputSplitSource;
}
Expand Down
Loading

0 comments on commit ec975aa

Please sign in to comment.