Skip to content

Commit

Permalink
[FLINK-16205][table] Support JSON_OBJECTAGG
Browse files Browse the repository at this point in the history
A few notes regarding the implementation

(1) In order to have deterministic results, we sort the object keys
    alphabetically. However, there is no built-in way in Jackson to do this
    yet for the tree representation (JsonNode), so we need an extra
    conversion step. See
    https://www.mail-archive.com/jackson-user@googlegroups.com/msg01877.html
    for more details.

(2) We represent (NULL|ABSENT) ON NULL as two separate built-in functions
    for now. This is necessary because otherwise we would have to ship
    the symbol across the network for each record, which leads to various
    problems. Calcite essentially uses the same workaround.

This closes apache#17549.
  • Loading branch information
Airblader authored and twalthr committed Oct 27, 2021
1 parent 2168e1f commit 8106e27
Show file tree
Hide file tree
Showing 19 changed files with 814 additions and 5 deletions.
19 changes: 19 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,25 @@ json:
)
)
```
- sql: JSON_OBJECTAGG([KEY] key VALUE value [ { NULL | ABSENT } ON NULL ])
table: jsonObjectAgg(JsonOnNull, keyExpression, valueExpression)
description: |
Builds a JSON object string by aggregating key-value expressions into a single JSON object.
The key expression must return a non-nullable character string. Value expressions can be
arbitrary, including other JSON functions. If a value is `NULL`, the `ON NULL` behavior
defines what to do. If omitted, `NULL ON NULL` is assumed by default.
Note that keys must be unique. If a key occurs multiple times, an error will be thrown.
This function is currently not supported in `OVER` windows.
```
-- '{"Apple":2,"Banana":17,"Orange":0}'
SELECT
JSON_OBJECTAGG(KEY product VALUE cnt)
FROM orders
```
- sql: JSON_STRING(value)
table: jsonString(value)
description: |
Expand Down
19 changes: 19 additions & 0 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,25 @@ json:
)
)
```
- sql: JSON_OBJECTAGG([KEY] key VALUE value [ { NULL | ABSENT } ON NULL ])
table: jsonObjectAgg(JsonOnNull, keyExpression, valueExpression)
description: |
Builds a JSON object string by aggregating key-value expressions into a single JSON object.
The key expression must return a non-nullable character string. Value expressions can be
arbitrary, including other JSON functions. If a value is `NULL`, the `ON NULL` behavior
defines what to do. If omitted, `NULL ON NULL` is assumed by default.
Note that keys must be unique. If a key occurs multiple times, an error will be thrown.
This function is currently not supported in `OVER` windows.
```
-- '{"Apple":2,"Banana":17,"Orange":0}'
SELECT
JSON_OBJECTAGG(KEY product VALUE cnt)
FROM orders
```
- sql: JSON_STRING(value)
table: jsonString(value)
description: |
Expand Down
25 changes: 24 additions & 1 deletion flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_',
'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat',
'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_string',
'json_object', 'json_array', 'call', 'call_sql', 'source_watermark']
'json_object', 'json_object_agg', 'json_array', 'call', 'call_sql', 'source_watermark']


def _leaf_op(op_name: str) -> Expression:
Expand Down Expand Up @@ -663,6 +663,29 @@ def json_object(on_null: JsonOnNull = JsonOnNull.NULL, *args) -> Expression:
return _varargs_op("jsonObject", *(on_null._to_j_json_on_null(), *args))


def json_object_agg(on_null: JsonOnNull,
                    key_expr: Union[str, Expression[str]],
                    value_expr) -> Expression:
    """
    Aggregates key-value expressions over the input rows into one JSON object string.

    The key expression has to yield a non-nullable character string. The value expression may
    be arbitrary and may itself be another JSON function. When a value is `NULL`, the given
    `on_null` behavior decides how it is handled.

    Keys must be unique; an error is thrown if the same key occurs more than once.

    This function is currently not supported in `OVER` windows.

    Example:
    ::

        >>> # '{"Apple":2,"Banana":17,"Orange":0}'
        >>> orders.select(json_object_agg(JsonOnNull.NULL, col("product"), col("cnt")))

    :param on_null: Behavior to apply when a value is `NULL`.
    :param key_expr: Expression producing the object key.
    :param value_expr: Expression producing the value stored under the key.
    :return: Expression representing the aggregated JSON object string.
    """
    # Convert the Python enum into its Java counterpart before delegating.
    j_on_null = on_null._to_j_json_on_null()
    return _ternary_op("jsonObjectAgg", j_on_null, key_expr, value_expr)


