Skip to content

Commit

Permalink
Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels"
Browse files Browse the repository at this point in the history
The reverted commit did not really fix anything, but hid the problem by
brute force, sending many more schedule-or-update-consumers messages.
  • Loading branch information
uce committed Nov 10, 2016
1 parent 58204da commit 0d2e8b2
Show file tree
Hide file tree
Showing 18 changed files with 35 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
Expand Down Expand Up @@ -88,7 +89,9 @@ public String toString() {
* Creates an input channel deployment descriptor for each partition.
*/
public static InputChannelDeploymentDescriptor[] fromEdges(
ExecutionEdge[] edges, SimpleSlot consumerSlot) {
ExecutionEdge[] edges,
SimpleSlot consumerSlot,
boolean allowLazyDeployment) throws ExecutionGraphException {

final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];
Expand All @@ -105,9 +108,11 @@ public static InputChannelDeploymentDescriptor[] fromEdges(

// The producing task needs to be RUNNING or already FINISHED
if (consumedPartition.isConsumable() && producerSlot != null &&
(producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.FINISHED)) {

(producerState == ExecutionState.RUNNING ||
producerState == ExecutionState.FINISHED ||
producerState == ExecutionState.SCHEDULED ||
producerState == ExecutionState.DEPLOYING)) {

final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();

Expand All @@ -124,9 +129,11 @@ public static InputChannelDeploymentDescriptor[] fromEdges(
partitionLocation = ResultPartitionLocation.createRemote(connectionId);
}
}
else {
else if (allowLazyDeployment) {
// The producing task might not have registered the partition yet
partitionLocation = ResultPartitionLocation.createUnknown();
} else {
throw new ExecutionGraphException("Trying to eagerly schedule a task whose inputs are not ready.");
}

final ResultPartitionID consumedPartitionId = new ResultPartitionID(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,18 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
/** The number of subpartitions. */
private final int numberOfSubpartitions;

/**
* Flag indicating whether to eagerly deploy consumers.
*
* <p>If <code>true</code>, the consumers are deployed as soon as the
* runtime result is registered at the result manager of the task manager.
*/
private final boolean eagerlyDeployConsumers;

public ResultPartitionDeploymentDescriptor(
IntermediateDataSetID resultId,
IntermediateResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
boolean eagerlyDeployConsumers) {
int numberOfSubpartitions) {

this.resultId = checkNotNull(resultId);
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);

checkArgument(numberOfSubpartitions >= 1);
this.numberOfSubpartitions = numberOfSubpartitions;
this.eagerlyDeployConsumers = eagerlyDeployConsumers;
}

public IntermediateDataSetID getResultId() {
Expand All @@ -88,16 +78,6 @@ public int getNumberOfSubpartitions() {
return numberOfSubpartitions;
}

/**
* Returns whether consumers should be deployed eagerly, i.e. as soon as the
* runtime result is registered at the result manager of the task manager.
*
* @return The <code>eagerlyDeployConsumers</code> flag passed to the constructor
*/
public boolean getEagerlyDeployConsumers() {
return eagerlyDeployConsumers;
}

@Override
public String toString() {
return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
Expand Down Expand Up @@ -129,7 +109,6 @@ public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartiti
}

return new ResultPartitionDeploymentDescriptor(
resultId, partitionId, partitionType, numberOfSubpartitions,
partition.getIntermediateResult().getEagerlyDeployConsumers());
resultId, partitionId, partitionType, numberOfSubpartitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
Expand All @@ -30,13 +32,11 @@
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
Expand All @@ -45,7 +45,6 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

import scala.Option;

import java.io.IOException;
Expand Down Expand Up @@ -161,8 +160,7 @@ public ExecutionJobVertex(
result.getId(),
this,
numTaskVertices,
result.getResultType(),
result.getEagerlyDeployConsumers());
result.getResultType());
}

// create all task vertices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,11 @@ public class IntermediateResult {

private final ResultPartitionType resultType;

private final boolean eagerlyDeployConsumers;

public IntermediateResult(
IntermediateDataSetID id,
ExecutionJobVertex producer,
int numParallelProducers,
ResultPartitionType resultType,
boolean eagerlyDeployConsumers) {
ResultPartitionType resultType) {

this.id = checkNotNull(id);
this.producer = checkNotNull(producer);
Expand All @@ -71,8 +68,6 @@ public IntermediateResult(

// The runtime type for this produced result
this.resultType = checkNotNull(resultType);

this.eagerlyDeployConsumers = eagerlyDeployConsumers;
}

public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
Expand Down Expand Up @@ -108,10 +103,6 @@ public ResultPartitionType getResultType() {
return resultType;
}

/**
* Returns the eager-deployment flag this result was constructed with.
*
* @return The <code>eagerlyDeployConsumers</code> value passed to the constructor
*/
public boolean getEagerlyDeployConsumers() {
return eagerlyDeployConsumers;
}

public int registerConsumer() {
final int index = numConsumers;
numConsumers++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down Expand Up @@ -158,8 +157,6 @@ public void registerTask(Task task) throws IOException {
throw new IllegalStateException("Unequal number of writers and partitions.");
}

ResultPartitionConsumableNotifier jobManagerNotifier;

synchronized (lock) {
if (isShutdown) {
throw new IllegalStateException("NetworkEnvironment is shut down");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
Expand All @@ -28,7 +29,6 @@
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.slf4j.Logger;
Expand Down Expand Up @@ -89,14 +89,6 @@ public class ResultPartition implements BufferPoolOwner {
/** Type of this partition. Defines the concrete subpartition implementation to use. */
private final ResultPartitionType partitionType;

/**
* Flag indicating whether to eagerly deploy consumers.
*
* <p>If <code>true</code>, the consumers are deployed as soon as the
* runtime result is registered at the result manager of the task manager.
*/
private final boolean doEagerDeployment;

/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;

Expand Down Expand Up @@ -137,7 +129,6 @@ public ResultPartition(
JobID jobId,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
boolean doEagerDeployment,
int numberOfSubpartitions,
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
Expand All @@ -149,7 +140,6 @@ public ResultPartition(
this.jobId = checkNotNull(jobId);
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
this.doEagerDeployment = doEagerDeployment;
this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
this.partitionManager = checkNotNull(partitionManager);
this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
Expand Down Expand Up @@ -365,15 +355,6 @@ public Throwable getFailureCause() {
return cause;
}

/**
* Deploys consumers if eager deployment is activated.
*
* <p>When <code>doEagerDeployment</code> is set, this calls
* <code>notifyPartitionConsumable</code> on the partition consumable notifier
* with this partition's job ID, partition ID, and task actions; otherwise it
* does nothing.
*/
public void deployConsumers() {
if (doEagerDeployment) {
partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions);
}
}

/**
* Releases buffers held by this result partition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ public void registerResultPartition(ResultPartition partition) throws IOExceptio
throw new IllegalStateException("Result partition already registered.");
}

partition.deployConsumers();

LOG.debug("Registered {}.", partition);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ public class IntermediateDataSet implements java.io.Serializable {

// The type of partition to use at runtime
private final ResultPartitionType resultType;

/**
* Flag indicating whether to eagerly deploy consumers.
*
* <p>If <code>true</code>, the consumers are deployed as soon as the
* runtime result is registered at the result manager of the task manager.
*/
private boolean eagerlyDeployConsumers;

// --------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -87,29 +79,6 @@ public List<JobEdge> getConsumers() {
/**
* Returns the type of partition to use at runtime for this data set.
*
* @return The result partition type
*/
public ResultPartitionType getResultType() {
return resultType;
}

/**
* Sets the flag indicating whether to eagerly deploy consumers (default:
* <code>false</code>).
*
* <p>The value is stored as-is and is what {@link #getEagerlyDeployConsumers()}
* returns afterwards.
*
* @param eagerlyDeployConsumers If <code>true</code>, the consumers are
* deployed as soon as the runtime result is
* registered at the result manager of the
* task manager. Default is <code>false</code>.
*/
public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) {
this.eagerlyDeployConsumers = eagerlyDeployConsumers;
}

/**
* Returns whether consumers should be deployed eagerly, i.e. as soon as the
* runtime result is registered at the result manager of the task manager.
*
* @return The flag last set via {@link #setEagerlyDeployConsumers(boolean)}
*/
public boolean getEagerlyDeployConsumers() {
return eagerlyDeployConsumers;
}

// --------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,25 +382,15 @@ public JobEdge connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPa
}

public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) {
return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED, false);
return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
}

public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {

return connectNewDataSetAsInput(input, distPattern, partitionType, false);
}

public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType,
boolean eagerlyDeployConsumers) {

IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers);

JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ public Task(
jobId,
partitionId,
desc.getPartitionType(),
desc.getEagerlyDeployConsumers(),
desc.getNumberOfSubpartitions(),
networkEnvironment.getResultPartitionManager(),
resultPartitionConsumableNotifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public void testSerialization() throws Exception {
resultId,
partitionId,
partitionType,
numberOfSubpartitions,
eagerlyDeployConsumers);
numberOfSubpartitions);

ResultPartitionDeploymentDescriptor copy =
CommonTestUtils.createCopySerializable(orig);
Expand All @@ -55,6 +54,5 @@ public void testSerialization() throws Exception {
assertEquals(partitionId, copy.getPartitionId());
assertEquals(partitionType, copy.getPartitionType());
assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
assertEquals(eagerlyDeployConsumers, copy.getEagerlyDeployConsumers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void testNoResourceAvailableFailure() throws Exception {
v1.setInvokableClass(BatchTask.class);
v2.setInvokableClass(BatchTask.class);

v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false);
v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
Expand Down
Loading

0 comments on commit 0d2e8b2

Please sign in to comment.