Skip to content

Commit

Permalink
[FLINK-34312][table] Improve the handling of default node types for n…
Browse files Browse the repository at this point in the history
…amed parameters (apache#24235)


Co-authored-by: Shengkai <[email protected]>
  • Loading branch information
hackergin and fsk119 authored Feb 26, 2024
1 parent 922cc2a commit 1070c6e
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.calcite.sql.validate;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
Expand Down Expand Up @@ -56,9 +57,9 @@ public final class ProcedureNamespace extends AbstractNamespace {
public RelDataType validateImpl(RelDataType targetRowType) {
validator.inferUnknownTypes(validator.unknownType, scope, call);
// The result is ignored but the type is derived to trigger the validation
validator.deriveTypeImpl(scope, call);
final SqlCallBinding callBinding = new FlinkSqlCallBinding(validator, scope, call);
validator.deriveTypeImpl(scope, callBinding.permutedCall());
final SqlOperator operator = call.getOperator();
final SqlCallBinding callBinding = new SqlCallBinding(validator, scope, call);
if (!(operator instanceof SqlTableFunction)) {
throw new IllegalArgumentException(
"Argument must be a table function: " + operator.getNameAsId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.calcite.sql.validate;

import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -158,16 +160,17 @@
* Default implementation of {@link SqlValidator}, the class was copied over because of
* CALCITE-4554.
*
* <p>Lines 1958 ~ 1978, Flink improves error message for functions without appropriate arguments in
* <p>Lines 1961 ~ 1981, Flink improves error message for functions without appropriate arguments in
* handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}.
*
* <p>Lines 3736 ~ 3740, Flink improves Optimize the retrieval of sub-operands in SqlCall when using
* NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
* <p>Lines 3739 ~ 3743, 6333 ~ 6339, Flink improves validating the SqlCall that uses named
* parameters, rearrange the order of sub-operands, and fill in missing operands with the default
* node.
*
* <p>Lines 5108 ~ 5121, Flink enables TIMESTAMP and TIMESTAMP_LTZ for system time period
* <p>Lines 5111 ~ 5124, Flink enables TIMESTAMP and TIMESTAMP_LTZ for system time period
* specification type at {@link org.apache.calcite.sql.validate.SqlValidatorImpl#validateSnapshot}.
*
* <p>Lines 5465 ~ 5471, Flink enables TIMESTAMP and TIMESTAMP_LTZ for first orderBy column in
* <p>Lines 5468 ~ 5474, Flink enables TIMESTAMP and TIMESTAMP_LTZ for first orderBy column in
* matchRecognize at {@link SqlValidatorImpl#validateMatchRecognize}.
*/
public class SqlValidatorImpl implements SqlValidatorWithHints {
Expand Down Expand Up @@ -6327,8 +6330,13 @@ public RelDataType visit(SqlLiteral literal) {

@Override
public RelDataType visit(SqlCall call) {
// ----- FLINK MODIFICATION BEGIN -----
FlinkSqlCallBinding flinkSqlCallBinding =
new FlinkSqlCallBinding(this.scope.getValidator(), scope, call);
final SqlOperator operator = call.getOperator();
return operator.deriveType(SqlValidatorImpl.this, scope, call);
return operator.deriveType(
SqlValidatorImpl.this, scope, flinkSqlCallBinding.permutedCall());
// ----- FLINK MODIFICATION END -----
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;
import org.apache.flink.table.planner.calcite.TimestampSchemaVersion;
import org.apache.flink.table.planner.hint.ClearQueryHintsWithInvalidPropagationShuttle;
import org.apache.flink.table.planner.hint.FlinkHints;
Expand Down Expand Up @@ -235,14 +235,15 @@
* <p>FLINK modifications are at lines
*
* <ol>
* <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 654 ~ 671
* <li>Added in Flink-24024: Lines 1435 ~ 1445, Lines 1459 ~ 1501
* <li>Added in FLINK-28682: Lines 2323 ~ 2340
* <li>Added in FLINK-28682: Lines 2377 ~ 2405
* <li>Added in FLINK-32474: Lines 2875 ~ 2887
* <li>Added in FLINK-32474: Lines 2987 ~ 3021
* <li>Added in FLINK-20873: Lines 5519 ~ 5528
* <li>Added in FLINK-34057, FLINK-34058: Lines 6090 ~ 6116
* <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 655 ~ 673
* <li>Added in Flink-24024: Lines 1437 ~ 1447, Lines 1461 ~ 1503
* <li>Added in FLINK-28682: Lines 2325 ~ 2342
* <li>Added in FLINK-28682: Lines 2379 ~ 2407
* <li>Added in FLINK-32474: Lines 2877 ~ 2889
* <li>Added in FLINK-32474: Lines 2989 ~ 3023
* <li>Added in FLINK-20873: Lines 5521 ~ 5530
* <li>Added in FLINK-34312: Lines 5641 ~ 5644
* <li>Added in FLINK-34057, FLINK-34058, FLINK-34312: Lines 6093 ~ 6111
* </ol>
*/
@SuppressWarnings("UnstableApiUsage")
Expand Down Expand Up @@ -5637,8 +5638,10 @@ public RexNode visit(SqlCall call) {
() -> "agg.lookupAggregates for call " + call);
}
}
// ----- FLINK MODIFICATION BEGIN -----
return exprConverter.convertCall(
this, new SqlCallBinding(validator(), scope, call).permutedCall());
this, new FlinkSqlCallBinding(validator(), scope, call).permutedCall());
// ----- FLINK MODIFICATION END -----
}

@Override
Expand Down Expand Up @@ -6088,11 +6091,9 @@ private void translateAgg(
// switch out of agg mode
bb.agg = null;
// ----- FLINK MODIFICATION BEGIN -----
SqlCallBinding sqlCallBinding =
new SqlCallBinding(validator(), aggregatingSelectScope, call);
List<SqlNode> sqlNodes = sqlCallBinding.operands();
FlinkOperatorBinding flinkOperatorBinding =
new FlinkOperatorBinding(sqlCallBinding);
FlinkSqlCallBinding binding =
new FlinkSqlCallBinding(validator(), aggregatingSelectScope, call);
List<SqlNode> sqlNodes = binding.operands();
for (int i = 0; i < sqlNodes.size(); i++) {
SqlNode operand = sqlNodes.get(i);
// special case for COUNT(*): delete the *
Expand All @@ -6105,12 +6106,6 @@ private void translateAgg(
}
}
RexNode convertedExpr = bb.convertExpression(operand);
if (convertedExpr.getKind() == SqlKind.DEFAULT) {
RelDataType relDataType = flinkOperatorBinding.getOperandType(i);
convertedExpr =
((RexCall) convertedExpr)
.clone(relDataType, ((RexCall) convertedExpr).operands);
}
args.add(lookupOrCreateGroupExpr(convertedExpr));
}
// ----- FLINK MODIFICATION END -----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
Expand All @@ -44,10 +43,8 @@
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.StandardConvertletTable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkArgument;

Expand All @@ -72,10 +69,6 @@ public SqlRexConvertlet get(SqlCall call) {
return this::convertSetSemanticsWindowTableFunction;
}

if (isContainsDefaultNode(call)) {
return this::convertSqlCallWithDefaultNode;
}

return StandardConvertletTable.INSTANCE.get(call);
}

Expand Down Expand Up @@ -120,12 +113,6 @@ private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE;
}

private boolean isContainsDefaultNode(SqlCall call) {
return call.getOperandList().stream()
.filter(Objects::nonNull)
.anyMatch(operand -> operand.getKind() == SqlKind.DEFAULT);
}

/**
* Due to CALCITE-6204, we need to manually extract partition keys and order keys and convert
* them to {@link RexSetSemanticsTableCall}.
Expand Down Expand Up @@ -215,30 +202,4 @@ private static int parseFieldIdx(RexNode e) {
// should not happen
throw new TableException("Unsupported partition key with type: " + e.getKind());
}

/**
* When the SqlCall contains a default operator, the type of the default node to ANY after
* converted to rel node. However, the ANY type cannot pass various checks well and cannot adapt
* well to types in flink. Therefore, we replace the ANY type with the argument type obtained
* from the operator.
*/
private RexNode convertSqlCallWithDefaultNode(SqlRexContext cx, final SqlCall call) {
RexNode rexCall = StandardConvertletTable.INSTANCE.convertCall(cx, call);
SqlCallBinding sqlCallBinding = new SqlCallBinding(cx.getValidator(), null, call);
FlinkOperatorBinding flinkOperatorBinding = new FlinkOperatorBinding(sqlCallBinding);
if (rexCall instanceof RexCall) {
List<RexNode> operands = new ArrayList<>(((RexCall) rexCall).operands);
for (int i = 0; i < operands.size(); i++) {
RexNode rexNode = operands.get(i);
if (rexNode.getKind() == SqlKind.DEFAULT && rexNode instanceof RexCall) {
RelDataType relDataType = flinkOperatorBinding.getOperandType(i);
operands.set(
i,
((RexCall) rexNode).clone(relDataType, ((RexCall) rexNode).operands));
}
}
return ((RexCall) rexCall).clone(rexCall.getType(), operands);
}
return rexCall;
}
}

This file was deleted.

Loading

0 comments on commit 1070c6e

Please sign in to comment.