Skip to content

Commit

Permalink
[FLINK-27031][runtime] Assign even empty old state to the task if the…
Browse files Browse the repository at this point in the history
… upstream has output states since the task should be prepared to filter these old incoming states from upstream
  • Loading branch information
akalash authored and rkhachatryan committed Jun 30, 2022
1 parent ac94fa1 commit d98a34b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,12 @@ public void assignStates() {

// actually assign the state
for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
if (stateAssignment.hasNonFinishedState || stateAssignment.isFullyFinished) {
// If upstream has output states, even the empty task state should be assigned for the
// current task in order to notify this task that the old states will send to it which
// likely should be filtered.
if (stateAssignment.hasNonFinishedState
|| stateAssignment.isFullyFinished
|| stateAssignment.hasUpstreamOutputStates()) {
assignTaskStateToExecutionJobVertices(stateAssignment);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ public OperatorSubtaskState getSubtaskState(OperatorInstanceID instanceID) {
.build();
}

public boolean hasUpstreamOutputStates() {
return Arrays.stream(getUpstreamAssignments())
.anyMatch(assignment -> assignment.hasOutputState);
}

private InflightDataGateOrPartitionRescalingDescriptor log(
InflightDataGateOrPartitionRescalingDescriptor descriptor, int subtask, int partition) {
LOG.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/** Tests to verify state assignment operation. */
public class StateAssignmentOperationTest extends TestLogger {
Expand Down Expand Up @@ -744,6 +745,46 @@ public void testChannelStateAssignmentUpscaling() throws JobException, JobExecut
.getInputRescalingDescriptor());
}

@Test
public void testOnlyUpstreamChannelStateAssignment()
throws JobException, JobExecutionException {
// given: There is only input channel state for one subpartition.
List<OperatorID> operatorIds = buildOperatorIds(2);
Map<OperatorID, OperatorState> states = new HashMap<>();
Random random = new Random();
OperatorState upstreamState = new OperatorState(operatorIds.get(0), 2, MAX_P);
OperatorSubtaskState state =
OperatorSubtaskState.builder()
.setResultSubpartitionState(
new StateObjectCollection<>(
asList(
createNewResultSubpartitionStateHandle(10, random),
createNewResultSubpartitionStateHandle(
10, random))))
.build();
upstreamState.putState(0, state);

states.put(operatorIds.get(0), upstreamState);

Map<OperatorID, ExecutionJobVertex> vertices =
buildVertices(operatorIds, 3, RANGE, ROUND_ROBIN);

// when: States are assigned.
new StateAssignmentOperation(0, new HashSet<>(vertices.values()), states, false)
.assignStates();

// then: All subtask have not null TaskRestore information(even if it is empty).
ExecutionJobVertex jobVertexWithFinishedOperator = vertices.get(operatorIds.get(0));
for (ExecutionVertex task : jobVertexWithFinishedOperator.getTaskVertices()) {
assertNotNull(task.getCurrentExecutionAttempt().getTaskRestore());
}

ExecutionJobVertex jobVertexWithoutFinishedOperator = vertices.get(operatorIds.get(1));
for (ExecutionVertex task : jobVertexWithoutFinishedOperator.getTaskVertices()) {
assertNotNull(task.getCurrentExecutionAttempt().getTaskRestore());
}
}

@Test
public void testStateWithFullyFinishedOperators() throws JobException, JobExecutionException {
List<OperatorID> operatorIds = buildOperatorIds(2);
Expand Down

0 comments on commit d98a34b

Please sign in to comment.