Skip to content

Commit

Permalink
[FLINK-13774][table] FieldComputer should return ResolvedExpression
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored and wuchong committed Aug 29, 2019
1 parent de22d7c commit e922393
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedFieldReference;
import org.apache.flink.table.types.DataType;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -99,10 +100,11 @@ public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
return fieldReferenceExpr;
case VARCHAR:
return unresolvedCall(
DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class);
return new CallExpression(
CAST,
fieldReferenceExpr,
typeLiteral(TIMESTAMP(3).bridgedTo(Timestamp.class)));
Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)),
outputType);
default:
throw new RuntimeException("Unsupport type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package org.apache.flink.table.planner.sources
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.{DataTypes, ValidationException}
import org.apache.flink.table.expressions.ResolvedFieldReference
import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral}
import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, valueLiteral}
import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, ResolvedFieldReference}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.expressions.RexNodeConverter
Expand Down Expand Up @@ -285,11 +285,14 @@ object TableSourceUtil {
// add cast to requested type and convert expression to RexNode
// blink runner treats numeric types as seconds in the cast of timestamp and numerical types.
// So we use REINTERPRET_CAST to keep the mills of numeric types.
val castExpression = unresolvedCall(
val outputType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])
val castExpression = new CallExpression(
BuiltInFunctionDefinitions.REINTERPRET_CAST,
expression,
typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])),
valueLiteral(false))
Seq(
expression.asInstanceOf[ResolvedExpression],
typeLiteral(outputType),
valueLiteral(false)),
outputType)
val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder))
relBuilder.clear()
rexExpression
Expand Down

0 comments on commit e922393

Please sign in to comment.