Skip to content

Commit

Permalink
[FLINK-15149][table-common] Merge InputTypeStrategy and InputTypeVali…
Browse files Browse the repository at this point in the history
…dator

This merges the concepts of InputTypeStrategy for inferring input data types
and enriching data types with conversion classes and InputTypeValidator for
validating arguments. It enables implicit casting and simplifies the overall
design.

This closes apache#10546.
  • Loading branch information
twalthr committed Dec 19, 2019
1 parent 36aff23 commit e72a5a7
Show file tree
Hide file tree
Showing 38 changed files with 1,445 additions and 1,648 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeLookup;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
Expand Down Expand Up @@ -184,7 +186,12 @@ private ResolvedExpression runTypeInference(

final Result inferenceResult = TypeInferenceUtil.runTypeInference(
inference,
new TableApiCallContext(name, unresolvedCall.getFunctionDefinition(), resolvedArgs), surroundingInfo);
new TableApiCallContext(
new UnsupportedDataTypeLookup(),
name,
unresolvedCall.getFunctionDefinition(),
resolvedArgs),
surroundingInfo);

final List<ResolvedExpression> adaptedArguments = adaptArguments(inferenceResult, resolvedArgs);

Expand Down Expand Up @@ -268,28 +275,48 @@ private FunctionDefinition prepareUserDefinedFunction(FunctionDefinition definit

// --------------------------------------------------------------------------------------------

private static class UnsupportedDataTypeLookup implements DataTypeLookup {

@Override
public Optional<DataType> lookupDataType(String name) {
throw new TableException("Data type lookup is not supported yet.");
}

@Override
public Optional<DataType> lookupDataType(UnresolvedIdentifier identifier) {
throw new TableException("Data type lookup is not supported yet.");
}

@Override
public DataType resolveRawDataType(Class<?> clazz) {
throw new TableException("Data type lookup is not supported yet.");
}
}

private static class TableApiCallContext implements CallContext {

private final DataTypeLookup lookup;

private final String name;

private final FunctionDefinition definition;

private final List<ResolvedExpression> resolvedArgs;

public TableApiCallContext(
DataTypeLookup lookup,
String name,
FunctionDefinition definition,
List<ResolvedExpression> resolvedArgs) {
this.lookup = lookup;
this.name = name;
this.definition = definition;
this.resolvedArgs = resolvedArgs;
}

@Override
public List<DataType> getArgumentDataTypes() {
return resolvedArgs.stream()
.map(ResolvedExpression::getOutputDataType)
.collect(Collectors.toList());
public DataTypeLookup getDataTypeLookup() {
return lookup;
}

@Override
Expand Down Expand Up @@ -321,6 +348,18 @@ public String getName() {
return name;
}

@Override
public List<DataType> getArgumentDataTypes() {
return resolvedArgs.stream()
.map(ResolvedExpression::getOutputDataType)
.collect(Collectors.toList());
}

@Override
public Optional<DataType> getOutputDataType() {
return Optional.empty();
}

private ResolvedExpression getArgument(int pos) {
if (pos >= resolvedArgs.size()) {
throw new IndexOutOfBoundsException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
package org.apache.flink.table.annotation;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.types.inference.InputTypeValidators;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.logical.LogicalTypeFamily;

/**
* A list of commonly used pre-defined groups of similar types for accepting more than just one data
* type as an input argument in {@link DataTypeHint}s.
*
* <p>This list exposes a combination of {@link LogicalTypeFamily} and {@link InputTypeValidators}
* <p>This list exposes a combination of {@link LogicalTypeFamily} and {@link InputTypeStrategies}
* via annotations for convenient inline usage.
*/
@PublicEvolving
Expand All @@ -38,7 +38,7 @@ public enum InputGroup {
UNKNOWN,

/**
* Enables input wildcards. Any data type can be passed. The behavior is equal to {@link InputTypeValidators#ANY}.
* Enables input wildcards. Any data type can be passed. The behavior is equal to {@link InputTypeStrategies#ANY}.
*
* <p>Note: The class of the annotated element must be {@link Object} as this is the super class
* of all possibly passed data types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeValidator;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -103,28 +103,28 @@ public Builder kind(FunctionKind kind) {
return this;
}

public Builder inputTypeValidator(InputTypeValidator inputTypeValidator) {
this.typeInferenceBuilder.inputTypeValidator(inputTypeValidator);
public Builder namedArguments(List<String> argumentNames) {
this.typeInferenceBuilder.namedArguments(argumentNames);
return this;
}

public Builder accumulatorTypeStrategy(TypeStrategy accumulatorTypeStrategy) {
this.typeInferenceBuilder.accumulatorTypeStrategy(accumulatorTypeStrategy);
public Builder typedArguments(List<DataType> argumentTypes) {
this.typeInferenceBuilder.typedArguments(argumentTypes);
return this;
}

public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) {
this.typeInferenceBuilder.outputTypeStrategy(outputTypeStrategy);
public Builder inputTypeStrategy(InputTypeStrategy inputTypeStrategy) {
this.typeInferenceBuilder.inputTypeStrategy(inputTypeStrategy);
return this;
}

public Builder namedArguments(List<String> argumentNames) {
this.typeInferenceBuilder.namedArguments(argumentNames);
public Builder accumulatorTypeStrategy(TypeStrategy accumulatorTypeStrategy) {
this.typeInferenceBuilder.accumulatorTypeStrategy(accumulatorTypeStrategy);
return this;
}

public Builder typedArguments(List<DataType> argumentTypes) {
this.typeInferenceBuilder.typedArguments(argumentTypes);
public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) {
this.typeInferenceBuilder.outputTypeStrategy(outputTypeStrategy);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.flink.table.catalog.DataTypeLookup;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.extraction.DataTypeExtractor;
import org.apache.flink.table.types.inference.ArgumentTypeValidator;
import org.apache.flink.table.types.inference.InputTypeValidator;
import org.apache.flink.table.types.inference.InputTypeValidators;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.TypeStrategy;

Expand All @@ -44,7 +44,7 @@

/**
* Internal representation of a {@link DataTypeHint} and template for creating a single {@link DataType}
* or a {@link InputTypeValidator} for groups of {@link DataType}s.
* or a {@link InputTypeStrategy} for groups of {@link DataType}s.
*
* <p>All parameters of a template are optional. An empty annotation results in a template where all
* members are {@code null}.
Expand Down Expand Up @@ -212,17 +212,17 @@ public boolean hasInputGroupDefinition() {
}

/**
* Converts this template into an {@link ArgumentTypeValidator}.
* Converts this template into an {@link ArgumentTypeStrategy}.
*/
public ArgumentTypeValidator toArgumentTypeValidator() {
public ArgumentTypeStrategy toArgumentTypeStrategy() {
// data type
if (hasDataTypeDefinition()) {
return InputTypeValidators.explicit(dataType);
return InputTypeStrategies.explicit(dataType);
}
// input group
else if (hasInputGroupDefinition()) {
if (inputGroup == InputGroup.ANY) {
return InputTypeValidators.ANY;
return InputTypeStrategies.ANY;
}
}
throw ExtractionUtils.extractionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,32 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;

import java.util.Optional;

/**
* Validator that checks a single input argument type of a function call.
* Strategy for inferring and validating a single input argument type of a function call.
*/
@PublicEvolving
public interface ArgumentTypeValidator {
public interface ArgumentTypeStrategy {

/**
* Main logic for validating a single input type. Returns {@code true} if the argument is valid for the
* given call, {@code false} otherwise.
* Main logic for inferring and validating an argument. Returns the data type that is valid for
* the given call. If the returned type differs from {@link CallContext#getArgumentDataTypes()} at
* {@code argumentPos}, a casting operation can be inserted. An empty result means that the given
* input type could not be inferred.
*
* @param callContext provides details about the function call
* @param argumentPos argument index in the {@link CallContext}
* @param throwOnFailure whether this function is allowed to throw an {@link ValidationException}
* with a meaningful exception in case the validation is not successful or
* if this function should simply return {@code false}.
* with a meaningful exception in case the inference is not successful or
* if this function should simply return an empty result.
* @return three-state result for either "true, same data type as argument", "true, but argument
* must be casted to returned data type", or "false, no inferred data type could be found"
* @see CallContext#newValidationError(String, Object...)
*/
boolean validateArgument(CallContext callContext, int argumentPos, boolean throwOnFailure);
Optional<DataType> inferArgumentType(CallContext callContext, int argumentPos, boolean throwOnFailure);

/**
* Returns a summary of the function's expected argument at {@code argumentPos}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.flink.table.types.inference;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeLookup;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;

Expand All @@ -32,6 +34,11 @@
@PublicEvolving
public interface CallContext {

/**
* Enables to lookup types in a catalog and resolve RAW types.
*/
DataTypeLookup getDataTypeLookup();

/**
* Returns the function definition that defines the function currently being called.
*/
Expand Down Expand Up @@ -71,6 +78,16 @@ public interface CallContext {
*/
List<DataType> getArgumentDataTypes();

/**
* Returns the inferred output data type of the function call.
*
* <p>It does this by inferring the input argument data type using
* {@link ArgumentTypeStrategy#inferArgumentType(CallContext, int, boolean)} of a wrapping call (if
* available) where this function call is an argument. For example, {@code takes_string(this_function(NULL))}
* would lead to a {@link DataTypes#STRING()} because the wrapping call expects a string argument.
*/
Optional<DataType> getOutputDataType();

/**
* Creates a validation error for exiting the type inference process with a meaningful exception.
*/
Expand Down
Loading

0 comments on commit e72a5a7

Please sign in to comment.