Skip to content

Commit

Permalink
[FLINK-23531][table] Supports compact-changes in row-time deduplicate…
Browse files Browse the repository at this point in the history
… mini-batch

This closes apache#16630
  • Loading branch information
Tartarus0zm authored Aug 4, 2021
1 parent 900af3e commit 4d8157f
Show file tree
Hide file tree
Showing 6 changed files with 600 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchLatestChangeDeduplicateFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -89,6 +91,17 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
+ "but there will be additional overhead."
+ "Default is true.");

@Experimental
public static final ConfigOption<Boolean> TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES =
ConfigOptions.key("table.exec.deduplicate.mini-batch.compact-changes")
.booleanType()
.defaultValue(false)
.withDescription(
"Set whether to compact the changes sent downstream in row-time mini-batch. "
+ "If true, Flink will compact changes, only send the latest change to downstream. "
+ "Notes: If the downstream needs the details of versioned data, this optimization cannot be opened. "
+ "If false, Flink will send all changes to downstream just like when the mini-batch is not on.");

@JsonProperty(FIELD_NAME_UNIQUE_KEYS)
private final int[] uniqueKeys;

Expand Down Expand Up @@ -223,6 +236,12 @@ protected boolean isMiniBatchEnabled() {
.getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
}

protected boolean isCompactChanges() {
return tableConfig
.getConfiguration()
.getBoolean(TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES);
}

