Skip to content

Commit

Permalink
[FLINK-20619][runtime] Remove unused InputDependencyConstraint
Browse files Browse the repository at this point in the history
This work is part of FLINK-20589

This closes apache#14579
  • Loading branch information
XComp authored and rmetzger committed Jan 11, 2021
1 parent 433c4cd commit 7404c95
Show file tree
Hide file tree
Showing 20 changed files with 50 additions and 731 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* This flag defines if we use compression for the state snapshot data or not. Default: false
*/
private boolean useSnapshotCompression = false;
/** The default input dependency constraint to schedule tasks. */
private InputDependencyConstraint defaultInputDependencyConstraint =
InputDependencyConstraint.ANY;

// ------------------------------- User code values --------------------------------------------

Expand Down Expand Up @@ -528,36 +525,29 @@ public ExecutionMode getExecutionMode() {
}

/**
* Sets the default input dependency constraint for vertex scheduling. It indicates when a task
* should be scheduled considering its inputs status.
* This method is deprecated. It was used to set the {@link InputDependencyConstraint} utilized
* by the old scheduler implementations which got removed as part of FLINK-20589. The current
* implementation has no effect.
*
* <p>The default constraint is {@link InputDependencyConstraint#ANY}.
*
* @param inputDependencyConstraint The input dependency constraint.
* @param ignored Ignored parameter.
* @deprecated due to the deprecation of {@code InputDependencyConstraint}.
*/
@PublicEvolving
public void setDefaultInputDependencyConstraint(
InputDependencyConstraint inputDependencyConstraint) {
if (inputDependencyConstraint != null) {
this.defaultInputDependencyConstraint = inputDependencyConstraint;
} else {
// defaultInputDependencyConstraint is not allowed to be null
// setting it to ANY to not break existing jobs
this.defaultInputDependencyConstraint = InputDependencyConstraint.ANY;
}
}
@Deprecated
public void setDefaultInputDependencyConstraint(InputDependencyConstraint ignored) {}

/**
* Gets the default input dependency constraint for vertex scheduling. It indicates when a task
* should be scheduled considering its inputs status.
*
* <p>The default constraint is {@link InputDependencyConstraint#ANY}.
* This method is deprecated. It was used to return the {@link InputDependencyConstraint}
* utilized by the old scheduler implementations. These implementations were removed as part of
* FLINK-20589.
*
* @return The input dependency constraint of this job.
* @return The previous default constraint {@link InputDependencyConstraint#ANY}.
* @deprecated due to the deprecation of {@code InputDependencyConstraint}.
*/
@PublicEvolving
@Deprecated
public InputDependencyConstraint getDefaultInputDependencyConstraint() {
return defaultInputDependencyConstraint;
return InputDependencyConstraint.ANY;
}

