[FLINK-26551][table-planner] Change legacy casting config option to DISABLED by default

This closes apache#19020.
slinkydeveloper authored and twalthr committed Mar 15, 2022
1 parent 1572908 commit dd7472d
Showing 9 changed files with 33 additions and 34 deletions.
docs/content/docs/dev/table/types.md (4 changes: 2 additions & 2 deletions)
@@ -1553,7 +1553,7 @@ regardless of whether the function used is `CAST` or `TRY_CAST`.
### Legacy casting

Pre-Flink 1.15 casting behaviour can be enabled by setting `table.exec.legacy-cast-behaviour` to `enabled`.
-In Flink 1.15 this flag is enabled by default.
+In Flink 1.15 this flag is disabled by default.

In particular, this will:

@@ -1562,7 +1562,7 @@ In particular, this will:
* Formatting of some casting to `CHAR`/`VARCHAR`/`STRING` produces slightly different results.

{{< hint warning >}}
-We **discourage** the use of this flag and we **strongly suggest** for new projects to disable this flag and use the new casting behaviour.
+We **discourage** the use of this flag and we **strongly suggest** for new projects to keep this flag disabled and use the new casting behaviour.
This flag will be removed in the next Flink versions.
{{< /hint >}}
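For illustration, a minimal sketch of the difference and of toggling the flag programmatically (class name is assumed for the example; API calls as of Flink 1.15):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical example class; not part of the commit.
public class CastBehaviourExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Under the new default, a failing CAST raises an error at runtime:
        // tEnv.executeSql("SELECT CAST('abc' AS INT)").print();  // would fail
        // TRY_CAST returns NULL for non-castable input instead:
        tEnv.executeSql("SELECT TRY_CAST('abc' AS INT)").print();
        // Restoring the pre-1.15 behaviour (discouraged, see the hint above):
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.legacy-cast-behaviour", "ENABLED");
    }
}
```

In the SQL client the same toggle is `SET 'table.exec.legacy-cast-behaviour' = 'ENABLED';`.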

@@ -42,7 +42,7 @@
</tr>
<tr>
<td><h5>table.exec.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ENABLED</td>
<td style="word-wrap: break-word;">DISABLED</td>
<td><p>Enum</p></td>
<td>Determines whether CAST will operate following the legacy behaviour or the new one that introduces various fixes and improvements.<br /><br />Possible values:<ul><li>"ENABLED": CAST will operate following the legacy behaviour.</li><li>"DISABLED": CAST will operate following the new correct behaviour.</li></ul></td>
</tr>
@@ -256,9 +256,9 @@ public void testDecimal() throws Exception {
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
tableEnv.executeSql("create database db1");
try {
tableEnv.executeSql("create table db1.src1 (x decimal(10,2))");
tableEnv.executeSql("create table db1.src2 (x decimal(10,2))");
tableEnv.executeSql("create table db1.dest (x decimal(10,2))");
tableEnv.executeSql("create table db1.src1 (x decimal(12,2))");
tableEnv.executeSql("create table db1.src2 (x decimal(12,2))");
tableEnv.executeSql("create table db1.dest (x decimal(12,2))");
// populate src1 from Hive
// TABLE keyword in INSERT INTO is mandatory prior to 1.1.0
hiveShell.execute(
@@ -114,7 +114,7 @@ public void testGroupByInsert() throws Exception {
tEnv.executeSql(
String.format(
"insert into `%s` "
+ "select `int`, cast('A' as bytes), `short`, max(`long`), max(`real`), "
+ "select `int`, cast('41' as bytes), `short`, max(`long`), max(`real`), "
+ "max(`double_precision`), max(`numeric`), max(`decimal`), max(`boolean`), "
+ "max(`text`), 'B', 'C', max(`character_varying`), max(`timestamp`), "
+ "max(`date`), max(`time`), max(`default_numeric`) "
@@ -25,6 +25,7 @@
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.types.Row;

@@ -44,7 +45,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -58,6 +58,8 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT;
import static org.apache.flink.table.utils.TableTestMatchers.deepEqualTo;
+import static org.apache.flink.util.CollectionUtil.entry;
+import static org.apache.flink.util.CollectionUtil.map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -299,29 +301,20 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
+ " %s\n"
+ ")",
topic, bootstraps, groupId, formatOptions());

tEnv.executeSql(createTable);

String initialValues =
"INSERT INTO kafka\n"
+ "VALUES\n"
+ " ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', X'C0FFEE', 'k2', X'BABE'], TRUE),\n"
+ " ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', X'C0FFEE', 'k2', X'BABE01'], TRUE),\n"
+ " ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),\n"
+ " ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)";
+ " ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'102030', 'k2', X'203040'], TRUE)";
tEnv.executeSql(initialValues).await();

// ---------- Consume stream from Kafka -------------------

final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM kafka"), 3);

-final Map<String, byte[]> headers1 = new HashMap<>();
-headers1.put("k1", new byte[] {(byte) 0xC0, (byte) 0xFF, (byte) 0xEE});
-headers1.put("k2", new byte[] {(byte) 0xBA, (byte) 0xBE});

-final Map<String, byte[]> headers3 = new HashMap<>();
-headers3.put("k1", new byte[] {(byte) 0x10});
-headers3.put("k2", new byte[] {(byte) 0x20});

final List<Row> expected =
Arrays.asList(
Row.of(
@@ -330,7 +323,9 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
"CreateTime",
LocalDateTime.parse("2020-03-08T13:12:11.123"),
0,
-headers1,
+map(
+        entry("k1", EncodingUtils.decodeHex("C0FFEE")),
+        entry("k2", EncodingUtils.decodeHex("BABE01"))),
0,
topic,
true),
@@ -350,7 +345,9 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
"CreateTime",
LocalDateTime.parse("2020-03-10T13:12:11.123"),
0,
-headers3,
+map(
+        entry("k1", EncodingUtils.decodeHex("102030")),
+        entry("k2", EncodingUtils.decodeHex("203040"))),
0,
topic,
true));
@@ -41,41 +41,41 @@ private void testExecuteLastDatedValueFunction(String consoleOutput) {
assertThat(
consoleOutput,
containsString(
"| Guillermo Smith | (5,2020-12-05) |"));
"| Guillermo Smith | (5, 2020-12-05) |"));
assertThat(
consoleOutput,
containsString(
"| John Turner | (12,2020-10-02) |"));
"| John Turner | (12, 2020-10-02) |"));
assertThat(
consoleOutput,
containsString(
"| Brandy Sanders | (1,2020-10-14) |"));
"| Brandy Sanders | (1, 2020-10-14) |"));
assertThat(
consoleOutput,
containsString(
"| Valeria Mendoza | (10,2020-06-02) |"));
"| Valeria Mendoza | (10, 2020-06-02) |"));
assertThat(
consoleOutput,
containsString(
"| Ellen Ortega | (100,2020-06-18) |"));
"| Ellen Ortega | (100, 2020-06-18) |"));
assertThat(
consoleOutput,
containsString(
"| Leann Holloway | (9,2020-05-26) |"));
"| Leann Holloway | (9, 2020-05-26) |"));
}

private void testExecuteInternalRowMergerFunction(String consoleOutput) {
assertThat(
consoleOutput,
containsString(
"| Guillermo Smith | (1992-12-12,New Jersey,816-... |"));
"| Guillermo Smith | (1992-12-12, New Jersey, 81... |"));
assertThat(
consoleOutput,
containsString(
"| Valeria Mendoza | (1970-03-28,Los Angeles,928... |"));
"| Valeria Mendoza | (1970-03-28, Los Angeles, 9... |"));
assertThat(
consoleOutput,
containsString(
"| Leann Holloway | (1989-05-21,Eugene,614-889-... |"));
"| Leann Holloway | (1989-05-21, Eugene, 614-88... |"));
}
}
@@ -420,7 +420,7 @@ public class ExecutionConfigOptions {
public static final ConfigOption<LegacyCastBehaviour> TABLE_EXEC_LEGACY_CAST_BEHAVIOUR =
key("table.exec.legacy-cast-behaviour")
.enumType(LegacyCastBehaviour.class)
-.defaultValue(LegacyCastBehaviour.ENABLED)
+.defaultValue(LegacyCastBehaviour.DISABLED)
.withDescription(
"Determines whether CAST will operate following the legacy behaviour "
+ "or the new one that introduces various fixes and improvements.");
@@ -99,7 +99,9 @@ public void testFilter() throws ExecutionException, InterruptedException, IOExce
"STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
String query =
"insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v) where cast(v as int) > 0";
"insert into MySink "
+ "SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v) "
+ "where try_cast(v as int) > 0";
compileSqlAndExecutePlan(query).await();
List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
@@ -1640,10 +1640,10 @@ class CalcITCase extends BatchTestBase {
tEnv.executeSql(ddl)

checkResult(
"select a from MyTable where cast(b as boolean)",
"select a from MyTable where try_cast(b as boolean)",
Seq(row(1)))
checkResult(
"select cast(b as boolean) from MyTable",
"select try_cast(b as boolean) from MyTable",
Seq(row(true), row(false), row(null), row(null)))
}
