Skip to content

Commit

Permalink
[FLINK-22638] Keep channels blocked on alignment timeout
Browse files Browse the repository at this point in the history
This commit keeps channels blocked in case an alignment timeout occurs.
That way we prioritize the channels from which we have not yet received
the barrier. This solution is based on the assumption that all upstream
operators are working with aligned checkpoints and we do not mind
delaying the subsequent checkpoints on the blocked channels.

This closes apache#15897
  • Loading branch information
dawidwys committed May 21, 2021
1 parent 1938f63 commit f041516
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public final BarrierHandlerState barrierReceived(
state.blockChannel(channelInfo);
if (controller.allBarriersReceived()) {
controller.triggerGlobalCheckpoint(checkpointBarrier);
state.unblockAllChannels();
return new AlternatingWaitingForFirstBarrier(state.getInputs());
return finishCheckpoint();
} else if (controller.isTimedOut(checkpointBarrier)) {
return alignmentTimeout(controller, checkpointBarrier)
.barrierReceived(controller, channelInfo, checkpointBarrier.asUnaligned());
Expand All @@ -72,7 +71,11 @@ public final BarrierHandlerState barrierReceived(

@Override
public final BarrierHandlerState abort(long cancelledId) throws IOException {
return finishCheckpoint();
}

private BarrierHandlerState finishCheckpoint() throws IOException {
state.unblockAllChannels();
return new AlternatingWaitingForFirstBarrier(state.getInputs());
return new AlternatingWaitingForFirstBarrier(state.emptyState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

import java.io.IOException;

/**
* We are performing aligned checkpoints with a timeout. We have seen at least a single aligned
* barrier.
*/
final class AlternatingCollectingBarriers extends AbstractAlternatingAlignedBarrierHandlerState {

AlternatingCollectingBarriers(ChannelState context) {
Expand All @@ -35,14 +39,13 @@ public BarrierHandlerState alignmentTimeout(
Controller controller, CheckpointBarrier checkpointBarrier)
throws IOException, CheckpointException {
state.prioritizeAllAnnouncements();
state.unblockAllChannels();
CheckpointBarrier unalignedBarrier = checkpointBarrier.asUnaligned();
controller.initInputsCheckpoint(unalignedBarrier);
for (CheckpointableInput input : state.getInputs()) {
input.checkpointStarted(unalignedBarrier);
}
controller.triggerGlobalCheckpoint(unalignedBarrier);
return new CollectingBarriersUnaligned(true, state.getInputs());
return new AlternatingCollectingBarriersUnaligned(true, state);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@

import java.io.IOException;

final class CollectingBarriersUnaligned implements BarrierHandlerState {
/**
* We either timed out or started unaligned. We have seen at least one barrier and we are waiting
* for the remaining barriers.
*/
final class AlternatingCollectingBarriersUnaligned implements BarrierHandlerState {

private final boolean alternating;
private final CheckpointableInput[] inputs;
private final ChannelState channelState;

CollectingBarriersUnaligned(boolean alternating, CheckpointableInput[] inputs) {
AlternatingCollectingBarriersUnaligned(boolean alternating, ChannelState channelState) {
this.alternating = alternating;
this.inputs = inputs;
this.channelState = channelState;
}

@Override
Expand All @@ -46,7 +50,7 @@ public BarrierHandlerState alignmentTimeout(
public BarrierHandlerState announcementReceived(
Controller controller, InputChannelInfo channelInfo, int sequenceNumber)
throws IOException {
inputs[channelInfo.getGateIdx()].convertToPriorityEvent(
channelState.getInputs()[channelInfo.getGateIdx()].convertToPriorityEvent(
channelInfo.getInputChannelIdx(), sequenceNumber);
return this;
}
Expand All @@ -57,31 +61,32 @@ public BarrierHandlerState barrierReceived(
InputChannelInfo channelInfo,
CheckpointBarrier checkpointBarrier)
throws CheckpointException, IOException {
// we received an out of order aligned barrier, we should resume consumption for the
// channel, as it is being blocked by the credit-based network
// we received an out-of-order aligned barrier, we should bookkeep this channel as blocked,
// as it is being blocked by the credit-based network
if (!checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);
channelState.blockChannel(channelInfo);
}

if (controller.allBarriersReceived()) {
return stopCheckpoint(checkpointBarrier.getId());
return finishCheckpoint(checkpointBarrier.getId());
}
return this;
}

@Override
public BarrierHandlerState abort(long cancelledId) throws IOException {
return stopCheckpoint(cancelledId);
return finishCheckpoint(cancelledId);
}

private BarrierHandlerState stopCheckpoint(long cancelledId) {
for (CheckpointableInput input : inputs) {
private BarrierHandlerState finishCheckpoint(long cancelledId) throws IOException {
for (CheckpointableInput input : channelState.getInputs()) {
input.checkpointStopped(cancelledId);
}
channelState.unblockAllChannels();
if (alternating) {
return new AlternatingWaitingForFirstBarrier(inputs);
return new AlternatingWaitingForFirstBarrier(channelState.emptyState());
} else {
return new WaitingForFirstBarrierUnaligned(false, inputs);
return new AlternatingWaitingForFirstBarrierUnaligned(false, channelState.emptyState());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,22 @@

import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;

import java.io.IOException;

/** We are performing aligned checkpoints with a timeout. We have not seen any barriers yet. */
final class AlternatingWaitingForFirstBarrier
extends AbstractAlternatingAlignedBarrierHandlerState {

AlternatingWaitingForFirstBarrier(CheckpointableInput[] inputs) {
super(new ChannelState(inputs));
AlternatingWaitingForFirstBarrier(ChannelState state) {
super(state);
}

@Override
public BarrierHandlerState alignmentTimeout(
Controller controller, CheckpointBarrier checkpointBarrier)
throws IOException, CheckpointException {
state.prioritizeAllAnnouncements();
state.unblockAllChannels();
return new WaitingForFirstBarrierUnaligned(true, state.getInputs());
return new AlternatingWaitingForFirstBarrierUnaligned(true, state);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@

import java.io.IOException;

final class WaitingForFirstBarrierUnaligned implements BarrierHandlerState {
/**
* We either timed out before seeing any barriers or started unaligned. We might've seen some
* announcements if we started aligned.
*/
final class AlternatingWaitingForFirstBarrierUnaligned implements BarrierHandlerState {

private final boolean alternating;
private final CheckpointableInput[] inputs;
private final ChannelState channelState;

WaitingForFirstBarrierUnaligned(boolean alternating, CheckpointableInput[] inputs) {
AlternatingWaitingForFirstBarrierUnaligned(boolean alternating, ChannelState channelState) {
this.alternating = alternating;
this.inputs = inputs;
this.channelState = channelState;
}

@Override
Expand All @@ -46,7 +50,7 @@ public BarrierHandlerState alignmentTimeout(
public BarrierHandlerState announcementReceived(
Controller controller, InputChannelInfo channelInfo, int sequenceNumber)
throws IOException {
inputs[channelInfo.getGateIdx()].convertToPriorityEvent(
channelState.getInputs()[channelInfo.getGateIdx()].convertToPriorityEvent(
channelInfo.getInputChannelIdx(), sequenceNumber);
return this;
}
Expand All @@ -57,37 +61,39 @@ public BarrierHandlerState barrierReceived(
InputChannelInfo channelInfo,
CheckpointBarrier checkpointBarrier)
throws CheckpointException, IOException {
// we received an out of order aligned barrier, we should resume consumption for the
// channel, as it is being blocked by the credit-based network

// we received an out-of-order aligned barrier, we should bookkeep this channel as blocked,
// as it is being blocked by the credit-based network
if (!checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);
channelState.blockChannel(channelInfo);
}

CheckpointBarrier unalignedBarrier = checkpointBarrier.asUnaligned();
controller.initInputsCheckpoint(unalignedBarrier);
for (CheckpointableInput input : inputs) {
for (CheckpointableInput input : channelState.getInputs()) {
input.checkpointStarted(unalignedBarrier);
}
controller.triggerGlobalCheckpoint(unalignedBarrier);
if (controller.allBarriersReceived()) {
for (CheckpointableInput input : inputs) {
for (CheckpointableInput input : channelState.getInputs()) {
input.checkpointStopped(unalignedBarrier.getId());
}
if (alternating) {
return new AlternatingWaitingForFirstBarrier(inputs);
} else {
return this;
}
return stopCheckpoint();
}
return new CollectingBarriersUnaligned(alternating, inputs);
return new AlternatingCollectingBarriersUnaligned(alternating, channelState);
}

@Override
public BarrierHandlerState abort(long cancelledId) {
public BarrierHandlerState abort(long cancelledId) throws IOException {
return stopCheckpoint();
}

private BarrierHandlerState stopCheckpoint() throws IOException {
channelState.unblockAllChannels();
if (alternating) {
return new AlternatingWaitingForFirstBarrier(inputs);
return new AlternatingWaitingForFirstBarrier(channelState.emptyState());
} else {
return this;
return new AlternatingWaitingForFirstBarrierUnaligned(false, channelState.emptyState());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Map;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkState;

/**
* A controller for keeping track of channels state in {@link AbstractAlignedBarrierHandlerState}
* and {@link AbstractAlternatingAlignedBarrierHandlerState}.
Expand Down Expand Up @@ -82,4 +84,13 @@ public void addSeenAnnouncement(InputChannelInfo channelInfo, int sequenceNumber
public void removeSeenAnnouncement(InputChannelInfo channelInfo) {
this.sequenceNumberInAnnouncedChannels.remove(channelInfo);
}

/**
 * Clears the recorded announcement sequence numbers and hands back this same instance so it
 * can be reused by the next barrier handler state.
 *
 * <p>No new object is created: the state is reset in place. Callers are expected to have
 * unblocked every channel beforehand; the {@code checkState} precondition below fails if
 * {@code blockedChannels} is not empty.
 *
 * @return this {@code ChannelState}, with {@code sequenceNumberInAnnouncedChannels} cleared
 */
public ChannelState emptyState() {
// Guard against silently dropping the record of channels that are still blocked.
checkState(
blockedChannels.isEmpty(),
"We should not reset to an empty state if there are blocked channels: %s",
blockedChannels);
sequenceNumberInAnnouncedChannels.clear();
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.io.checkpointing;

/** We are performing aligned checkpoints. We have seen at least a single aligned barrier. */
final class CollectingBarriers extends AbstractAlignedBarrierHandlerState {

CollectingBarriers(ChannelState context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static SingleCheckpointBarrierHandler unaligned(
checkpointCoordinator,
clock,
numOpenChannels,
new WaitingForFirstBarrierUnaligned(false, inputs),
new AlternatingWaitingForFirstBarrierUnaligned(false, new ChannelState(inputs)),
false,
registerTimer,
inputs);
Expand Down Expand Up @@ -162,7 +162,7 @@ public static SingleCheckpointBarrierHandler alternating(
checkpointCoordinator,
clock,
numOpenChannels,
new AlternatingWaitingForFirstBarrier(inputs),
new AlternatingWaitingForFirstBarrier(new ChannelState(inputs)),
true,
registerTimer,
inputs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;

/** We are performing aligned checkpoints. We have not seen any barriers yet. */
final class WaitingForFirstBarrier extends AbstractAlignedBarrierHandlerState {

WaitingForFirstBarrier(CheckpointableInput[] inputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,6 @@ public class AlternatingCheckpointsTest {

private final ClockWithDelayedActions clock = new ClockWithDelayedActions();

@Test
public void testChannelUnblockedAfterDifferentBarriers() throws Exception {
CheckpointedInputGate gate =
new TestCheckpointedInputGateBuilder(
3, getTestBarrierHandlerFactory(new ValidatingCheckpointHandler()))
.build();
long barrierId = 1L;
long ts = clock.relativeTimeNanos();
long timeout = 10;

send(barrier(barrierId, ts, unaligned(getDefault())), 0, gate);

TestInputChannel acChannel = (TestInputChannel) gate.getChannel(1);
acChannel.setBlocked(true);
send(
barrier(barrierId, ts, alignedWithTimeout(getDefault(), Integer.MAX_VALUE)),
acChannel.getChannelIndex(),
gate);
assertFalse(acChannel.isBlocked());

clock.advanceTime(timeout, TimeUnit.MILLISECONDS);
TestInputChannel acChannelWithTimeout = (TestInputChannel) gate.getChannel(2);
acChannelWithTimeout.setBlocked(true);
send(
barrier(barrierId, ts, alignedWithTimeout(getDefault(), timeout)),
acChannelWithTimeout.getChannelIndex(),
gate);
assertFalse(acChannelWithTimeout.isBlocked());
}

private TestBarrierHandlerFactory getTestBarrierHandlerFactory(
ValidatingCheckpointHandler target) {
return TestBarrierHandlerFactory.forTarget(target)
Expand Down Expand Up @@ -507,13 +477,20 @@ numberOfChannels, getTestBarrierHandlerFactory(target))

// we set timer on announcement and test channels do not produce announcements by themselves
send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 0), true), 0, gate);
send(checkpointBarrierBuffer, 0, gate);
// emulate blocking channels on aligned barriers
((TestInputChannel) gate.getChannel(0)).setBlocked(true);
send(checkpointBarrierBuffer, 0, gate);

clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 0), true), 1, gate);
// emulate blocking channels on aligned barriers
((TestInputChannel) gate.getChannel(1)).setBlocked(true);
send(checkpointBarrierBuffer, 1, gate);

assertThat(target.getTriggeredCheckpointOptions().size(), equalTo(1));
assertThat(target.getTriggeredCheckpointOptions(), contains(unaligned(getDefault())));
assertFalse(((TestInputChannel) gate.getChannel(0)).isBlocked());
assertFalse(((TestInputChannel) gate.getChannel(1)).isBlocked());
}

@Test
Expand Down

0 comments on commit f041516

Please sign in to comment.