From dd7472d19ee63dda7c71a9b6cbef75224f4f102d Mon Sep 17 00:00:00 2001
From: slinkydeveloper
Date: Mon, 7 Mar 2022 15:10:52 +0100
Subject: [PATCH] [FLINK-26551][table-planner] Change legacy casting config option to DISABLED by default

This closes #19020.
---
 docs/content/docs/dev/table/types.md            |  4 +--
 .../execution_config_configuration.html         |  2 +-
 .../connectors/hive/HiveRunnerITCase.java       |  6 ++---
 .../jdbc/catalog/PostgresCatalogITCase.java     |  2 +-
 .../kafka/table/KafkaTableITCase.java           | 25 ++++++++-----------
 .../AdvancedFunctionsExampleITCase.java         | 18 ++++++-------
 .../api/config/ExecutionConfigOptions.java      |  2 +-
 .../jsonplan/CorrelateJsonPlanITCase.java       |  4 ++-
 .../runtime/batch/sql/CalcITCase.scala          |  4 +--
 9 files changed, 33 insertions(+), 34 deletions(-)

diff --git a/docs/content/docs/dev/table/types.md b/docs/content/docs/dev/table/types.md
index 27d4c2860d3f5..9d3fb63c9e65d 100644
--- a/docs/content/docs/dev/table/types.md
+++ b/docs/content/docs/dev/table/types.md
@@ -1553,7 +1553,7 @@ regardless of whether the function used is `CAST` or `TRY_CAST`.
 ### Legacy casting
 
 Pre Flink 1.15 casting behaviour can be enabled by setting `table.exec.legacy-cast-behaviour` to `enabled`.
-In Flink 1.15 this flag is enabled by default.
+In Flink 1.15 this flag is disabled by default.
 
 In particular, this will:
 
@@ -1562,7 +1562,7 @@ In particular, this will:
 * Formatting of some casting to `CHAR`/`VARCHAR`/`STRING` produces slightly different results.
 
 {{< hint warning >}}
-We **discourage** the use of this flag and we **strongly suggest** for new projects to disable this flag and use the new casting behaviour.
+We **discourage** the use of this flag and we **strongly suggest** for new projects to keep this flag disabled and use the new casting behaviour.
 This flag will be removed in the next Flink versions.
 {{< /hint >}}
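A minimal sketch (not part of the diff) of what the new default means for a Table API user and how the pre-1.15 behaviour can still be re-enabled while the flag exists. The class name and the cast literals are made up for illustration; only the option key and its ENABLED/DISABLED values come from this patch.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LegacyCastBehaviourSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // New default (DISABLED): a failing CAST raises a runtime error,
        // while TRY_CAST returns NULL instead.
        tEnv.executeSql("SELECT TRY_CAST('not-a-number' AS INT)").print();

        // Opting back into the pre-1.15 behaviour (discouraged, the flag is
        // scheduled for removal in a later release):
        tEnv.getConfig()
                .getConfiguration()
                .setString("table.exec.legacy-cast-behaviour", "ENABLED");

        // Legacy behaviour: the same failing CAST silently produces NULL.
        tEnv.executeSql("SELECT CAST('not-a-number' AS INT)").print();
    }
}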
diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 74cf7f2e3daa5..364d9bf9d814c 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -42,7 +42,7 @@
         </tr>
         <tr>
             <td><h5>table.exec.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
-            <td style="word-wrap: break-word;">ENABLED</td>
+            <td style="word-wrap: break-word;">DISABLED</td>
             <td><p>Enum</p></td>
             <td>Determines whether CAST will operate following the legacy behaviour or the new one that introduces various fixes and improvements.<br /><br />Possible values: "ENABLED", "DISABLED"</td>
         </tr>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerITCase.java
index fbf49e139cd17..2b2d79327f5e1 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerITCase.java
@@ -256,9 +256,9 @@ public void testDecimal() throws Exception {
         TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
         tableEnv.executeSql("create database db1");
         try {
-            tableEnv.executeSql("create table db1.src1 (x decimal(10,2))");
-            tableEnv.executeSql("create table db1.src2 (x decimal(10,2))");
-            tableEnv.executeSql("create table db1.dest (x decimal(10,2))");
+            tableEnv.executeSql("create table db1.src1 (x decimal(12,2))");
+            tableEnv.executeSql("create table db1.src2 (x decimal(12,2))");
+            tableEnv.executeSql("create table db1.dest (x decimal(12,2))");
             // populate src1 from Hive
             // TABLE keyword in INSERT INTO is mandatory prior to 1.1.0
             hiveShell.execute(
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index e6ec9cc2f5e7d..936230b0274e9 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -114,7 +114,7 @@ public void testGroupByInsert() throws Exception {
         tEnv.executeSql(
                         String.format(
                                 "insert into `%s` "
-                                        + "select `int`, cast('A' as bytes), `short`, max(`long`), max(`real`), "
+                                        + "select `int`, cast('41' as bytes), `short`, max(`long`), max(`real`), "
                                         + "max(`double_precision`), max(`numeric`), max(`decimal`), max(`boolean`), "
                                         + "max(`text`), 'B', 'C', max(`character_varying`), max(`timestamp`), "
                                         + "max(`date`), max(`time`), max(`default_numeric`) "
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index a6dab6da98843..c26845315e669 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -25,6 +25,7 @@
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.types.Row;
 
@@ -44,7 +45,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -58,6 +58,8 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT;
 import static org.apache.flink.table.utils.TableTestMatchers.deepEqualTo;
+import static org.apache.flink.util.CollectionUtil.entry;
+import static org.apache.flink.util.CollectionUtil.map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -299,29 +301,20 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
                                 + " %s\n"
                                 + ")",
                         topic, bootstraps, groupId, formatOptions());
-
         tEnv.executeSql(createTable);
 
         String initialValues =
                 "INSERT INTO kafka\n"
                         + "VALUES\n"
-                        + " ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', X'C0FFEE', 'k2', X'BABE'], TRUE),\n"
+                        + " ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', X'C0FFEE', 'k2', X'BABE01'], TRUE),\n"
                         + " ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),\n"
-                        + " ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)";
+                        + " ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'102030', 'k2', X'203040'], TRUE)";
         tEnv.executeSql(initialValues).await();
 
         // ---------- Consume stream from Kafka -------------------
 
         final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM kafka"), 3);
 
-        final Map<String, byte[]> headers1 = new HashMap<>();
-        headers1.put("k1", new byte[] {(byte) 0xC0, (byte) 0xFF, (byte) 0xEE});
-        headers1.put("k2", new byte[] {(byte) 0xBA, (byte) 0xBE});
-
-        final Map<String, byte[]> headers3 = new HashMap<>();
-        headers3.put("k1", new byte[] {(byte) 0x10});
-        headers3.put("k2", new byte[] {(byte) 0x20});
-
         final List<Row> expected =
                 Arrays.asList(
                         Row.of(
@@ -330,7 +323,9 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
                                 "CreateTime",
                                 LocalDateTime.parse("2020-03-08T13:12:11.123"),
                                 0,
-                                headers1,
+                                map(
+                                        entry("k1", EncodingUtils.decodeHex("C0FFEE")),
+                                        entry("k2", EncodingUtils.decodeHex("BABE01"))),
                                 0,
                                 topic,
                                 true),
@@ -350,7 +345,9 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
                                 "CreateTime",
                                 LocalDateTime.parse("2020-03-10T13:12:11.123"),
                                 0,
-                                headers3,
+                                map(
+                                        entry("k1", EncodingUtils.decodeHex("102030")),
+                                        entry("k2", EncodingUtils.decodeHex("203040"))),
                                 0,
                                 topic,
                                 true));
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
index 4bc98c2e11b20..3306911da9ead 100644
--- a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
+++ b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
@@ -41,41 +41,41 @@ private void testExecuteLastDatedValueFunction(String consoleOutput) {
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| Guillermo Smith | (5,2020-12-05) |"));
+                        "| Guillermo Smith | (5, 2020-12-05) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| John Turner | (12,2020-10-02) |"));
+                        "| John Turner | (12, 2020-10-02) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| Brandy Sanders | (1,2020-10-14) |"));
+                        "| Brandy Sanders | (1, 2020-10-14) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| Valeria Mendoza | (10,2020-06-02) |"));
+                        "| Valeria Mendoza | (10, 2020-06-02) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| Ellen Ortega | (100,2020-06-18) |"));
+                        "| Ellen Ortega | (100, 2020-06-18) |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| Leann Holloway | (9,2020-05-26) |"));
+                        "| Leann Holloway | (9, 2020-05-26) |"));
     }
 
     private void testExecuteInternalRowMergerFunction(String consoleOutput) {
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| Guillermo Smith | (1992-12-12,New Jersey,816-... |"));
+                        "| Guillermo Smith | (1992-12-12, New Jersey, 81... |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| Valeria Mendoza | (1970-03-28,Los Angeles,928... |"));
+                        "| Valeria Mendoza | (1970-03-28, Los Angeles, 9... |"));
         assertThat(
                 consoleOutput,
                 containsString(
-                        "| Leann Holloway | (1989-05-21,Eugene,614-889-... |"));
+                        "| Leann Holloway | (1989-05-21, Eugene, 614-88... |"));
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 9a92e48b2054c..9aa8642dee4ab 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -420,7 +420,7 @@ public class ExecutionConfigOptions {
     public static final ConfigOption<LegacyCastBehaviour> TABLE_EXEC_LEGACY_CAST_BEHAVIOUR =
             key("table.exec.legacy-cast-behaviour")
                     .enumType(LegacyCastBehaviour.class)
-                    .defaultValue(LegacyCastBehaviour.ENABLED)
+                    .defaultValue(LegacyCastBehaviour.DISABLED)
                     .withDescription(
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
index 18ae4e98ebba1..a12a14f402a2b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
@@ -99,7 +99,9 @@ public void testFilter() throws ExecutionException, InterruptedException, IOExce
                 "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
         String query =
-                "insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v) where cast(v as int) > 0";
+                "insert into MySink "
+                        + "SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v) "
+                        + "where try_cast(v as int) > 0";
         compileSqlAndExecutePlan(query).await();
         List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]");
         assertResult(expected, TestValuesTableFactory.getResults("MySink"));
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index e591f1076fea0..09699c532b411 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1640,10 +1640,10 @@ class CalcITCase extends BatchTestBase {
     tEnv.executeSql(ddl)
 
     checkResult(
-      "select a from MyTable where cast(b as boolean)",
+      "select a from MyTable where try_cast(b as boolean)",
      Seq(row(1)))
 
     checkResult(
-      "select cast(b as boolean) from MyTable",
+      "select try_cast(b as boolean) from MyTable",
      Seq(row(true), row(false), row(null), row(null)))
   }
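The test changes above swap cast for try_cast wherever a failed conversion used to be swallowed as NULL. A standalone sketch of that pattern under the new default; the view name, sample values and class name are invented for illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TryCastFilterSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Three string values, one of which cannot be parsed as INT.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW words AS "
                        + "SELECT v FROM (VALUES ('1'), ('hi'), ('2')) AS t(v)");

        // With table.exec.legacy-cast-behaviour = DISABLED (the new default),
        //   SELECT v FROM words WHERE CAST(v AS INT) > 0
        // fails at runtime because CAST('hi' AS INT) raises an error.
        // TRY_CAST turns the failure into NULL, so the row is simply filtered out.
        tEnv.executeSql("SELECT v FROM words WHERE TRY_CAST(v AS INT) > 0").print();
    }
}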