Skip to content

Commit

Permalink
[FLINK-22857][table-planner] Simplify DefaultSqlExprToRexConverterFac…
Browse files Browse the repository at this point in the history
…tory to RexFactory
  • Loading branch information
twalthr committed May 2, 2022
1 parent acf7f4a commit 6d81f6c
Show file tree
Hide file tree
Showing 19 changed files with 157 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public class HiveParser extends ParserImpl {
catalogManager,
validatorSupplier,
calciteParserSupplier,
plannerContext.getSqlExprToRexConverterFactory());
plannerContext.getRexFactory());
this.plannerContext = plannerContext;
this.catalogReader = plannerContext.createCatalogReader(false);
this.frameworkConfig = plannerContext.createFrameworkConfig();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.calcite;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlDialect;

import javax.annotation.Nullable;

import java.util.function.Supplier;

/** Planner internal factory for parsing/translating to {@link RexNode}. */
@Internal
public class RexFactory {

private final FlinkTypeFactory typeFactory;

private final Supplier<FlinkPlannerImpl> plannerSupplier;

private final Supplier<SqlDialect> sqlDialectSupplier;

public RexFactory(
FlinkTypeFactory typeFactory,
Supplier<FlinkPlannerImpl> plannerSupplier,
Supplier<SqlDialect> sqlDialectSupplier) {
this.typeFactory = typeFactory;
this.plannerSupplier = plannerSupplier;
this.sqlDialectSupplier = sqlDialectSupplier;
}

/**
* Creates a new instance of {@link SqlToRexConverter} to convert SQL expression to {@link
* RexNode}.
*/
public SqlToRexConverter createSqlToRexConverter(
RelDataType inputRowType, @Nullable RelDataType outputType) {
return new SqlToRexConverter(
plannerSupplier.get(), sqlDialectSupplier.get(), inputRowType, outputType);
}

/**
* Creates a new instance of {@link SqlToRexConverter} to convert SQL expression to {@link
* RexNode}.
*/
public SqlToRexConverter createSqlToRexConverter(
RowType inputRowType, @Nullable LogicalType outputType) {
final RelDataType convertedInputRowType = typeFactory.buildRelNodeRowType(inputRowType);

final RelDataType convertedOutputType;
if (outputType != null) {
convertedOutputType = typeFactory.createFieldTypeFromLogicalType(outputType);
} else {
convertedOutputType = null;
}

return createSqlToRexConverter(convertedInputRowType, convertedOutputType);
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,21 @@

package org.apache.flink.table.planner.calcite;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;

import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.Properties;
import java.util.stream.Stream;

/** Standard implementation of {@link SqlExprToRexConverter}. */
public class SqlExprToRexConverterImpl implements SqlExprToRexConverter {
/** Converts SQL expressions to {@link RexNode}. */
@Internal
public class SqlToRexConverter {

private final FlinkPlannerImpl planner;

Expand All @@ -48,56 +42,50 @@ public class SqlExprToRexConverterImpl implements SqlExprToRexConverter {

private final @Nullable RelDataType outputType;

public SqlExprToRexConverterImpl(
FrameworkConfig config,
FlinkTypeFactory typeFactory,
RelOptCluster cluster,
public SqlToRexConverter(
FlinkPlannerImpl planner,
SqlDialect sqlDialect,
RelDataType inputRowType,
@Nullable RelDataType outputType) {
this.planner =
new FlinkPlannerImpl(
config,
(isLenient) -> createEmptyCatalogReader(typeFactory),
typeFactory,
cluster);
this.planner = planner;
this.sqlDialect = sqlDialect;
this.inputRowType = inputRowType;
this.outputType = outputType;
}

@Override
/**
* Converts the given SQL expression string to an expanded string with fully qualified function
* calls and escaped identifiers.
*
* <p>E.g. {@code my_udf(f0) + 1} to {@code `my_catalog`.`my_database`.`my_udf`(`f0`) + 1}
*/
public String expand(String expr) {
final CalciteParser parser = planner.parser();
final SqlNode node = parser.parseExpression(expr);
final SqlNode validated = planner.validateExpression(node, inputRowType, outputType);
return validated.toSqlString(sqlDialect).getSql();
}

@Override
/**
* Converts a SQL expression to a {@link RexNode} expression.
*
* @param expr SQL expression e.g. {@code `my_catalog`.`my_database`.`my_udf`(`f0`) + 1}
*/
public RexNode convertToRexNode(String expr) {
final CalciteParser parser = planner.parser();
return planner.rex(parser.parseExpression(expr), inputRowType, outputType);
}

@Override
/**
* Converts an array of SQL expressions to an array of {@link RexNode} expressions.
*
* @param exprs SQL expression e.g. {@code `my_catalog`.`my_database`.`my_udf`(`f0`) + 1}
*/
public RexNode[] convertToRexNodes(String[] exprs) {
final CalciteParser parser = planner.parser();
return Stream.of(exprs)
.map(parser::parseExpression)
.map(node -> planner.rex(node, inputRowType, null))
.toArray(RexNode[]::new);
}

// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------

private static CalciteCatalogReader createEmptyCatalogReader(FlinkTypeFactory typeFactory) {
return new FlinkCalciteCatalogReader(
CalciteSchema.createRootSchema(false),
Collections.emptyList(),
typeFactory,
new CalciteConnectionConfigImpl(new Properties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ public Parser create(Context context) {
context.getCatalogManager(),
context.getPlannerContext()::createFlinkPlanner,
context.getPlannerContext()::createCalciteParser,
context.getPlannerContext().getSqlExprToRexConverterFactory());
context.getPlannerContext().getRexFactory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.calcite.RexFactory;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.planner.operations.SqlToOperationConverter;
import org.apache.flink.table.planner.parse.CalciteParser;
Expand Down Expand Up @@ -65,18 +65,18 @@ public class ParserImpl implements Parser {
// multiple statements parsing
private final Supplier<FlinkPlannerImpl> validatorSupplier;
private final Supplier<CalciteParser> calciteParserSupplier;
private final SqlExprToRexConverterFactory sqlExprToRexConverterFactory;
private final RexFactory rexFactory;
private static final ExtendedParser EXTENDED_PARSER = ExtendedParser.INSTANCE;

public ParserImpl(
CatalogManager catalogManager,
Supplier<FlinkPlannerImpl> validatorSupplier,
Supplier<CalciteParser> calciteParserSupplier,
SqlExprToRexConverterFactory sqlExprToRexConverterFactory) {
RexFactory rexFactory) {
this.catalogManager = catalogManager;
this.validatorSupplier = validatorSupplier;
this.calciteParserSupplier = calciteParserSupplier;
this.sqlExprToRexConverterFactory = sqlExprToRexConverterFactory;
this.rexFactory = rexFactory;
}

/**
Expand Down Expand Up @@ -118,12 +118,12 @@ public UnresolvedIdentifier parseIdentifier(String identifier) {
public ResolvedExpression parseSqlExpression(
String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
try {
final SqlExprToRexConverter sqlExprToRexConverter =
sqlExprToRexConverterFactory.create(inputRowType, outputType);
final RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression);
final SqlToRexConverter sqlToRexConverter =
rexFactory.createSqlToRexConverter(inputRowType, outputType);
final RexNode rexNode = sqlToRexConverter.convertToRexNode(sqlExpression);
final LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
// expand expression for serializable expression strings similar to views
final String sqlExpressionExpanded = sqlExprToRexConverter.expand(sqlExpression);
final String sqlExpressionExpanded = sqlToRexConverter.expand(sqlExpression);
return new RexNodeExpression(
rexNode,
TypeConversions.fromLogicalToDataType(logicalType),
Expand Down
Loading

0 comments on commit 6d81f6c

Please sign in to comment.