Skip to content

Commit

Permalink
[FLINK-5632] [streaming api] Fix typo in StreamGraph variable name
Browse files Browse the repository at this point in the history
This closes apache#3203
  • Loading branch information
tony810430 authored and StephanEwen committed Jan 25, 2017
1 parent 54d3e26 commit 6f5c7d8
Showing 1 changed file with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class StreamGraph extends StreamingPlan {
private Set<Integer> sources;
private Set<Integer> sinks;
private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtuaPartitionNodes;
private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;

protected Map<Integer, String> vertexIDtoBrokerID;
protected Map<Integer, Long> vertexIDtoLoopTimeout;
Expand All @@ -107,7 +107,7 @@ public StreamGraph(StreamExecutionEnvironment environment) {
public void clear() {
streamNodes = new HashMap<>();
virtualSelectNodes = new HashMap<>();
virtuaPartitionNodes = new HashMap<>();
virtualPartitionNodes = new HashMap<>();
vertexIDtoBrokerID = new HashMap<>();
vertexIDtoLoopTimeout = new HashMap<>();
iterationSourceSinkPairs = new HashSet<>();
Expand Down Expand Up @@ -303,11 +303,11 @@ public void addVirtualSelectNode(Integer originalId, Integer virtualId, List<Str
*/
public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {

if (virtuaPartitionNodes.containsKey(virtualId)) {
if (virtualPartitionNodes.containsKey(virtualId)) {
throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
}

virtuaPartitionNodes.put(virtualId,
virtualPartitionNodes.put(virtualId,
new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
}

Expand All @@ -318,8 +318,8 @@ public String getSlotSharingGroup(Integer id) {
if (virtualSelectNodes.containsKey(id)) {
Integer mappedId = virtualSelectNodes.get(id).f0;
return getSlotSharingGroup(mappedId);
} else if (virtuaPartitionNodes.containsKey(id)) {
Integer mappedId = virtuaPartitionNodes.get(id).f0;
} else if (virtualPartitionNodes.containsKey(id)) {
Integer mappedId = virtualPartitionNodes.get(id).f0;
return getSlotSharingGroup(mappedId);
} else {
StreamNode node = getStreamNode(id);
Expand Down Expand Up @@ -351,11 +351,11 @@ private void addEdgeInternal(Integer upStreamVertexID,
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
} else if (virtuaPartitionNodes.containsKey(upStreamVertexID)) {
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtuaPartitionNodes.get(virtualId).f0;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtuaPartitionNodes.get(virtualId).f1;
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
} else {
Expand Down Expand Up @@ -387,8 +387,8 @@ private void addEdgeInternal(Integer upStreamVertexID,
}

public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
if (virtuaPartitionNodes.containsKey(vertexID)) {
addOutputSelector(virtuaPartitionNodes.get(vertexID).f0, outputSelector);
if (virtualPartitionNodes.containsKey(vertexID)) {
addOutputSelector(virtualPartitionNodes.get(vertexID).f0, outputSelector);
} else if (virtualSelectNodes.containsKey(vertexID)) {
addOutputSelector(virtualSelectNodes.get(vertexID).f0, outputSelector);
} else {
Expand Down

0 comments on commit 6f5c7d8

Please sign in to comment.