Skip to content

Commit

Permalink
[FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is …
Browse files Browse the repository at this point in the history
…enabled

This closes apache#4496.
  • Loading branch information
Xpray authored and tzulitai committed Aug 8, 2017
1 parent 4dfefd0 commit 6f5fa7f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,10 @@ public void collect(StreamRecord<T> record) {
output.collect(shallowCopy);
}

// don't copy for the last output
outputs[outputs.length - 1].collect(record);
if (outputs.length > 0) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
}

@Override
Expand All @@ -625,8 +627,10 @@ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
output.collect(outputTag, shallowCopy);
}

// don't copy for the last output
outputs[outputs.length - 1].collect(outputTag, record);
if (outputs.length > 0) {
// don't copy for the last output
outputs[outputs.length - 1].collect(outputTag, record);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.test.streaming.api;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
Expand All @@ -34,6 +35,8 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;

import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;

import org.junit.Assert;
Expand Down Expand Up @@ -378,4 +381,18 @@ public static void clear() {
collections.clear();
}
}

@Test
public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
DataStream<Integer> input = env.fromElements(1, 2, 3);
input.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
out.collect(value << 1);
}
});
env.execute();
}
}

0 comments on commit 6f5fa7f

Please sign in to comment.