[FLINK-36789][table-common] Update FunctionHint and ArgumentHint with traits
twalthr authored Nov 26, 2024
1 parent 59dc491 commit 0580924
Showing 14 changed files with 395 additions and 167 deletions.
24 changes: 16 additions & 8 deletions docs/content.zh/docs/dev/table/functions/udfs.md
@@ -661,8 +661,10 @@ public static class NamedParameterClass extends ScalarFunction {

// Use the @ArgumentHint annotation to specify the argument's name, its type, and whether it is required
@FunctionHint(
argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
@ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
arguments = {
@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
@ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))
}
)
public String eval(String s1, Integer s2) {
return s1 + ", " + s2;
@@ -679,8 +681,10 @@ class NamedParameterClass extends ScalarFunction {

// Use the @ArgumentHint annotation to specify the argument's name, its type, and whether it is required
@FunctionHint(
argument = Array(new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER")))
arguments = Array(
new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER"))
)
)
def eval(s1: String, s2: Int): String = {
s1 + ", " + s2
@@ -700,8 +704,10 @@ import org.apache.flink.table.functions.ScalarFunction;

// Use the @ArgumentHint annotation to specify the argument's name, its type, and whether it is required
@FunctionHint(
argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
@ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
arguments = {
@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
@ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))
}
)
public static class NamedParameterClass extends ScalarFunction {

@@ -718,8 +724,10 @@ import org.apache.flink.table.functions.ScalarFunction;

// Use the @ArgumentHint annotation to specify the argument's name, its type, and whether it is required
@FunctionHint(
argument = Array(new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER")))
arguments = Array(
new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER"))
)
)
class NamedParameterClass extends ScalarFunction {

33 changes: 19 additions & 14 deletions docs/content.zh/docs/dev/table/procedures.md
@@ -397,13 +397,15 @@ import org.apache.flink.types.Row;

public static class NamedParameterProcedure extends Procedure {

@ProcedureHint(
argument = {@ArgumentHint(name = "param1", type = @DataTypeHint("INTEGER"), isOptional = false),
@ArgumentHint(name = "param2", type = @DataTypeHint("INTEGER"), isOptional = true)}
)
public @DataTypeHint("INT") Integer[] call(ProcedureContext context, Integer a, Integer b) {
return new Integer[] {a + (b == null ? 0 : b)};
}
@ProcedureHint(
arguments = {
@ArgumentHint(name = "param1", type = @DataTypeHint("INTEGER"), isOptional = false),
@ArgumentHint(name = "param2", type = @DataTypeHint("INTEGER"), isOptional = true)
}
)
public @DataTypeHint("INT") Integer[] call(ProcedureContext context, Integer a, Integer b) {
return new Integer[] {a + (b == null ? 0 : b)};
}
}
```
{{< /tab >}}
@@ -418,15 +420,16 @@ import org.apache.flink.types.Row
import scala.annotation.varargs

class NamedParameterProcedure extends Procedure {

@ProcedureHint(
argument = Array(
arguments = Array(
new ArgumentHint(name = "param1", `type` = new DataTypeHint("INTEGER"), isOptional = false),
new ArgumentHint(name = "param2", `type` = new DataTypeHint("INTEGER"), isOptional = true)
)
)
def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = {
Array(a + (if (b == null) 0 else b))
}
def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = {
Array(a + (if (b == null) 0 else b))
}
}
```
{{< /tab >}}
@@ -444,8 +447,10 @@ import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.types.Row;

@ProcedureHint(
argument = {@ArgumentHint(name = "param1", type = @DataTypeHint("INTEGER"), isOptional = false),
@ArgumentHint(name = "param2", type = @DataTypeHint("INTEGER"), isOptional = true)}
arguments = {
@ArgumentHint(name = "param1", type = @DataTypeHint("INTEGER"), isOptional = false),
@ArgumentHint(name = "param2", type = @DataTypeHint("INTEGER"), isOptional = true)
}
)
public static class NamedParameterProcedure extends Procedure {

@@ -466,7 +471,7 @@ import org.apache.flink.types.Row
import scala.annotation.varargs

@ProcedureHint(
argument = Array(
arguments = Array(
new ArgumentHint(name = "param1", `type` = new DataTypeHint("INTEGER"), isOptional = false),
new ArgumentHint(name = "param2", `type` = new DataTypeHint("INTEGER"), isOptional = true)
)
24 changes: 16 additions & 8 deletions docs/content/docs/dev/table/functions/udfs.md
@@ -671,8 +671,10 @@ public static class NamedParameterClass extends ScalarFunction {

// Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
@FunctionHint(
argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
@ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
arguments = {
@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
@ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))
}
)
public String eval(String s1, Integer s2) {
return s1 + ", " + s2;
@@ -689,8 +691,10 @@ class NamedParameterClass extends ScalarFunction {

// Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
@FunctionHint(
argument = Array(new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER")))
arguments = Array(
new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER"))
)
)
def eval(s1: String, s2: Int): String = {
s1 + ", " + s2
@@ -710,8 +714,10 @@ import org.apache.flink.table.functions.ScalarFunction;

// Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
@FunctionHint(
argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
@ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
arguments = {
@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
@ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))
}
)
public static class NamedParameterClass extends ScalarFunction {

@@ -728,8 +734,10 @@ import org.apache.flink.table.functions.ScalarFunction;

// Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
@FunctionHint(
argument = Array(new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER")))
arguments = Array(
new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER"))
)
)
class NamedParameterClass extends ScalarFunction {

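For orientation, here is a minimal sketch of how a function declared with the renamed `arguments` attribute might be registered and invoked with named arguments. The class name `GreetingFunction` and the surrounding `main` setup are illustrative assumptions, not part of this commit.

```java
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class NamedArgumentExample {

    // Same declaration style as the updated docs above: @FunctionHint now uses
    // the plural `arguments` attribute instead of the old `argument`.
    public static class GreetingFunction extends ScalarFunction {
        @FunctionHint(
            arguments = {
                @ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
                @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))
            }
        )
        public String eval(String s1, Integer s2) {
            return s1 + ", " + s2;
        }
    }

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        env.createTemporarySystemFunction("GreetingFunction", GreetingFunction.class);
        // Arguments are passed by name; the optional param2 could also be omitted.
        env.executeSql("SELECT GreetingFunction(param1 => 'Flink', param2 => 7)").print();
    }
}
```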
35 changes: 20 additions & 15 deletions docs/content/docs/dev/table/procedures.md
@@ -395,14 +395,16 @@ import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.types.Row;

public static class NamedParameterProcedure extends Procedure {

@ProcedureHint(
argument = {@ArgumentHint(name = "param1", type = @DataTypeHint("INTEGER"), isOptional = false),
@ArgumentHint(name = "param2", type = @DataTypeHint("INTEGER"), isOptional = true)}
)
public @DataTypeHint("INT") Integer[] call(ProcedureContext context, Integer a, Integer b) {
return new Integer[] {a + (b == null ? 0 : b)};
}

@ProcedureHint(
arguments = {
@ArgumentHint(name = "param1", type = @DataTypeHint("INTEGER"), isOptional = false),
@ArgumentHint(name = "param2", type = @DataTypeHint("INTEGER"), isOptional = true)
}
)
public @DataTypeHint("INT") Integer[] call(ProcedureContext context, Integer a, Integer b) {
return new Integer[] {a + (b == null ? 0 : b)};
}
}
```
{{< /tab >}}
@@ -417,15 +419,16 @@ import org.apache.flink.types.Row
import scala.annotation.varargs

class NamedParameterProcedure extends Procedure {

@ProcedureHint(
argument = Array(
arguments = Array(
new ArgumentHint(name = "param1", `type` = new DataTypeHint("INTEGER"), isOptional = false),
new ArgumentHint(name = "param2", `type` = new DataTypeHint("INTEGER"), isOptional = true)
)
)
def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = {
Array(a + (if (b == null) 0 else b))
}
def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = {
Array(a + (if (b == null) 0 else b))
}
}
```
{{< /tab >}}
@@ -443,8 +446,10 @@ import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.types.Row;

@ProcedureHint(
argument = {@ArgumentHint(name = "param1", type = @DataTypeHint("INTEGER"), isOptional = false),
@ArgumentHint(name = "param2", type = @DataTypeHint("INTEGER"), isOptional = true)}
arguments = {
@ArgumentHint(name = "param1", type = @DataTypeHint("INTEGER"), isOptional = false),
@ArgumentHint(name = "param2", type = @DataTypeHint("INTEGER"), isOptional = true)
}
)
public static class NamedParameterProcedure extends Procedure {

@@ -465,7 +470,7 @@ import org.apache.flink.types.Row
import scala.annotation.varargs

@ProcedureHint(
argument = Array(
arguments = Array(
new ArgumentHint(name = "param1", `type` = new DataTypeHint("INTEGER"), isOptional = false),
new ArgumentHint(name = "param2", `type` = new DataTypeHint("INTEGER"), isOptional = true)
)
@@ -31,17 +31,23 @@
* <p>An {@code ArgumentHint} can be used to provide hints about the name, optionality, and data
* type of argument.
*
* <p>It combines the functionality of {@link FunctionHint#argumentNames()} and {@link DataTypeHint}
* annotations to conveniently group argument-related information together in function declarations.
* <p>{@code @ArgumentHint(name = "in1", type = @DataTypeHint("STRING"), isOptional = false)} is a
* scalar argument with the data type STRING, named "in1", and cannot be omitted when calling.
*
* <p>{@code @ArgumentHint(name = "in1", type = @DataTypeHint("STRING"), isOptional = false} is an
* argument with the type String, named in1, and cannot be omitted when calling.
* @see FunctionHint
*/
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
public @interface ArgumentHint {

/**
* The kind of the argument.
*
* <p>Only applies to {@code ProcessTableFunction}s (PTFs). Others can only take scalar values.
*/
ArgumentTrait[] value() default {ArgumentTrait.SCALAR};

/**
* The name of the argument.
*
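A brief sketch of how the new `value()` element of `ArgumentHint` might be spelled out explicitly alongside the existing attributes; the enclosing scalar function is an illustrative assumption. Omitting `value` behaves identically, since `ArgumentTrait.SCALAR` is the declared default.

```java
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;

public class TraitHintExample extends ScalarFunction {

    @FunctionHint(
        arguments = {
            // Spelling out the trait explicitly; leaving `value` out is equivalent,
            // because ArgumentTrait.SCALAR is the declared default.
            @ArgumentHint(
                value = {ArgumentTrait.SCALAR},
                name = "in1",
                isOptional = false,
                type = @DataTypeHint("STRING"))
        }
    )
    public String eval(String s) {
        return s;
    }
}
```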
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.annotation;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.types.inference.StaticArgumentTrait;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Declares traits for {@link ArgumentHint}. They enable basic validation by the framework.
*
* <p>Some traits have dependencies to other traits, which is why this enum reflects a hierarchy in
* which {@link #SCALAR}, {@link #TABLE_AS_ROW}, and {@link #TABLE_AS_SET} are the top-level roots.
*/
@PublicEvolving
public enum ArgumentTrait {

/**
* An argument that accepts a scalar value. For example: f(1), f(true), f('Some string').
*
* <p>It's the default if no {@link ArgumentHint} is provided.
*/
SCALAR(StaticArgumentTrait.SCALAR),

/**
* An argument that accepts a table "as row" (i.e. with row semantics). This trait only applies
* to {@code ProcessTableFunction} (PTF).
*
* <p>For scalability, input tables are distributed into virtual processors. Each virtual
* processor executes a PTF instance and has access only to a share of the entire table. The
* argument declaration decides about the size of the share and co-location of data.
*
* <p>A table with row semantics assumes that there is no correlation between rows and each row
* can be processed independently. The framework is free in how to distribute rows among virtual
* processors and each virtual processor has access only to the currently processed row.
*/
TABLE_AS_ROW(StaticArgumentTrait.TABLE_AS_ROW),

/**
* An argument that accepts a table "as set" (i.e. with set semantics). This trait only applies
* to {@code ProcessTableFunction} (PTF).
*
* <p>For scalability, input tables are distributed into virtual processors. Each virtual
* processor executes a PTF instance and has access only to a share of the entire table. The
* argument declaration decides about the size of the share and co-location of data.
*
* <p>A table with set semantics assumes that there is a correlation between rows. When calling
* the function, the PARTITION BY clause defines the columns for correlation. The framework
* ensures that all rows belonging to same set are co-located. A PTF instance is able to access
* all rows belonging to the same set. In other words: The virtual processor is scoped under a
* key context.
*/
TABLE_AS_SET(StaticArgumentTrait.TABLE_AS_SET),

/**
* Defines that a PARTITION BY clause is optional for {@link #TABLE_AS_SET}. By default, it is
* mandatory for improving the parallel execution by distributing the table by key.
*/
OPTIONAL_PARTITION_BY(StaticArgumentTrait.OPTIONAL_PARTITION_BY, TABLE_AS_SET);

private final StaticArgumentTrait staticTrait;
private final Set<ArgumentTrait> requirements;

ArgumentTrait(StaticArgumentTrait staticTrait, ArgumentTrait... requirements) {
this.staticTrait = staticTrait;
this.requirements = Arrays.stream(requirements).collect(Collectors.toSet());
}

public Set<ArgumentTrait> getRequirements() {
return requirements;
}

public StaticArgumentTrait toStaticTrait() {
return staticTrait;
}
}
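To illustrate the requirement hierarchy described in the javadoc above, a small sketch that prints which traits each constant depends on; the wrapper class is illustrative and not part of this commit.

```java
import org.apache.flink.table.annotation.ArgumentTrait;

public class ArgumentTraitHierarchy {

    public static void main(String[] args) {
        // The three root traits (SCALAR, TABLE_AS_ROW, TABLE_AS_SET) report an empty set;
        // OPTIONAL_PARTITION_BY reports TABLE_AS_SET as its requirement.
        for (ArgumentTrait trait : ArgumentTrait.values()) {
            System.out.println(trait + " requires " + trait.getRequirements());
        }
    }
}
```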