/**
Expand Down Expand Up @@ -913,8 +903,7 @@ public boolean equals(Object obj) {
&& registeredKryoTypes.equals(other.registeredKryoTypes)
&& registeredPojoTypes.equals(other.registeredPojoTypes)
&& taskCancellationIntervalMillis == other.taskCancellationIntervalMillis
&& useSnapshotCompression == other.useSnapshotCompression
&& defaultInputDependencyConstraint == other.defaultInputDependencyConstraint;
&& useSnapshotCompression == other.useSnapshotCompression;

} else {
return false;
Expand All @@ -940,8 +929,7 @@ public int hashCode() {
registeredKryoTypes,
registeredPojoTypes,
taskCancellationIntervalMillis,
useSnapshotCompression,
defaultInputDependencyConstraint);
useSnapshotCompression);
}

@Override
Expand Down Expand Up @@ -985,8 +973,6 @@ public String toString() {
+ taskCancellationTimeoutMillis
+ ", useSnapshotCompression="
+ useSnapshotCompression
+ ", defaultInputDependencyConstraint="
+ defaultInputDependencyConstraint
+ ", globalJobParameters="
+ globalJobParameters
+ ", registeredTypesWithKryoSerializers="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@

import org.apache.flink.annotation.PublicEvolving;

/** This constraint indicates when a task should be scheduled considering its inputs status. */
/**
* This constraint indicates when a task should be scheduled considering its inputs status.
*
* @deprecated {@code InputDependencyConstraint} is not used anymore and will be deleted in one of
* the future versions. It was previously used in the scheduler implementations that were
* removed as part of FLINK-20589.
*/
@PublicEvolving
@Deprecated
public enum InputDependencyConstraint {

/** Schedule the task if any input is consumable. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,7 @@ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
}

// add vertices to the graph
for (JobVertex vertex : this.vertices.values()) {
vertex.setInputDependencyConstraint(
program.getOriginalPlan()
.getExecutionConfig()
.getDefaultInputDependencyConstraint());
graph.addVertex(vertex);
}
this.vertices.values().forEach(graph::addVertex);

for (JobVertex vertex : this.auxVertices) {
graph.addVertex(vertex);
Expand Down
16 changes: 16 additions & 0 deletions flink-python/pyflink/common/execution_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings

from typing import Dict, List

from pyflink.common.execution_mode import ExecutionMode
Expand Down Expand Up @@ -328,7 +330,14 @@ def set_default_input_dependency_constraint(
:param input_dependency_constraint: The input dependency constraint. The constraints could
be :data:`InputDependencyConstraint.ANY` or
:data:`InputDependencyConstraint.ALL`.
.. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is not used anymore in the
current scheduler implementations.
"""
warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not used anywhere. "
"Therefore, the method call set_default_input_dependency_constraint is "
"obsolete.", DeprecationWarning)

self._j_execution_config.setDefaultInputDependencyConstraint(
input_dependency_constraint._to_j_input_dependency_constraint())
return self
Expand All @@ -344,7 +353,14 @@ def get_default_input_dependency_constraint(self) -> 'InputDependencyConstraint'
:return: The input dependency constraint of this job. The possible constraints are
:data:`InputDependencyConstraint.ANY` and :data:`InputDependencyConstraint.ALL`.
.. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is not used anymore in the
current scheduler implementations.
"""
warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not used anywhere. "
"Therefore, the method call get_default_input_dependency_constraint is "
"obsolete.", DeprecationWarning)

j_input_dependency_constraint = self._j_execution_config\
.getDefaultInputDependencyConstraint()
return InputDependencyConstraint._from_j_input_dependency_constraint(
Expand Down
17 changes: 1 addition & 16 deletions flink-python/pyflink/common/tests/test_execution_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
# limitations under the License.
################################################################################
from pyflink.dataset import ExecutionEnvironment
from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode,
InputDependencyConstraint)
from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode)
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase

Expand Down Expand Up @@ -132,20 +131,6 @@ def test_get_set_execution_mode(self):

self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.PIPELINED_FORCED)

def test_get_set_default_input_dependency_constraint(self):

self.execution_config.set_default_input_dependency_constraint(
InputDependencyConstraint.ALL)

self.assertEqual(self.execution_config.get_default_input_dependency_constraint(),
InputDependencyConstraint.ALL)

self.execution_config.set_default_input_dependency_constraint(
InputDependencyConstraint.ANY)

self.assertEqual(self.execution_config.get_default_input_dependency_constraint(),
InputDependencyConstraint.ANY)

def test_disable_enable_force_kryo(self):

self.execution_config.disable_force_kryo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
Expand Down Expand Up @@ -365,10 +364,6 @@ public List<IntermediateResult> getInputs() {
return inputs;
}

public InputDependencyConstraint getInputDependencyConstraint() {
return getJobVertex().getInputDependencyConstraint();
}

public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
return operatorCoordinators;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
Expand Down Expand Up @@ -346,10 +345,6 @@ public Map<IntermediateResultPartitionID, IntermediateResultPartition> getProduc
return resultPartitions;
}

public InputDependencyConstraint getInputDependencyConstraint() {
return getJobVertex().getInputDependencyConstraint();
}

// --------------------------------------------------------------------------------------------
// Graph building
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -777,64 +772,6 @@ List<IntermediateResultPartition> finishAllBlockingPartitions() {
}
}

/**
* Check whether the InputDependencyConstraint is satisfied for this vertex.
*
* @return whether the input constraint is satisfied
*/
boolean checkInputDependencyConstraints() {
if (inputEdges.length == 0) {
return true;
}

final InputDependencyConstraint inputDependencyConstraint = getInputDependencyConstraint();
switch (inputDependencyConstraint) {
case ANY:
return isAnyInputConsumable();
case ALL:
return areAllInputsConsumable();
default:
throw new IllegalStateException(
"Unknown InputDependencyConstraint " + inputDependencyConstraint);
}
}

private boolean isAnyInputConsumable() {
for (int inputNumber = 0; inputNumber < inputEdges.length; inputNumber++) {
if (isInputConsumable(inputNumber)) {
return true;
}
}
return false;
}

private boolean areAllInputsConsumable() {
for (int inputNumber = 0; inputNumber < inputEdges.length; inputNumber++) {
if (!isInputConsumable(inputNumber)) {
return false;
}
}
return true;
}

/**
* Get whether an input of the vertex is consumable. An input is consumable when when any
* partition in it is consumable.
*
* <p>Note that a BLOCKING result partition is only consumable when all partitions in the result
* are FINISHED.
*
* @return whether the input is consumable
*/
boolean isInputConsumable(int inputNumber) {
for (ExecutionEdge executionEdge : inputEdges[inputNumber]) {
if (executionEdge.getSource().isConsumable()) {
return true;
}
}
return false;
}

// --------------------------------------------------------------------------------------------
// Notifications from the Execution Attempt
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.jobgraph;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitSource;
Expand Down Expand Up @@ -133,9 +132,6 @@ public class JobVertex implements java.io.Serializable {
*/
private String resultOptimizerProperties;

/** The input dependency constraint to schedule this vertex. */
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;

// --------------------------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -577,14 +573,6 @@ public void setResultOptimizerProperties(String resultOptimizerProperties) {
this.resultOptimizerProperties = resultOptimizerProperties;
}

public InputDependencyConstraint getInputDependencyConstraint() {
return inputDependencyConstraint;
}

public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
this.inputDependencyConstraint = inputDependencyConstraint;
}

