Skip to content

Commit

Permalink
[FLINK-3064] [core] Add size check in GroupReduceOperatorBase
Browse files Browse the repository at this point in the history
This closes apache#1396.
  • Loading branch information
s1ck authored and tillrohrmann committed Nov 24, 2015
1 parent 66b5def commit d2b4391
Showing 1 changed file with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,28 +195,30 @@ public int compare(IN o1, IN o2) {

ArrayList<OUT> result = new ArrayList<OUT>();

if (keyColumns.length == 0) {
final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
for (IN in: inputData) {
inputDataCopy.add(inputSerializer.copy(in));
}
CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
if (inputData.size() > 0) {
if (keyColumns.length == 0) {
final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
for (IN in : inputData) {
inputDataCopy.add(inputSerializer.copy(in));
}
CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);

function.reduce(inputDataCopy, collector);
} else {
final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
boolean[] keyOrderings = new boolean[keyColumns.length];
final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
function.reduce(inputDataCopy, collector);
} else {
final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
boolean[] keyOrderings = new boolean[keyColumns.length];
final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);

ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);

TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);

while (keyedIterator.nextKey()) {
function.reduce(keyedIterator.getValues(), collector);
while (keyedIterator.nextKey()) {
function.reduce(keyedIterator.getValues(), collector);
}
}
}

Expand Down

0 comments on commit d2b4391

Please sign in to comment.