Skip to content

Commit

Permalink
[FLINK-33823] Make PlannerQueryOperation SQL serializable
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Dec 20, 2023
1 parent aa5766e commit 5919251
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,10 @@ public static Stream<TestSpec> testData() {
.expectStr("OVERLAY(`f0` PLACING 'ABC' FROM 2 FOR 5)"),
TestSpec.forExpr($("f0").substr(2))
.withField("f0", DataTypes.STRING())
.expectStr("SUBSTR(`f0` FROM 2)"),
.expectStr("SUBSTR(`f0`, 2)"),
TestSpec.forExpr($("f0").substr(2, 5))
.withField("f0", DataTypes.STRING())
.expectStr("SUBSTR(`f0` FROM 2 FOR 5)"),
.expectStr("SUBSTR(`f0`, 2, 5)"),
TestSpec.forExpr($("f0").substring(2))
.withField("f0", DataTypes.STRING())
.expectStr("SUBSTRING(`f0` FROM 2)"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Table fromValues(Object... values) {
public Table fromValues(AbstractDataType<?> dataType, Object... values) {
return env.fromValues(dataType, values);
}

@Override
public Table sqlQuery(String query) {
return env.sqlQuery(query);
}
});
}

Expand All @@ -83,5 +88,8 @@ public interface TableEnvAccessor {

/** See {@link TableEnvironment#fromValues(AbstractDataType, Object...)}. */
Table fromValues(AbstractDataType<?> dataType, Object... values);

/** See {@link TableEnvironment#sqlQuery(String)}. */
Table sqlQuery(String query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
public static final BuiltInFunctionDefinition SUBSTR =
BuiltInFunctionDefinition.newBuilder()
.name("substr")
.callSyntax("SUBSTR", SqlCallSyntax.SUBSTRING)
.sqlName("SUBSTR")
.kind(SCALAR)
.inputTypeStrategy(
or(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.planner.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;
Expand All @@ -33,6 +34,7 @@

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Wrapper for valid logical plans generated by Planner. */
@Internal
Expand All @@ -41,8 +43,11 @@ public class PlannerQueryOperation implements QueryOperation {
private final RelNode calciteTree;
private final ResolvedSchema resolvedSchema;

public PlannerQueryOperation(RelNode calciteTree) {
private final Supplier<String> toSqlString;

public PlannerQueryOperation(RelNode calciteTree, Supplier<String> toSqlString) {
this.calciteTree = calciteTree;
this.toSqlString = toSqlString;

RelDataType rowType = calciteTree.getRowType();
String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
Expand Down Expand Up @@ -72,6 +77,15 @@ public String asSummaryString() {
"PlannerNode", Collections.emptyMap(), getChildren(), Operation::asSummaryString);
}

@Override
public String asSerializableString() {
try {
return toSqlString.get();
} catch (Exception e) {
throw new TableException("Given plan is not serializable into SQL", e);
}
}

@Override
public List<QueryOperation> getChildren() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,12 @@ private Operation convertDelete(SqlDelete sqlDelete) {
}
}
// delete push down is not applicable, use row-level delete
PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
PlannerQueryOperation queryOperation =
new PlannerQueryOperation(
tableModify,
() -> {
throw new TableException("Delete statements are not SQL serializable.");
});
return new SinkModifyOperation(
contextResolvedTable,
queryOperation,
Expand All @@ -1340,7 +1345,12 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
catalogManager.getTableOrError(
catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
// get query
PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
PlannerQueryOperation queryOperation =
new PlannerQueryOperation(
tableModify,
() -> {
throw new TableException("Update statements are not SQL serializable.");
});

// TODO calc target column list to index array, currently only simple SqlIdentifiers are
// available, this should be updated after FLINK-31344 fixed
Expand Down Expand Up @@ -1379,6 +1389,6 @@ private String getQuotedSqlString(SqlNode sqlNode) {
private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
// transform to a relational tree
RelRoot relational = planner.rel(validated);
return new PlannerQueryOperation(relational.project());
return new PlannerQueryOperation(relational.project(), () -> getQuotedSqlString(validated));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class SqlNodeConvertUtils {
static PlannerQueryOperation toQueryOperation(SqlNode validated, ConvertContext context) {
// transform to a relational tree
RelRoot relational = context.toRelRoot(validated);
return new PlannerQueryOperation(relational.project());
return new PlannerQueryOperation(
relational.project(), () -> context.toQuotedSqlString(validated));
}

/** convert the query part of a VIEW statement into a {@link CatalogView}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public Optional<EnumSet<SqlKind>> supportedSqlKinds() {
public Operation convertSqlNode(SqlNode node, ConvertContext context) {
// transform to a relational tree
RelRoot relational = context.toRelRoot(node);
return new PlannerQueryOperation(relational.project());
return new PlannerQueryOperation(
relational.project(), () -> context.toQuotedSqlString(node));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ public Operation convertSqlNode(SqlReplaceTableAs sqlReplaceTableAs, ConvertCont
SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery();
context.getSqlValidator().validate(asQuerySqlNode);
QueryOperation query =
new PlannerQueryOperation(context.toRelRoot(asQuerySqlNode).project());
new PlannerQueryOperation(
context.toRelRoot(asQuerySqlNode).project(),
() -> context.toQuotedSqlString(asQuerySqlNode));

// get table comment
String tableComment =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,9 @@ abstract class PlannerBase(
val contextResolvedTable = catalogManager.getTableOrError(objectIdentifier)
val modifyOperation = new SinkModifyOperation(
contextResolvedTable,
new PlannerQueryOperation(modify.getInput)
new PlannerQueryOperation(
modify.getInput,
() => queryOperation.asSerializableString())
)
translateToRel(modifyOperation)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public List<TableTestProgram> programs() {
QueryOperationTestPrograms.GROUP_HOP_WINDOW_EVENT_TIME,
QueryOperationTestPrograms.SORT_LIMIT_DESC,
QueryOperationTestPrograms.GROUP_BY_UDF_WITH_MERGE,
QueryOperationTestPrograms.NON_WINDOW_INNER_JOIN);
QueryOperationTestPrograms.NON_WINDOW_INNER_JOIN,
QueryOperationTestPrograms.SQL_QUERY_OPERATION);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public List<TableTestProgram> programs() {
QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
QueryOperationTestPrograms.WINDOW_AGGREGATE_QUERY_OPERATION,
QueryOperationTestPrograms.UNION_ALL_QUERY_OPERATION,
QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION);
QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION,
QueryOperationTestPrograms.SQL_QUERY_OPERATION);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,30 @@ private static Instant dayOfSeconds(int second) {
+ ") ORDER BY `a` ASC, `b` DESC OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY")
.build();

static final TableTestProgram SQL_QUERY_OPERATION =
TableTestProgram.of("sql-query-operation", "verifies sql serialization")
.setupTableSource(
SourceTestStep.newBuilder("s")
.addSchema("a bigint", "b string")
.producedValues(Row.of(1L, "abc"), Row.of(2L, "cde"))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema("a bigint", "b string")
.consumedValues(Row.of(3L, "bc"), Row.of(4L, "de"))
.build())
.runTableApi(
t ->
t.sqlQuery("SELECT a, b FROM s")
.select($("a").plus(2), $("b").substr(2, 3)),
"sink")
.runSql(
"SELECT (`a` + 2) AS `_c0`, (SUBSTR(`b`, 2, 3)) AS `_c1` FROM (\n"
+ " SELECT `s`.`a`, `s`.`b`\n"
+ " FROM `default_catalog`.`default_database`.`s` AS `s`\n"
+ ")")
.build();

static final TableTestProgram GROUP_HOP_WINDOW_EVENT_TIME =
TableTestProgram.of(
"group-window-aggregate-hop-event-time",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ import _root_.scala.collection.JavaConversions._
import _root_.scala.io.Source
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.rel2sql.RelToSqlConverter
import org.apache.calcite.sql.{SqlExplainLevel, SqlIntervalQualifier}
import org.apache.calcite.sql.dialect.AnsiSqlDialect
import org.apache.calcite.sql.parser.SqlParserPos
import org.assertj.core.api.Assertions.{assertThat, assertThatExceptionOfType, fail}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
Expand Down Expand Up @@ -1282,7 +1284,10 @@ case class StreamTableTestUtil(
rowtimeFieldIdx,
expr
)
val queryOperation = new PlannerQueryOperation(watermarkAssigner)
val queryOperation = new PlannerQueryOperation(
watermarkAssigner,
() =>
throw new TableException("Cannot convert a LogicalWatermarkAssigner back to a SQL string."))
testingTableEnv.createTemporaryView(tableName, testingTableEnv.createTable(queryOperation))
}

Expand Down

0 comments on commit 5919251

Please sign in to comment.