From 6e93394b4f2c22e5c50858242c17bcbd8fcf45c3 Mon Sep 17 00:00:00 2001 From: bvarghese1 Date: Fri, 26 Jan 2024 17:44:14 -0800 Subject: [PATCH] [FLINK-34248] Remove ChangelogNormalize Json Plan & IT tests --- .../stream/ChangelogSourceJsonPlanTest.java | 96 ---------- .../ChangelogSourceJsonPlanITCase.java | 113 ------------ .../testChangelogSource.out | 174 ------------------ .../testUpsertSource.out | 155 ---------------- 4 files changed, 538 deletions(-) delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java delete mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out delete mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java deleted file mode 100644 index 0e35bc11bca3e..0000000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for changelog source, including upsert source. */ -class ChangelogSourceJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true); - } - - @Test - void testChangelogSource() { - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " a bigint,\n" - + " b int not null,\n" - + " c varchar,\n" - + " d timestamp(3),\n" - + " PRIMARY KEY (a, b) NOT ENFORCED\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'changelog-mode' = 'I,UA,UB,D',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableDdl); - - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " b int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan("insert into MySink select a, b from MyTable"); - } - - @Test - void testUpsertSource() { - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " a bigint,\n" - + " b int not null,\n" - + " c varchar,\n" - + " d timestamp(3),\n" - + " PRIMARY KEY (a, b) NOT ENFORCED\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'changelog-mode' = 'I,UA,D',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableDdl); - - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " b int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan("insert into MySink select a, b from MyTable"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java deleted file mode 100644 index a6b78346f7c42..0000000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.runtime.utils.TestData; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; - -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** Integration tests for operations on changelog source, including upsert source. */ -class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase { - - @Test - void testChangelogSource() throws Exception { - registerChangelogSource(); - createTestNonInsertOnlyValuesSinkTable( - "user_sink", - "user_id STRING PRIMARY KEY NOT ENFORCED", - "user_name STRING", - "email STRING", - "balance DECIMAL(18,2)", - "balance2 DECIMAL(18,2)"); - - String dml = "INSERT INTO user_sink SELECT * FROM users"; - compileSqlAndExecutePlan(dml).await(); - - List expected = - Arrays.asList( - "+I[user1, Tom, tom123@gmail.com, 8.10, 16.20]", - "+I[user3, Bailey, bailey@qq.com, 9.99, 19.98]", - "+I[user4, Tina, tina@gmail.com, 11.30, 22.60]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("user_sink")); - } - - @Test - void testToUpsertSource() throws Exception { - registerUpsertSource(); - createTestNonInsertOnlyValuesSinkTable( - "user_sink", - "user_id STRING PRIMARY KEY NOT ENFORCED", - "user_name STRING", - "email STRING", - "balance DECIMAL(18,2)", - "balance2 DECIMAL(18,2)"); - - String dml = "INSERT INTO user_sink SELECT * FROM users"; - compileSqlAndExecutePlan(dml).await(); - - List expected = - Arrays.asList( - "+I[user1, Tom, tom123@gmail.com, 8.10, 16.20]", - "+I[user3, Bailey, bailey@qq.com, 9.99, 19.98]", - "+I[user4, Tina, tina@gmail.com, 11.30, 22.60]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("user_sink")); - } - - // ------------------------------------------------------------------------------------------ - - protected void registerChangelogSource() { - Map properties = new HashMap<>(); - properties.put("changelog-mode", "I,UA,UB,D"); - createTestValuesSourceTable( - "users", - JavaScalaConversionUtil.toJava(TestData.userChangelog()), - new String[] { - "user_id STRING", - "user_name STRING", - "email STRING", - "balance DECIMAL(18,2)", - "balance2 AS balance * 2" - }, - properties); - } - - protected void registerUpsertSource() { - Map properties = new HashMap<>(); - properties.put("changelog-mode", "I,UA,D"); - createTestValuesSourceTable( - "users", - JavaScalaConversionUtil.toJava(TestData.userUpsertlog()), - new String[] { - "user_id STRING PRIMARY KEY NOT ENFORCED", - "user_name STRING", - "email STRING", - "balance DECIMAL(18,2)", - "balance2 AS balance * 2" - }, - properties); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out deleted file mode 100644 index 50e4add8f4a7f..0000000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out +++ /dev/null @@ -1,174 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT NOT NULL" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" - } ], - "watermarkSpecs" : [ ], - "primaryKey" : { - "name" : "PK_a_b", - "type" : "PRIMARY_KEY", - "columns" : [ "a", "b" ] - } - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "changelog-mode" : "I,UA,UB,D", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-drop-update-before_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "DropUpdateBefore" - }, { - "id" : 3, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0, 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "Exchange(distribution=[hash[a, b]])" - }, { - "id" : 4, - "type" : "stream-exec-changelog-normalize_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "uniqueKeys" : [ 0, 1 ], - "generateUpdateBefore" : true, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "changelogNormalizeState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "ChangelogNormalize(key=[a, b])" - }, { - "id" : 5, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "inputUpsertKey" : [ 0, 1 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out deleted file mode 100644 index 1d7925cd82e3e..0000000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out +++ /dev/null @@ -1,155 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT NOT NULL" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" - } ], - "watermarkSpecs" : [ ], - "primaryKey" : { - "name" : "PK_a_b", - "type" : "PRIMARY_KEY", - "columns" : [ "a", "b" ] - } - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "changelog-mode" : "I,UA,D", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0, 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "Exchange(distribution=[hash[a, b]])" - }, { - "id" : 3, - "type" : "stream-exec-changelog-normalize_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "uniqueKeys" : [ 0, 1 ], - "generateUpdateBefore" : true, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "changelogNormalizeState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "ChangelogNormalize(key=[a, b])" - }, { - "id" : 4, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "inputUpsertKey" : [ 0, 1 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}