Skip to content

Commit

Permalink
[FLINK-27418][table-planner] Fix topN retraction for previously delet…
Browse files Browse the repository at this point in the history
…ed record

This closes apache#19778
  • Loading branch information
rovboyko authored Jun 8, 2022
1 parent 9184bdf commit 99f4511
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.flink.table.runtime.operators.rank;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
Expand Down Expand Up @@ -80,6 +82,8 @@ public class RetractableTopNFunction extends AbstractTopNFunction {

private final ComparableRecordComparator serializableComparator;

private final TypeSerializer<RowData> inputRowSer;

public RetractableTopNFunction(
StateTtlConfig ttlConfig,
InternalTypeInfo<RowData> inputRowType,
Expand All @@ -102,6 +106,7 @@ public RetractableTopNFunction(
this.sortKeyType = sortKeySelector.getProducedType();
this.serializableComparator = comparableRecordComparator;
this.generatedEqualiser = generatedEqualiser;
this.inputRowSer = inputRowType.createSerializer(new ExecutionConfig());
}

@Override
Expand Down Expand Up @@ -320,7 +325,7 @@ private void emitRecordsWithoutRowNumber(
}
}
if (toDelete != null) {
collectDelete(out, toDelete);
collectDelete(out, inputRowSer.copy(toDelete));
}
if (toCollect != null) {
collectInsert(out, inputRow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;

import org.junit.Test;

Expand Down Expand Up @@ -509,4 +513,45 @@ public void testRetractRecordOutOfRankRangeWithRowNumber() throws Exception {
assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
}

@Test
public void testRetractAndThenDeleteRecordWithoutRowNumber() throws Exception {
AbstractTopNFunction func =
new RetractableTopNFunction(
ttlConfig,
InternalTypeInfo.ofFields(
VarCharType.STRING_TYPE,
new BigIntType(),
new IntType(),
new IntType()),
comparableRecordComparator,
sortKeySelector,
RankType.ROW_NUMBER,
new ConstantRankRange(1, 1),
generatedEqualiser,
true,
false);

OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
testHarness.open();
testHarness.processElement(insertRecord("a", 1L, 10, 0));
testHarness.processElement(insertRecord("a", 1L, 9, 0));
testHarness.processElement(deleteRecord("a", 1L, 10, 0));
testHarness.processElement(deleteRecord("a", 1L, 9, 0));
testHarness.processElement(insertRecord("a", 1L, 10, 1));
testHarness.processElement(insertRecord("a", 1L, 9, 1));
testHarness.close();

List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord("a", 1L, 10, 0));
expectedOutput.add(deleteRecord("a", 1L, 10, 0));
expectedOutput.add(insertRecord("a", 1L, 9, 0));
expectedOutput.add(deleteRecord("a", 1L, 9, 0));
expectedOutput.add(insertRecord("a", 1L, 10, 1));
expectedOutput.add(deleteRecord("a", 1L, 10, 1));
expectedOutput.add(insertRecord("a", 1L, 9, 1));

assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
}
}

0 comments on commit 99f4511

Please sign in to comment.