Skip to content

Commit

Permalink
[FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
Browse files Browse the repository at this point in the history
This closes apache#19847
  • Loading branch information
xuanyu66 authored and godfreyhe committed Jun 6, 2022
1 parent 80cd04a commit 9bcc7fd
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
import org.apache.calcite.runtime.{CalciteContextException, Resources}
import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
import org.apache.calcite.sql.`type`.SqlTypeUtil
import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, SqlUtil}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
Expand Down Expand Up @@ -119,7 +119,10 @@ object PreValidateReWriter {
source: SqlCall,
partitions: SqlNodeList): SqlCall = {
val calciteCatalogReader = validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
val names = sqlInsert.getTargetTable match {
case si: SqlIdentifier => si.names
case st: SqlTableRef => st.getOperandList.get(0).asInstanceOf[SqlIdentifier].names
}
val table = calciteCatalogReader.getTable(names)
if (table == null) {
// There is no table exists in current catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,4 +779,20 @@ Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
]]>
</Resource>
</TestCase>
<TestCase name="testInsertWithTargetColumnsAndSqlHint">
<Resource name="ast">
<![CDATA[
LogicalSink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]])
+- LogicalProject(EXPR$0=[+($0, $1)], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]], changelogMode=[NONE])
+- Calc(select=[+(a, b) AS EXPR$0, c], changelogMode=[I])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
]]>
</Resource>
</TestCase>
</Root>
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ class TableSinkTest extends TableTestBase {
|)
|""".stripMargin)

@Test
def testInsertWithTargetColumnsAndSqlHint(): Unit = {
util.addTable(s"""
|CREATE TABLE appendSink (
| `a` BIGINT,
| `b` STRING
|) WITH (
| 'connector' = 'values',
| 'sink-insert-only' = 'false'
|)
|""".stripMargin)
val stmtSet = util.tableEnv.createStatementSet()
stmtSet.addInsertSql(
"INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(a, b) SELECT a + b, c FROM MyTable")
util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
}

@Test
def testInsertMismatchTypeForEmptyChar(): Unit = {
util.addTable(s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,33 @@ class TableSinkITCase extends StreamingTestBase {
assertEquals(expected.sorted, result.sorted)
}

@Test
def testInsertWithTargetColumnsAndSqlHint(): Unit = {
val t = env
.fromCollection(smallTupleData3)
.toTable(tEnv, 'id, 'num, 'text)
tEnv.createTemporaryView("src", t)

tEnv.executeSql(s"""
|CREATE TABLE appendSink (
| `t` INT,
| `num` BIGINT,
| `text` STRING
|) WITH (
| 'connector' = 'values',
| 'sink-insert-only' = 'true'
|)
|""".stripMargin)
tEnv
.executeSql(
"INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(t, num, text) SELECT id, num, text FROM src")
.await()

val result = TestValuesTableFactory.getResults("appendSink")
val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
assertEquals(expected.sorted, result.sorted)
}

@Test
def testAppendSinkWithNestedRow(): Unit = {
val t = env
Expand Down

0 comments on commit 9bcc7fd

Please sign in to comment.