// --------------------------------------------------------------------------------------------

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,7 @@ private static DefaultExecutionVertex generateSchedulingExecutionVertex(

DefaultExecutionVertex schedulingVertex =
new DefaultExecutionVertex(
vertex.getID(),
producedPartitions,
vertex::getExecutionState,
vertex.getInputDependencyConstraint());
vertex.getID(), producedPartitions, vertex::getExecutionState);

producedPartitions.forEach(partition -> partition.setProducer(schedulingVertex));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.scheduler.adapter;

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
Expand All @@ -40,18 +39,14 @@ class DefaultExecutionVertex implements SchedulingExecutionVertex {

private final Supplier<ExecutionState> stateSupplier;

private final InputDependencyConstraint inputDependencyConstraint;

DefaultExecutionVertex(
ExecutionVertexID executionVertexId,
List<DefaultResultPartition> producedPartitions,
Supplier<ExecutionState> stateSupplier,
InputDependencyConstraint constraint) {
Supplier<ExecutionState> stateSupplier) {
this.executionVertexId = checkNotNull(executionVertexId);
this.consumedResults = new ArrayList<>();
this.stateSupplier = checkNotNull(stateSupplier);
this.producedResults = checkNotNull(producedPartitions);
this.inputDependencyConstraint = checkNotNull(constraint);
}

@Override
Expand All @@ -74,11 +69,6 @@ public Iterable<DefaultResultPartition> getProducedResults() {
return producedResults;
}

@Override
public InputDependencyConstraint getInputDependencyConstraint() {
return inputDependencyConstraint;
}

void addConsumedResult(DefaultResultPartition result) {
consumedResults.add(result);
}
Expand Down
Loading

0 comments on commit 7404c95

Please sign in to comment.