Skip to content

Commit

Permalink
[hotfix][network] Make InputGate#requestPartitions a private method
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Jul 9, 2019
1 parent c27d0d6 commit 8d277d4
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ public abstract class InputGate implements AsyncDataInput<BufferOrEvent>, AutoCl

public abstract boolean isFinished();

public abstract void requestPartitions() throws IOException, InterruptedException;

/**
* Blocking call waiting for next {@link BufferOrEvent}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,32 @@ public void setup() throws IOException, InterruptedException {
requestPartitions();
}

private void requestPartitions() throws IOException, InterruptedException {
synchronized (requestLock) {
if (!requestedPartitionsFlag) {
if (closeFuture.isDone()) {
throw new IllegalStateException("Already released.");
}

// Sanity checks
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException(String.format(
"Bug in input gate setup logic: mismatch between " +
"number of total input channels [%s] and the currently set number of input " +
"channels [%s].",
inputChannels.size(),
numberOfInputChannels));
}

for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.requestSubpartition(consumedSubpartitionIndex);
}
}

requestedPartitionsFlag = true;
}
}

// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -436,30 +462,6 @@ public boolean isFinished() {
return hasReceivedAllEndOfPartitionEvents;
}

@Override
public void requestPartitions() throws IOException, InterruptedException {
synchronized (requestLock) {
if (!requestedPartitionsFlag) {
if (closeFuture.isDone()) {
throw new IllegalStateException("Already released.");
}

// Sanity checks
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException("Bug in input gate setup logic: mismatch between " +
"number of total input channels and the currently set number of input " +
"channels.");
}

for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.requestSubpartition(consumedSubpartitionIndex);
}
}

requestedPartitionsFlag = true;
}
}

// ------------------------------------------------------------------------
// Consume
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ public class UnionInputGate extends InputGate {
*/
private final Map<InputGate, Integer> inputGateToIndexOffsetMap;

/** Flag indicating whether partitions have been requested. */
private boolean requestedPartitionsFlag;

public UnionInputGate(InputGate... inputGates) {
this.inputGates = checkNotNull(inputGates);
checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates.");
Expand Down Expand Up @@ -141,17 +138,6 @@ public boolean isFinished() {
return inputGatesWithRemainingData.isEmpty();
}

@Override
public void requestPartitions() throws IOException, InterruptedException {
if (!requestedPartitionsFlag) {
for (InputGate inputGate : inputGates) {
inputGate.requestPartitions();
}

requestedPartitionsFlag = true;
}
}

@Override
public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
return getNextBufferOrEvent(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ public void setup() throws IOException, InterruptedException {
inputGate.setup();
}

@Override
public void requestPartitions() throws IOException, InterruptedException {
inputGate.requestPartitions();
}

@Override
public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
return updateMetrics(inputGate.getNext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
Expand Down Expand Up @@ -192,51 +191,52 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception {

// Setup reader with one local and one unknown input channel

final SingleInputGate inputGate = createInputGate();
final BufferPool bufferPool = mock(BufferPool.class);
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);

inputGate.setBufferPool(bufferPool);

// Local
ResultPartitionID localPartitionId = new ResultPartitionID();
NettyShuffleEnvironment environment = createNettyShuffleEnvironment();
final SingleInputGate inputGate = createInputGate(environment, 2, ResultPartitionType.PIPELINED);
try {
// Local
ResultPartitionID localPartitionId = new ResultPartitionID();

InputChannelBuilder.newBuilder()
.setPartitionId(localPartitionId)
.setPartitionManager(partitionManager)
.setTaskEventPublisher(taskEventDispatcher)
.buildLocalAndSetToGate(inputGate);
InputChannelBuilder.newBuilder()
.setPartitionId(localPartitionId)
.setPartitionManager(partitionManager)
.setTaskEventPublisher(taskEventDispatcher)
.buildLocalAndSetToGate(inputGate);

// Unknown
ResultPartitionID unknownPartitionId = new ResultPartitionID();
// Unknown
ResultPartitionID unknownPartitionId = new ResultPartitionID();

InputChannelBuilder.newBuilder()
.setChannelIndex(1)
.setPartitionId(unknownPartitionId)
.setPartitionManager(partitionManager)
.setTaskEventPublisher(taskEventDispatcher)
.buildUnknownAndSetToGate(inputGate);
InputChannelBuilder.newBuilder()
.setChannelIndex(1)
.setPartitionId(unknownPartitionId)
.setPartitionManager(partitionManager)
.setTaskEventPublisher(taskEventDispatcher)
.buildUnknownAndSetToGate(inputGate);

// Request partitions
inputGate.requestPartitions();
inputGate.setup();

// Only the local channel can request
verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
// Only the local channel can request
verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));

// Send event backwards and initialize unknown channel afterwards
final TaskEvent event = new TestTaskEvent();
inputGate.sendTaskEvent(event);
// Send event backwards and initialize unknown channel afterwards
final TaskEvent event = new TestTaskEvent();
inputGate.sendTaskEvent(event);

// Only the local channel can send out the event
verify(taskEventDispatcher, times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
// Only the local channel can send out the event
verify(taskEventDispatcher, times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class));

// After the update, the pending event should be send to local channel
// After the update, the pending event should be send to local channel

ResourceID location = ResourceID.generate();
inputGate.updateInputChannel(location, createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location));
ResourceID location = ResourceID.generate();
inputGate.updateInputChannel(location, createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location));

verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
}
finally {
inputGate.close();
environment.close();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,6 @@ public boolean isFinished() {
return false;
}

@Override
public void requestPartitions() {}

@Override
public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
currentChannel = (currentChannel + 1) % numberOfChannels;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ public Optional<BufferOrEvent> pollNext() {
return getNext();
}

@Override
public void requestPartitions() {
}

@Override
public void sendTaskEvent(TaskEvent event) {
}
Expand Down

0 comments on commit 8d277d4

Please sign in to comment.