Skip to content

Commit

Permalink
[FLINK-12711][table] Separate function implementation and definition
Browse files Browse the repository at this point in the history
This closes apache#8661.
  • Loading branch information
twalthr committed Jun 18, 2019
1 parent 58b8cb0 commit 86ebb26
Show file tree
Hide file tree
Showing 32 changed files with 1,300 additions and 454 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.ScalarFunctionDefinition;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedAggregateFunction;
Expand All @@ -33,6 +37,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/**
* Simple function catalog to store {@link FunctionDefinition}s in memory.
Expand Down Expand Up @@ -70,7 +75,10 @@ public <T> void registerTableFunction(

registerFunction(
name,
new TableFunctionDefinition(name, function, resultType)
new TableFunctionDefinition(
name,
function,
resultType)
);
}

Expand All @@ -84,14 +92,33 @@ public <T, ACC> void registerAggregateFunction(
// check if class could be instantiated
UserFunctionsTypeHelper.validateInstantiation(function.getClass());

final FunctionDefinition definition;
if (function instanceof AggregateFunction) {
definition = new AggregateFunctionDefinition(
name,
(AggregateFunction<?, ?>) function,
resultType,
accType);
} else if (function instanceof TableAggregateFunction) {
definition = new TableAggregateFunctionDefinition(
name,
(TableAggregateFunction<?, ?>) function,
resultType,
accType);
} else {
throw new TableException("Unknown function class: " + function.getClass());
}

registerFunction(
name,
new AggregateFunctionDefinition(name, function, resultType, accType)
definition
);
}

public String[] getUserDefinedFunctions() {
return userFunctions.values().stream().map(FunctionDefinition::getName).toArray(String[]::new);
return userFunctions.values().stream()
.map(FunctionDefinition::toString)
.toArray(String[]::new);
}

@Override
Expand All @@ -104,7 +131,8 @@ public Optional<FunctionLookup.Result> lookupFunction(String name) {
foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
.stream()
.filter(f -> normalizeName(name).equals(normalizeName(f.getName())))
.findFirst();
.findFirst()
.map(Function.identity());
}

return foundDefinition.map(definition -> new FunctionLookup.Result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
import static org.apache.flink.table.expressions.ExpressionUtils.extractValue;
import static org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfType;
import static org.apache.flink.table.expressions.ExpressionUtils.isFunctionOfKind;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.WINDOW_PROPERTIES;
import static org.apache.flink.table.functions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;

/**
* Utility methods for transforming {@link Expression} to use them in {@link QueryOperation}s.
Expand Down Expand Up @@ -158,7 +158,7 @@ public Void visitLookupCall(LookupCallExpression unresolvedCall) {
@Override
public Void visitCall(CallExpression call) {
FunctionDefinition functionDefinition = call.getFunctionDefinition();
if (isFunctionOfType(call, AGGREGATE_FUNCTION)) {
if (isFunctionOfKind(call, AGGREGATE)) {
aggregates.computeIfAbsent(call, expr -> "EXPR$" + uniqueId++);
} else if (WINDOW_PROPERTIES.contains(functionDefinition)) {
properties.computeIfAbsent(call, expr -> "EXPR$" + uniqueId++);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,6 @@ public int hashCode() {
@Override
public String toString() {
final List<String> argList = args.stream().map(Object::toString).collect(Collectors.toList());
return functionDefinition.getName() + "(" + String.join(", ", argList) + ")";
return functionDefinition.toString() + "(" + String.join(", ", argList) + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.table.expressions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionKind;

import java.util.Optional;

Expand Down Expand Up @@ -50,12 +50,12 @@ public static <V> Optional<V> extractValue(Expression expr, Class<V> targetClass
* Checks if the expression is a function call of given type.
*
* @param expr expression to check
* @param type expected type of function
* @param kind expected type of function
* @return true if the expression is function call of given type, false otherwise
*/
public static boolean isFunctionOfType(Expression expr, FunctionDefinition.Type type) {
public static boolean isFunctionOfKind(Expression expr, FunctionKind kind) {
return expr instanceof CallExpression &&
((CallExpression) expr).getFunctionDefinition().getType() == type;
((CallExpression) expr).getFunctionDefinition().getKind() == kind;
}

private ExpressionUtils() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

import org.apache.flink.annotation.PublicEvolving;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* Base class for user-defined aggregates.
*
Expand Down Expand Up @@ -97,6 +101,9 @@
* }
* </pre>
*
* <p>If this aggregate function can only be applied in an OVER window, this can be declared using the
* requirement {@link FunctionRequirement#OVER_WINDOW_ONLY} in {@link #getRequirements()}.
*
* @param <T> the type of the aggregation result
* @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
* aggregated values which are needed to compute an aggregation result.
Expand Down Expand Up @@ -124,8 +131,25 @@ public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunc
*
* @return <code>true</code> if the {@link AggregateFunction} requires an OVER window,
* <code>false</code> otherwise.
*
* @deprecated Use {@link #getRequirements()} instead.
*/
@Deprecated
public boolean requiresOver() {
return false;
}

@Override
public final FunctionKind getKind() {
return FunctionKind.AGGREGATE;
}

@Override
public Set<FunctionRequirement> getRequirements() {
final HashSet<FunctionRequirement> requirements = new HashSet<>();
if (requiresOver()) {
requirements.add(FunctionRequirement.OVER_WINDOW_ONLY);
}
return Collections.unmodifiableSet(requirements);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,34 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

import static org.apache.flink.table.functions.FunctionDefinition.Type.AGGREGATE_FUNCTION;
import java.util.Objects;
import java.util.Set;

/**
* The function definition of an user-defined aggregate function.
*
* <p>This class can be dropped once we introduce a new type inference.
*/
@PublicEvolving
public final class AggregateFunctionDefinition extends FunctionDefinition {
public final class AggregateFunctionDefinition implements FunctionDefinition {

private final UserDefinedAggregateFunction<?, ?> aggregateFunction;
private final String name;
private final AggregateFunction<?, ?> aggregateFunction;
private final TypeInformation<?> resultTypeInfo;
private final TypeInformation<?> accumulatorTypeInfo;

public AggregateFunctionDefinition(
String name,
UserDefinedAggregateFunction<?, ?> aggregateFunction,
AggregateFunction<?, ?> aggregateFunction,
TypeInformation<?> resultTypeInfo,
TypeInformation<?> accTypeInfo) {
super(name, AGGREGATE_FUNCTION);
this.name = Preconditions.checkNotNull(name);
this.aggregateFunction = Preconditions.checkNotNull(aggregateFunction);
this.resultTypeInfo = Preconditions.checkNotNull(resultTypeInfo);
this.accumulatorTypeInfo = Preconditions.checkNotNull(accTypeInfo);
}

public UserDefinedAggregateFunction<?, ?> getAggregateFunction() {
public AggregateFunction<?, ?> getAggregateFunction() {
return aggregateFunction;
}

Expand All @@ -56,4 +60,41 @@ public TypeInformation<?> getResultTypeInfo() {
public TypeInformation<?> getAccumulatorTypeInfo() {
return accumulatorTypeInfo;
}

@Override
public FunctionKind getKind() {
return FunctionKind.AGGREGATE;
}

@Override
public Set<FunctionRequirement> getRequirements() {
return aggregateFunction.getRequirements();
}

@Override
public boolean isDeterministic() {
return aggregateFunction.isDeterministic();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AggregateFunctionDefinition that = (AggregateFunctionDefinition) o;
return name.equals(that.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}

@Override
public String toString() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,9 @@ public abstract class AsyncTableFunction<T> extends UserDefinedFunction {
public TypeInformation<T> getResultType() {
return null;
}

@Override
public final FunctionKind getKind() {
return FunctionKind.ASYNC_TABLE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

/**
* Definition of a built-in function. It enables unique identification across different
* modules by reference equality.
*
* <p>Compared to regular {@link FunctionDefinition}, built-in functions have a default name.
*
* <p>Equality is defined by reference equality.
*/
@Internal
public final class BuiltInFunctionDefinition implements FunctionDefinition {

private final String name;

private final FunctionKind kind;

private BuiltInFunctionDefinition(
String name,
FunctionKind kind) {
this.name = Preconditions.checkNotNull(name, "Name must not be null.");
this.kind = Preconditions.checkNotNull(kind, "Kind must not be null.");
}

public String getName() {
return name;
}

@Override
public FunctionKind getKind() {
return kind;
}

@Override
public String toString() {
return name;
}

// --------------------------------------------------------------------------------------------

/**
* Builder for fluent definition of built-in functions.
*/
public static class Builder {

private String name;

private FunctionKind kind;

public Builder() {
// default constructor to allow a fluent definition
}

public Builder name(String name) {
this.name = name;
return this;
}

public Builder kind(FunctionKind kind) {
this.kind = kind;
return this;
}

public BuiltInFunctionDefinition build() {
return new BuiltInFunctionDefinition(name, kind);
}
}
}
Loading

0 comments on commit 86ebb26

Please sign in to comment.