def json_array(on_null: JsonOnNull = JsonOnNull.ABSENT, *args) -> Expression:
"""
Builds a JSON array string from a list of values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.expressions.TimePointUnit;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
Expand All @@ -45,6 +46,8 @@
import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_ARRAY;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_STRING;

/**
Expand Down Expand Up @@ -629,6 +632,41 @@ public static ApiExpression jsonObject(JsonOnNull onNull, Object... keyValues) {
return apiCall(JSON_OBJECT, arguments);
}

/**
 * Aggregates key-value expressions into a single JSON object and returns it as a JSON string.
 *
 * <p>The key expression must evaluate to a non-nullable character string. The value expression
 * may be arbitrary, including other JSON functions. Whenever a value is {@code NULL}, the
 * {@link JsonOnNull onNull} behavior decides how it is handled.
 *
 * <p>Keys must be unique; an error is thrown if the same key occurs more than once.
 *
 * <p>This function is currently not supported in {@code OVER} windows.
 *
 * <p>Example:
 *
 * <pre>{@code
 * // "{\"Apple\":2,\"Banana\":17,\"Orange\":0}"
 * orders.select(jsonObjectAgg(JsonOnNull.NULL, $("product"), $("cnt")))
 * }</pre>
 *
 * @param onNull behavior to apply when a value is {@code NULL}; must not be {@code null}
 * @param keyExpr expression producing the object key
 * @param valueExpr expression producing the value stored under the key
 * @return a call to the built-in aggregate function matching {@code onNull}
 * @see #jsonObject(JsonOnNull, Object...)
 */
public static ApiExpression jsonObjectAgg(JsonOnNull onNull, Object keyExpr, Object valueExpr) {
    // The ON NULL behavior is encoded in the choice of function definition rather than being
    // passed as an argument; everything other than ABSENT maps to the NULL ON NULL variant.
    // equals() is invoked on onNull so that a null argument fails fast with a
    // NullPointerException.
    final BuiltInFunctionDefinition functionDefinition =
            onNull.equals(JsonOnNull.ABSENT)
                    ? JSON_OBJECTAGG_ABSENT_ON_NULL
                    : JSON_OBJECTAGG_NULL_ON_NULL;

    return apiCall(functionDefinition, keyExpr, valueExpr);
}

/**
* Serializes a value into JSON.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,29 @@ trait ImplicitExpressionConversions {
Expressions.jsonObject(onNull, keyValues: _*)
}

/**
 * Aggregates key-value expressions into a single JSON object and returns it as a JSON string.
 *
 * The key expression must evaluate to a non-nullable character string. The value expression
 * may be arbitrary, including other JSON functions. Whenever a value is `NULL`, the
 * [[JsonOnNull onNull]] behavior decides how it is handled.
 *
 * Keys must be unique; an error is thrown if the same key occurs more than once.
 *
 * This function is currently not supported in `OVER` windows.
 *
 * Example:
 * {{{
 * // "{\"Apple\":2,\"Banana\":17,\"Orange\":0}"
 * orders.select(jsonObjectAgg(JsonOnNull.NULL, $("product"), $("cnt")))
 * }}}
 *
 * @param onNull behavior to apply when a value is `NULL`
 * @param keyExpr expression producing the object key
 * @param valueExpr expression producing the value stored under the key
 * @see #jsonObject
 */
def jsonObjectAgg(onNull: JsonOnNull, keyExpr: Expression, valueExpr: Expression): Expression =
  Expressions.jsonObjectAgg(onNull, keyExpr, valueExpr)

/**
* Builds a JSON array string from a list of values.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,26 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.runtimeDeferred()
.build();

/**
 * Definition of the aggregate function backing {@code JSON_OBJECTAGG} with {@code NULL ON NULL}
 * behavior.
 *
 * <p>The {@code ON NULL} behavior is represented by two separate built-in functions (this one
 * and the {@code ABSENT ON NULL} counterpart) instead of being passed as an argument.
 *
 * <p>Expects exactly two arguments: a key of the character string family followed by a value
 * accepted by {@code JSON_ARGUMENT}. The result type is a non-nullable {@code STRING}.
 */
public static final BuiltInFunctionDefinition JSON_OBJECTAGG_NULL_ON_NULL =
BuiltInFunctionDefinition.newBuilder()
.name("JSON_OBJECTAGG_NULL_ON_NULL")
.kind(AGGREGATE)
.inputTypeStrategy(
sequence(logical(LogicalTypeFamily.CHARACTER_STRING), JSON_ARGUMENT))
.outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
.runtimeDeferred()
.build();

/**
 * Definition of the aggregate function backing {@code JSON_OBJECTAGG} with {@code ABSENT ON
 * NULL} behavior.
 *
 * <p>The {@code ON NULL} behavior is represented by two separate built-in functions (this one
 * and the {@code NULL ON NULL} counterpart) instead of being passed as an argument.
 *
 * <p>Expects exactly two arguments: a key of the character string family followed by a value
 * accepted by {@code JSON_ARGUMENT}. The result type is a non-nullable {@code STRING}.
 */
public static final BuiltInFunctionDefinition JSON_OBJECTAGG_ABSENT_ON_NULL =
BuiltInFunctionDefinition.newBuilder()
.name("JSON_OBJECTAGG_ABSENT_ON_NULL")
.kind(AGGREGATE)
.inputTypeStrategy(
sequence(logical(LogicalTypeFamily.CHARACTER_STRING), JSON_ARGUMENT))
.outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
.runtimeDeferred()
.build();

public static final BuiltInFunctionDefinition JSON_ARRAY =
BuiltInFunctionDefinition.newBuilder()
.name("JSON_ARRAY")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public class SqlAggFunctionVisitor extends ExpressionDefaultVisitor<SqlAggFuncti
BuiltInFunctionDefinitions.VAR_SAMP, FlinkSqlOperatorTable.VAR_SAMP);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.COLLECT, FlinkSqlOperatorTable.COLLECT);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL,
FlinkSqlOperatorTable.JSON_OBJECTAGG_NULL_ON_NULL);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL,
FlinkSqlOperatorTable.JSON_OBJECTAGG_ABSENT_ON_NULL);
}

private final RelBuilder relBuilder;
Expand Down
Loading

0 comments on commit 8106e27

Please sign in to comment.