[FLINK-24033][table-planner] Propagate unique keys for fromChangelogStream

This closes apache#17048.
twalthr committed Aug 30, 2021
1 parent 6f5995d commit 93c5357
Showing 6 changed files with 83 additions and 39 deletions.
15 changes: 8 additions & 7 deletions docs/content.zh/docs/dev/table/data_stream_api.md
@@ -1629,8 +1629,8 @@ tableEnv
 // +----+--------------------------------+-------------+
 // | +I |                            Bob |           5 |
 // | +I |                          Alice |          12 |
-// | -D |                          Alice |          12 |
-// | +I |                          Alice |         100 |
+// | -U |                          Alice |          12 |
+// | +U |                          Alice |         100 |
 // +----+--------------------------------+-------------+
 ```
 {{< /tab >}}
@@ -1704,8 +1704,8 @@ tableEnv
 // +----+--------------------------------+-------------+
 // | +I |                            Bob |           5 |
 // | +I |                          Alice |          12 |
-// | -D |                          Alice |          12 |
-// | +I |                          Alice |         100 |
+// | -U |                          Alice |          12 |
+// | +U |                          Alice |         100 |
 // +----+--------------------------------+-------------+
 ```
 {{< /tab >}}
@@ -1775,8 +1775,8 @@ t_env.execute_sql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
 # +----+--------------------------------+-------------+
 # | +I |                            Bob |           5 |
 # | +I |                          Alice |          12 |
-# | -D |                          Alice |          12 |
-# | +I |                          Alice |         100 |
+# | -U |                          Alice |          12 |
+# | +U |                          Alice |         100 |
 # +----+--------------------------------+-------------+
 ```
 {{< /tab >}}
@@ -1786,7 +1786,8 @@ The default `ChangelogMode` shown in example 1 should be sufficient for most use cases as it accepts
 all kinds of changes.
 
 However, example 2 shows how to limit the kinds of incoming changes for efficiency by reducing the
-number of update messages by 50%.
+number of update messages by 50% using upsert mode. The number of result messages can be reduced by
+defining a primary key and upsert changelog mode for `toChangelogStream`.
 
 ### Examples for `toChangelogStream`
 
15 changes: 8 additions & 7 deletions docs/content/docs/dev/table/data_stream_api.md
@@ -1628,8 +1628,8 @@ tableEnv
 // +----+--------------------------------+-------------+
 // | +I |                            Bob |           5 |
 // | +I |                          Alice |          12 |
-// | -D |                          Alice |          12 |
-// | +I |                          Alice |         100 |
+// | -U |                          Alice |          12 |
+// | +U |                          Alice |         100 |
 // +----+--------------------------------+-------------+
 ```
 {{< /tab >}}
@@ -1703,8 +1703,8 @@ tableEnv
 // +----+--------------------------------+-------------+
 // | +I |                            Bob |           5 |
 // | +I |                          Alice |          12 |
-// | -D |                          Alice |          12 |
-// | +I |                          Alice |         100 |
+// | -U |                          Alice |          12 |
+// | +U |                          Alice |         100 |
 // +----+--------------------------------+-------------+
 ```
 {{< /tab >}}
@@ -1774,8 +1774,8 @@ t_env.execute_sql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
 # +----+--------------------------------+-------------+
 # | +I |                            Bob |           5 |
 # | +I |                          Alice |          12 |
-# | -D |                          Alice |          12 |
-# | +I |                          Alice |         100 |
+# | -U |                          Alice |          12 |
+# | +U |                          Alice |         100 |
 # +----+--------------------------------+-------------+
 ```
 {{< /tab >}}
@@ -1785,7 +1785,8 @@ The default `ChangelogMode` shown in example 1 should be sufficient for most use cases as it accepts
 all kinds of changes.
 
 However, example 2 shows how to limit the kinds of incoming changes for efficiency by reducing the
-number of update messages by 50%.
+number of update messages by 50% using upsert mode. The number of result messages can be reduced by
+defining a primary key and upsert changelog mode for `toChangelogStream`.
 
 ### Examples for `toChangelogStream`
 
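For readers without the full page at hand: the paragraph changed above refers to example 2 of the Java section, which the following condensed, self-contained sketch mirrors. It is not part of this commit, and the class name `UpsertModeExample` is illustrative:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class UpsertModeExample {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // an upsert stream needs no UPDATE_BEFORE records; updates arrive as
        // single +U records per key, halving the update traffic
        final DataStream<Row> dataStream =
                env.fromElements(
                        Row.ofKind(RowKind.INSERT, "Alice", 12),
                        Row.ofKind(RowKind.INSERT, "Bob", 5),
                        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

        // the declared primary key enables upsert mode and, with this commit,
        // is also propagated to the planner as a unique key
        tableEnv.createTemporaryView(
                "InputTable",
                tableEnv.fromChangelogStream(
                        dataStream,
                        Schema.newBuilder().primaryKey("f0").build(),
                        ChangelogMode.upsert()));

        tableEnv.executeSql(
                        "SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
                .print();
    }
}
```

The expected changelog for this query is exactly the corrected table above: with the key known to the planner, Alice's change surfaces as a -U/+U pair rather than -D/+I.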
@@ -25,8 +25,6 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
@@ -40,7 +38,6 @@
 import org.apache.calcite.schema.Table;
 
 import java.util.HashSet;
-import java.util.Optional;
 import java.util.Set;
 
 import static java.lang.String.format;
@@ -92,7 +89,8 @@ private FlinkStatistic getStatistic(
                 return FlinkStatistic.builder()
                         .tableStats(extractTableStats(lookupResult, identifier))
                         // this is a temporary solution, FLINK-15123 will resolve this
-                        .uniqueKeys(extractUniqueKeys(resolvedBaseTable.getResolvedSchema()))
+                        .uniqueKeys(
+                                resolvedBaseTable.getResolvedSchema().getPrimaryKey().orElse(null))
                         .build();
             case VIEW:
             default:
@@ -123,18 +121,6 @@ private TableStats extractTableStats(
         }
     }
 
-    private static Set<Set<String>> extractUniqueKeys(ResolvedSchema schema) {
-        Optional<UniqueConstraint> primaryKeyConstraint = schema.getPrimaryKey();
-        if (primaryKeyConstraint.isPresent()) {
-            Set<String> primaryKey = new HashSet<>(primaryKeyConstraint.get().getColumns());
-            Set<Set<String>> uniqueKeys = new HashSet<>();
-            uniqueKeys.add(primaryKey);
-            return uniqueKeys;
-        } else {
-            return null;
-        }
-    }
-
     @Override
     public Set<String> getTableNames() {
         return catalogManager.listTables(catalogName, databaseName);
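The helper removed above is not lost: its logic moves into the new `FlinkStatistic.Builder#uniqueKeys(UniqueConstraint)` overload shown in the Scala hunk further down. As a standalone, hedged Java sketch of that conversion (the class name `UniqueKeysSketch` is illustrative, not from the commit): a primary key is simply one unique key, wrapped into the set-of-sets shape the planner statistics expect.

```java
import org.apache.flink.table.catalog.ResolvedSchema;

import java.util.HashSet;
import java.util.Set;

final class UniqueKeysSketch {

    /** Wraps the primary key, if any, into the planner's set-of-unique-keys shape. */
    static Set<Set<String>> toUniqueKeys(ResolvedSchema schema) {
        return schema.getPrimaryKey()
                .map(pk -> {
                    // a primary key is one unique key: a set of column names
                    final Set<Set<String>> uniqueKeys = new HashSet<>();
                    uniqueKeys.add(new HashSet<>(pk.getColumns()));
                    return uniqueKeys;
                })
                .orElse(null);
    }
}
```

Returning null rather than an empty set preserves the "unknown" semantics of the builder, matching the removed helper.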
@@ -90,13 +90,18 @@ public static RelNode convertDataStreamToRel(
         final DynamicTableSource tableSource =
                 new ExternalDynamicSource<>(
                         identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode);
+        final FlinkStatistic statistic =
+                FlinkStatistic.builder()
+                        // this is a temporary solution, FLINK-15123 will resolve this
+                        .uniqueKeys(catalogTable.getResolvedSchema().getPrimaryKey().orElse(null))
+                        .build();
         return convertSourceToRel(
                 isBatchMode,
                 config,
                 relBuilder,
                 identifier,
                 catalogTable,
-                FlinkStatistic.UNKNOWN(),
+                statistic,
                 Collections.emptyList(),
                 tableSource);
     }
@@ -18,14 +18,19 @@
 
 package org.apache.flink.table.planner.plan.stats
 
+import org.apache.flink.table.catalog.{ResolvedSchema, UniqueConstraint}
 import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
 import org.apache.flink.table.planner.plan.`trait`.{RelModifiedMonotonicity, RelWindowProperties}
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.rel.{RelCollation, RelDistribution, RelReferentialConstraint}
 import org.apache.calcite.schema.Statistic
 import org.apache.calcite.util.ImmutableBitSet
 
+import javax.annotation.Nullable
+
 import java.util
+import java.util.{HashSet, Optional, Set}
 
 import scala.collection.JavaConversions._
 
@@ -172,11 +177,23 @@ object FlinkStatistic {
       this
     }
 
-    def uniqueKeys(uniqueKeys: util.Set[_ <: util.Set[String]]): Builder = {
+    def uniqueKeys(@Nullable uniqueKeys: util.Set[_ <: util.Set[String]]): Builder = {
       this.uniqueKeys = uniqueKeys
       this
     }
 
+    def uniqueKeys(@Nullable uniqueConstraint: UniqueConstraint): Builder = {
+      val uniqueKeySet = if (uniqueConstraint == null) {
+        null
+      } else {
+        val uniqueKey = new util.HashSet[String](uniqueConstraint.getColumns)
+        val uniqueKeySet = new util.HashSet[util.Set[String]]
+        uniqueKeySet.add(uniqueKey)
+        uniqueKeySet
+      }
+      uniqueKeys(uniqueKeySet)
+    }
+
     def relModifiedMonotonicity(monotonicity: RelModifiedMonotonicity): Builder = {
       this.relModifiedMonotonicity = monotonicity
       this
@@ -426,7 +426,7 @@ public void testFromAndToChangelogStreamRetract() throws Exception {
     }
 
     @Test
-    public void testFromAndToChangelogStreamUpsert() throws Exception {
+    public void testFromChangelogStreamUpsert() {
         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
         final List<Either<Row, Row>> inputOrOutput =
@@ -435,21 +435,21 @@ public void testFromAndToChangelogStreamUpsert() throws Exception {
                         output(RowKind.INSERT, "bob", 0),
                         // --
                         input(RowKind.UPDATE_AFTER, "bob", 1),
-                        output(RowKind.DELETE, "bob", 0),
-                        output(RowKind.INSERT, "bob", 1),
+                        output(RowKind.UPDATE_BEFORE, "bob", 0),
+                        output(RowKind.UPDATE_AFTER, "bob", 1),
                         // --
                         input(RowKind.INSERT, "alice", 1),
                         output(RowKind.INSERT, "alice", 1),
                         // --
                         input(RowKind.INSERT, "alice", 1), // no impact
                         // --
                         input(RowKind.UPDATE_AFTER, "alice", 2),
-                        output(RowKind.DELETE, "alice", 1),
-                        output(RowKind.INSERT, "alice", 2),
+                        output(RowKind.UPDATE_BEFORE, "alice", 1),
+                        output(RowKind.UPDATE_AFTER, "alice", 2),
                         // --
                         input(RowKind.UPDATE_AFTER, "alice", 100),
-                        output(RowKind.DELETE, "alice", 2),
-                        output(RowKind.INSERT, "alice", 100));
+                        output(RowKind.UPDATE_BEFORE, "alice", 2),
+                        output(RowKind.UPDATE_AFTER, "alice", 100));
 
         final DataStream<Row> changelogStream = env.fromElements(getInput(inputOrOutput));
         tableEnv.createTemporaryView(
@@ -462,6 +462,40 @@ public void testFromAndToChangelogStreamUpsert() throws Exception {
         final Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0");
 
+        testResult(result.execute(), getOutput(inputOrOutput));
+    }
+
+    @Test
+    public void testFromAndToChangelogStreamUpsert() throws Exception {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Either<Row, Row>> inputOrOutput =
+                Arrays.asList(
+                        input(RowKind.INSERT, "bob", 0),
+                        output(RowKind.INSERT, "bob", 0),
+                        // --
+                        input(RowKind.UPDATE_AFTER, "bob", 1),
+                        output(RowKind.UPDATE_AFTER, "bob", 1),
+                        // --
+                        input(RowKind.INSERT, "alice", 1),
+                        output(RowKind.INSERT, "alice", 1),
+                        // --
+                        input(RowKind.INSERT, "alice", 1), // no impact
+                        // --
+                        input(RowKind.UPDATE_AFTER, "alice", 2),
+                        output(RowKind.UPDATE_AFTER, "alice", 2),
+                        // --
+                        input(RowKind.UPDATE_AFTER, "alice", 100),
+                        output(RowKind.UPDATE_AFTER, "alice", 100));
+
+        final DataStream<Row> changelogStream = env.fromElements(getInput(inputOrOutput));
+        tableEnv.createTemporaryView(
+                "t",
+                tableEnv.fromChangelogStream(
+                        changelogStream,
+                        Schema.newBuilder().primaryKey("f0").build(),
+                        ChangelogMode.upsert()));
+
+        final Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0");
+
         testResult(
                 tableEnv.toChangelogStream(
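The page truncates the re-added test just after `tableEnv.toChangelogStream(`. As a hedged sketch only (the missing lines are not shown on this page), the call plausibly completes with the same primary-key-plus-upsert arguments used for `fromChangelogStream` above, so the collected changelog contains only +I and +U records:

```java
// hypothetical completion of the truncated call, not the verbatim source:
// with upsert mode on both ends, updates leave the pipeline as single +U
// records, matching the expected output declared in inputOrOutput above
testResult(
        tableEnv.toChangelogStream(
                result,
                Schema.newBuilder().primaryKey("f0").build(),
                ChangelogMode.upsert()),
        getOutput(inputOrOutput));
```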
