Skip to content

Commit

Permalink
[FLINK-13782] Implement type inference for more logical expressions
Browse files Browse the repository at this point in the history
Implemented input & output type inference for:
AND/OR/NOT/IS NULL/IS NOT NULL/IS TRUE/IS FALSE/IS NOT TRUE/IS NOT
FALSE/BETWEEN/NOT BETWEEN

This closes apache#12387
  • Loading branch information
dawidwys committed May 28, 2020
1 parent 618b779 commit 8e07ca0
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.StructuredType.StructuredComparision;
import org.apache.flink.util.Preconditions;

import java.lang.reflect.Field;
Expand All @@ -41,9 +43,12 @@
import static org.apache.flink.table.types.inference.InputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static org.apache.flink.table.types.inference.InputTypeStrategies.TWO_FULLY_COMPARABLE;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
import static org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;
import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
import static org.apache.flink.table.types.inference.InputTypeStrategies.sequence;
import static org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;
import static org.apache.flink.table.types.inference.InputTypeStrategies.wildcardWithCount;
import static org.apache.flink.table.types.inference.TypeStrategies.explicit;
import static org.apache.flink.table.types.inference.TypeStrategies.nullable;

Expand All @@ -58,19 +63,34 @@ public final class BuiltInFunctionDefinitions {
new BuiltInFunctionDefinition.Builder()
.name("and")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(
varyingSequence(
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeRoot.BOOLEAN)
)
)
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
public static final BuiltInFunctionDefinition OR =
new BuiltInFunctionDefinition.Builder()
.name("or")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(
varyingSequence(
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeRoot.BOOLEAN)
)
)
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
public static final BuiltInFunctionDefinition NOT =
new BuiltInFunctionDefinition.Builder()
.name("not")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
public static final BuiltInFunctionDefinition IF =
new BuiltInFunctionDefinition.Builder()
Expand Down Expand Up @@ -126,49 +146,57 @@ public final class BuiltInFunctionDefinitions {
new BuiltInFunctionDefinition.Builder()
.name("isNull")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_NOT_NULL =
new BuiltInFunctionDefinition.Builder()
.name("isNotNull")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_TRUE =
new BuiltInFunctionDefinition.Builder()
.name("isTrue")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_FALSE =
new BuiltInFunctionDefinition.Builder()
.name("isFalse")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_NOT_TRUE =
new BuiltInFunctionDefinition.Builder()
.name("isNotTrue")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_NOT_FALSE =
new BuiltInFunctionDefinition.Builder()
.name("isNotFalse")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition BETWEEN =
new BuiltInFunctionDefinition.Builder()
.name("between")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(comparable(ConstantArgumentCount.of(3), StructuredComparision.FULL))
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
public static final BuiltInFunctionDefinition NOT_BETWEEN =
new BuiltInFunctionDefinition.Builder()
.name("notBetween")
.kind(SCALAR)
.outputTypeStrategy(TypeStrategies.MISSING)
.inputTypeStrategy(comparable(ConstantArgumentCount.of(3), StructuredComparision.FULL))
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();

// aggregate functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,58 +160,14 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
expr.validateInput()
expr

case AND =>
assert(args.size >= 2)
args.reduceLeft(And)

case OR =>
assert(args.size >= 2)
args.reduceLeft(Or)

case NOT =>
assert(args.size == 1)
Not(args.head)

case IN =>
assert(args.size > 1)
In(args.head, args.drop(1))

case IS_NULL =>
assert(args.size == 1)
IsNull(args.head)

case IS_NOT_NULL =>
assert(args.size == 1)
IsNotNull(args.head)

case IS_TRUE =>
assert(args.size == 1)
IsTrue(args.head)

case IS_FALSE =>
assert(args.size == 1)
IsFalse(args.head)

case IS_NOT_TRUE =>
assert(args.size == 1)
IsNotTrue(args.head)

case IS_NOT_FALSE =>
assert(args.size == 1)
IsNotFalse(args.head)

case IF =>
assert(args.size == 3)
If(args.head, args(1), args.last)

case BETWEEN =>
assert(args.size == 3)
Between(args.head, args(1), args.last)

case NOT_BETWEEN =>
assert(args.size == 3)
NotBetween(args.head, args(1), args.last)

case DISTINCT =>
assert(args.size == 1)
DistinctAgg(args.head)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,6 @@ import org.apache.flink.table.planner.validate._
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isDecimal

abstract class BinaryPredicate extends BinaryExpression {
override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO

override private[flink] def validateInput(): ValidationResult = {
if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
ValidationSuccess
} else {
ValidationFailure(s"$this only accepts children of Boolean type, " +
s"get $left : ${left.resultType} and $right : ${right.resultType}")
}
}
}

case class Not(child: PlannerExpression) extends UnaryExpression {

override def toString = s"!($child)"

override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO

override private[flink] def validateInput(): ValidationResult = {
if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
ValidationSuccess
} else {
ValidationFailure(s"Not operator requires a boolean expression as input, " +
s"but $child is of type ${child.resultType}")
}
}
}

case class And(left: PlannerExpression, right: PlannerExpression) extends BinaryPredicate {

override def toString = s"$left && $right"
}

case class Or(left: PlannerExpression, right: PlannerExpression) extends BinaryPredicate {

override def toString = s"$left || $right"
}

@deprecated(
"Use ifThenElse(...) instead. It is available through the implicit Scala DSL.",
"1.8.0")
Expand Down

0 comments on commit 8e07ca0

Please sign in to comment.