protected long getMinRetentionTime() {
return tableConfig.getMinIdleStateRetentionTime();
}
Expand Down Expand Up @@ -275,15 +294,28 @@ OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
checkArgument(rowtimeIndex >= 0);
if (isMiniBatchEnabled()) {
CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(getMiniBatchSize());
RowTimeMiniBatchDeduplicateFunction processFunction =
new RowTimeMiniBatchDeduplicateFunction(
rowTypeInfo,
typeSerializer,
getMinRetentionTime(),
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
MapBundleFunction processFunction;
if (isCompactChanges()) {
processFunction =
new RowTimeMiniBatchLatestChangeDeduplicateFunction(
rowTypeInfo,
typeSerializer,
getMinRetentionTime(),
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
} else {
processFunction =
new RowTimeMiniBatchDeduplicateFunction(
rowTypeInfo,
typeSerializer,
getMinRetentionTime(),
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
}
return new KeyedMapBundleOperator<>(processFunction, trigger);
} else {
RowTimeDeduplicateFunction processFunction =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExt
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate
import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOn}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.table.utils.LegacyRowResource
import org.apache.flink.types.Row

import org.junit.Assert._
import org.junit.{Rule, Test}
import org.junit.{Assume, Rule, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

Expand Down Expand Up @@ -189,6 +189,40 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
assertEquals(expected.sorted, rawResult.sorted)
}

@Test
def testFirstRowWithoutAllChangelogOnRowtime(): Unit = {
Assume.assumeTrue("Without all change log only for minibatch.", miniBatch == MiniBatchOn)
tEnv.getConfig.getConfiguration.setBoolean(
StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, true)
val t = env.fromCollection(rowtimeTestData)
.assignTimestampsAndWatermarks(new RowtimeExtractor)
.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
tEnv.registerTable("T", t)
createSinkTable("rowtime_sink")

val sql =
"""
|INSERT INTO rowtime_sink
| SELECT a, b, c, rowtime
| FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime) as rowNum
| FROM T
| )
| WHERE rowNum = 1
""".stripMargin

tEnv.executeSql(sql).await()
val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")

val expected = List(
"+I(1,1,Hi,1970-01-01T00:00:00.001)",
"+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
"+I(3,4,Comment#2,1970-01-01T00:00:00.004)",
"+I(4,4,Comment#3,1970-01-01T00:00:00.004)")
assertEquals(expected.sorted, rawResult.sorted)
}

@Test
def testFirstRowOnRowTimeFollowedByUnboundedAgg(): Unit = {
val t = env.fromCollection(rowtimeTestData)
Expand Down Expand Up @@ -262,6 +296,42 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
assertEquals(expected.sorted, rawResult.sorted)
}

@Test
def testLastRowWithoutAllChangelogOnRowtime(): Unit = {
Assume.assumeTrue("Without all change log only for minibatch.", miniBatch == MiniBatchOn)
tEnv.getConfig.getConfiguration.setBoolean(
StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, true)
val t = env.fromCollection(rowtimeTestData)
.assignTimestampsAndWatermarks(new RowtimeExtractor)
.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
tEnv.registerTable("T", t)
createSinkTable("rowtime_sink")

val sql =
"""
|INSERT INTO rowtime_sink
| SELECT a, b, c, rowtime
| FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
| FROM T
| )
| WHERE rowNum = 1
""".stripMargin

tEnv.executeSql(sql).await()
val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")

val expected = List(
"+I(1,1,Hi,1970-01-01T00:00:00.001)",
"+I(1,2,Hello world,1970-01-01T00:00:00.002)",
"+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
"+I(2,6,Comment#1,1970-01-01T00:00:00.006)",
"+I(3,5,Comment#2,1970-01-01T00:00:00.005)",
"+I(4,4,Comment#3,1970-01-01T00:00:00.004)")
assertEquals(expected.sorted, rawResult.sorted)
}

@Test
def testLastRowOnRowTimeFollowedByUnboundedAgg(): Unit = {
val t = env.fromCollection(rowtimeTestData)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.deduplicate;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

import java.util.Map;

import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly;
import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult;

/**
* This function is used to get the first or last row for every key partition in miniBatch mode. But
* only send latest change log to downstream.
*/
public class RowTimeMiniBatchLatestChangeDeduplicateFunction
extends MiniBatchDeduplicateFunctionBase<RowData, RowData, RowData, RowData, RowData> {

private static final long serialVersionUID = 1L;

private final TypeSerializer<RowData> serializer;
private final boolean generateUpdateBefore;
private final boolean generateInsert;
private final int rowtimeIndex;
private final boolean keepLastRow;

public RowTimeMiniBatchLatestChangeDeduplicateFunction(
InternalTypeInfo<RowData> typeInfo,
TypeSerializer<RowData> serializer,
long minRetentionTime,
int rowtimeIndex,
boolean generateUpdateBefore,
boolean generateInsert,
boolean keepLastRow) {
super(typeInfo, minRetentionTime);
this.serializer = serializer;
this.generateUpdateBefore = generateUpdateBefore;
this.generateInsert = generateInsert;
this.rowtimeIndex = rowtimeIndex;
this.keepLastRow = keepLastRow;
}

@Override
public RowData addInput(@Nullable RowData value, RowData input) throws Exception {
if (isDuplicate(value, input, rowtimeIndex, keepLastRow)) {
return serializer.copy(input);
}
return value;
}

@Override
public void finishBundle(Map<RowData, RowData> buffer, Collector<RowData> out)
throws Exception {
for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {
RowData currentKey = entry.getKey();
RowData bufferedRow = entry.getValue();
ctx.setCurrentKey(currentKey);
RowData preRow = state.value();
checkInsertOnly(bufferedRow);
if (isDuplicate(preRow, bufferedRow, rowtimeIndex, keepLastRow)) {
updateDeduplicateResult(
generateUpdateBefore, generateInsert, preRow, bufferedRow, out);
state.update(bufferedRow);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,13 @@

package org.apache.flink.table.runtime.operators.deduplicate;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;

import org.junit.Test;
Expand All @@ -55,24 +44,7 @@
* RowTimeMiniBatchDeduplicateFunction}.
*/
@RunWith(Parameterized.class)
public class RowTimeDeduplicateFunctionTest {

private final long miniBatchSize = 4L;
private Time minTtlTime = Time.milliseconds(10);
private InternalTypeInfo inputRowType =
InternalTypeInfo.ofFields(
new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
private TypeSerializer<RowData> serializer = inputRowType.toSerializer();
private int rowTimeIndex = 2;
private int rowKeyIndex = 0;
private RowDataKeySelector rowKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
private RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(
inputRowType.toRowFieldTypes(),
new GenericRowRecordSortComparator(
rowKeyIndex, inputRowType.toRowFieldTypes()[rowKeyIndex]));
public class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {

private final boolean miniBatchEnable;

Expand Down Expand Up @@ -328,18 +300,6 @@ private void testRowTimeDeduplicateKeepLastRow(
testHarness.close();
}

private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
return new KeyedOneInputStreamOperatorTestHarness<>(
operator, rowKeySelector, rowKeySelector.getProducedType());
}

private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator) throws Exception {
return new KeyedOneInputStreamOperatorTestHarness<>(
operator, rowKeySelector, rowKeySelector.getProducedType());
}

@Parameterized.Parameters(name = "miniBatchEnable = {0}")
public static Collection<Boolean[]> runMode() {
return Arrays.asList(new Boolean[] {false}, new Boolean[] {true});
Expand Down
Loading

0 comments on commit 4d8157f

Please sign in to comment.