Skip to content

Commit

Permalink
[FLINK-24387][table] Support JSON_STRING()
Browse files Browse the repository at this point in the history
This closes apache#17481.
  • Loading branch information
Airblader authored and twalthr committed Oct 19, 2021
1 parent ab7c38a commit 6bb15f3
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 33 deletions.
21 changes: 21 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,27 @@ json:
)
)
```
- sql: JSON_STRING(value)
table: jsonString(value)
description: |
Serializes a value into JSON.
This function returns a JSON string containing the serialized value. If the value is `NULL`,
the function returns `NULL`.
```
-- NULL
JSON_STRING(CAST(NULL AS INT))
-- '1'
JSON_STRING(1)
-- 'true'
JSON_STRING(TRUE)
-- '"Hello, World!"'
JSON_STRING('Hello, World!')
-- '[1,2]'
JSON_STRING(ARRAY[1, 2])
```
- sql: JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ])
table: jsonArray(JsonOnNull, values...)
description: |
Expand Down
21 changes: 21 additions & 0 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,27 @@ json:
)
)
```
- sql: JSON_STRING(value)
table: jsonString(value)
description: |
Serializes a value into JSON.
This function returns a JSON string containing the serialized value. If the value is `NULL`,
the function returns `NULL`.
```
-- NULL
JSON_STRING(CAST(NULL AS INT))
-- '1'
JSON_STRING(1)
-- 'true'
JSON_STRING(TRUE)
-- '"Hello, World!"'
JSON_STRING('Hello, World!')
-- '[1,2]'
JSON_STRING(ARRAY[1, 2])
```
- sql: JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ])
table: jsonArray(JsonOnNull, values...)
description: |
Expand Down
24 changes: 22 additions & 2 deletions flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
'current_timestamp', 'current_watermark', 'local_time', 'local_timestamp',
'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_',
'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat',
'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_object',
'json_array', 'call', 'call_sql', 'source_watermark']
'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_string',
'json_object', 'json_array', 'call', 'call_sql', 'source_watermark']


def _leaf_op(op_name: str) -> Expression:
Expand Down Expand Up @@ -610,6 +610,26 @@ def without_columns(head, *tails) -> Expression:
return _binary_op("withoutColumns", head, tails)


def json_string(value) -> Expression:
    """
    Converts a value into its JSON string representation.

    The returned expression evaluates to a string holding the JSON-serialized form of the
    given value. A `NULL` input produces `NULL`.

    Examples:
    ::

        >>> json_string(null_of(DataTypes.INT())) # None
        >>> json_string(1) # '1'
        >>> json_string(True) # 'true'
        >>> json_string("Hello, World!") # '"Hello, World!"'
        >>> json_string([1, 2]) # '[1,2]'

    :param value: the value to serialize.
    :return: an expression evaluating to the JSON string.
    """
    serialized = _unary_op("jsonString", value)
    return serialized


def json_object(on_null: JsonOnNull = JsonOnNull.NULL, *args) -> Expression:
"""
Builds a JSON object string from a list of key-value pairs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_ARRAY;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_STRING;

/**
* Entry point of the Table API Expression DSL such as: {@code $("myField").plus(10).abs()}
Expand Down Expand Up @@ -628,6 +629,28 @@ public static ApiExpression jsonObject(JsonOnNull onNull, Object... keyValues) {
return apiCall(JSON_OBJECT, arguments);
}

/**
 * Converts a value into its JSON string representation.
 *
 * <p>The returned expression evaluates to a string holding the JSON-serialized form of the
 * given value. A {@code null} input produces {@code null}.
 *
 * <p>Examples:
 *
 * <pre>{@code
 * // null
 * jsonString(nullOf(DataTypes.INT()))
 *
 * jsonString(1) // "1"
 * jsonString(true) // "true"
 * jsonString("Hello, World!") // "\"Hello, World!\""
 * jsonString(Arrays.asList(1, 2)) // "[1,2]"
 * }</pre>
 *
 * @param value the value to serialize
 * @return an expression evaluating to the JSON string
 */
public static ApiExpression jsonString(Object value) {
    final ApiExpression serialized = apiCallAtLeastOneArgument(JSON_STRING, value);
    return serialized;
}

/**
* Builds a JSON array string from a list of values.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,27 @@ trait ImplicitExpressionConversions {
*/
def not(expression: Expression): Expression = Expressions.not(expression)

/**
 * Converts a value into its JSON string representation.
 *
 * The returned expression evaluates to a string holding the JSON-serialized form of the
 * given value. A `null` input produces `null`.
 *
 * Examples:
 * {{{
 * // null
 * jsonString(nullOf(DataTypes.INT()))
 *
 * jsonString(1) // "1"
 * jsonString(true) // "true"
 * jsonString("Hello, World!") // "\"Hello, World!\""
 * jsonString(Arrays.asList(1, 2)) // "[1,2]"
 * }}}
 *
 * @param value the expression whose value is serialized
 * @return an expression evaluating to the JSON string
 */
def jsonString(value: Expression): Expression = Expressions.jsonString(value)

/**
* Builds a JSON object string from a list of key-value pairs.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.JsonExistsOnError;
import org.apache.flink.table.api.JsonOnNull;
import org.apache.flink.table.api.JsonQueryOnEmptyOrError;
import org.apache.flink.table.api.JsonQueryWrapper;
import org.apache.flink.table.api.JsonType;
Expand Down Expand Up @@ -73,6 +74,7 @@
import static org.apache.flink.table.types.inference.TypeStrategies.nullableIfAllArgs;
import static org.apache.flink.table.types.inference.TypeStrategies.nullableIfArgs;
import static org.apache.flink.table.types.inference.TypeStrategies.varyingString;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;

Expand Down Expand Up @@ -1569,6 +1571,15 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.runtimeDeferred()
.build();

/**
 * Definition of the scalar {@code JSON_STRING} function, which serializes a value into a JSON
 * string.
 *
 * <p>Accepts exactly one argument matching {@code JSON_ARGUMENT}. The declared result type is
 * {@code STRING}. NOTE(review): nullability presumably follows the argument via {@code
 * nullableIfArgs} - confirm against that strategy.
 */
public static final BuiltInFunctionDefinition JSON_STRING =
BuiltInFunctionDefinition.newBuilder()
.name("JSON_STRING")
.kind(SCALAR)
.inputTypeStrategy(sequence(JSON_ARGUMENT))
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.runtimeProvided()
.build();

public static final BuiltInFunctionDefinition JSON_OBJECT =
BuiltInFunctionDefinition.newBuilder()
.name("JSON_OBJECT")
Expand All @@ -1582,7 +1593,10 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
BuiltInFunctionDefinition.newBuilder()
.name("JSON_ARRAY")
.kind(SCALAR)
.inputTypeStrategy(SpecificInputTypeStrategies.JSON_ARRAY)
.inputTypeStrategy(
InputTypeStrategies.varyingSequence(
symbol(JsonOnNull.class),
SpecificInputTypeStrategies.JSON_ARGUMENT))
.outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
.runtimeDeferred()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.JsonOnNull;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.InputTypeStrategy;
Expand All @@ -36,7 +37,6 @@
import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
import static org.apache.flink.table.types.inference.InputTypeStrategies.repeatingSequence;
import static org.apache.flink.table.types.inference.InputTypeStrategies.symbol;
import static org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;

/**
* Entry point for specific input type strategies not covered in {@link InputTypeStrategies}.
Expand All @@ -57,6 +57,16 @@ public final class SpecificInputTypeStrategies {
public static final InputTypeStrategy CURRENT_WATERMARK =
new CurrentWatermarkInputTypeStrategy();

/**
 * Argument type strategy representing all types supported in a JSON context.
 *
 * <p>Matches an argument whose logical type is any of: a character string, a binary string, a
 * timestamp, a constructed type, a boolean, or a numeric type.
 */
public static final ArgumentTypeStrategy JSON_ARGUMENT =
or(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.BINARY_STRING),
logical(LogicalTypeFamily.TIMESTAMP),
logical(LogicalTypeFamily.CONSTRUCTED),
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeFamily.NUMERIC));

/**
* Input strategy for {@link BuiltInFunctionDefinitions#JSON_OBJECT}.
*
Expand All @@ -69,25 +79,7 @@ public final class SpecificInputTypeStrategies {
.finishWithVarying(
repeatingSequence(
and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
or(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.BINARY_STRING),
logical(LogicalTypeFamily.TIMESTAMP),
logical(LogicalTypeFamily.CONSTRUCTED),
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeFamily.NUMERIC))));

/** Input strategy for {@link BuiltInFunctionDefinitions#JSON_ARRAY}. */
public static final InputTypeStrategy JSON_ARRAY =
varyingSequence(
symbol(JsonOnNull.class),
or(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.BINARY_STRING),
logical(LogicalTypeFamily.TIMESTAMP),
logical(LogicalTypeFamily.CONSTRUCTED),
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeFamily.NUMERIC)));
JSON_ARGUMENT));

// --------------------------------------------------------------------------------------------
// Strategies composed of other strategies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.util.DataFormatConverters.{DataFormatConverter, getConverterForDataType}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions
import org.apache.flink.table.planner.calcite.{FlinkRexBuilder, FlinkTypeFactory, RexDistinctKeyVariable, RexFieldVariable}
import org.apache.flink.table.planner.codegen.CodeGenUtils.{requireTemporal, requireTimeInterval, _}
import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.GenerateUtils._
import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
Expand All @@ -44,10 +44,9 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCou
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

import org.apache.calcite.rex._
import org.apache.calcite.sql.{SqlKind, SqlOperator}
import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
import org.apache.calcite.sql.{SqlKind, SqlOperator}
import org.apache.calcite.util.{Sarg, TimestampString}
import org.apache.flink.table.functions.{BuiltInFunctionDefinitions, FunctionDefinition}

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -810,21 +809,24 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)

case bsf: BridgingSqlFunction =>
bsf.getDefinition match {
case functionDefinition : FunctionDefinition
if functionDefinition eq BuiltInFunctionDefinitions.CURRENT_WATERMARK =>
case BuiltInFunctionDefinitions.CURRENT_WATERMARK =>
generateWatermark(ctx, contextTerm, resultType)
case functionDefinition : FunctionDefinition
if functionDefinition eq BuiltInFunctionDefinitions.GREATEST =>

case BuiltInFunctionDefinitions.GREATEST =>
operands.foreach { operand =>
requireComparable(operand)
}
generateGreatestLeast(resultType, operands)
case functionDefinition : FunctionDefinition
if functionDefinition eq BuiltInFunctionDefinitions.LEAST =>

case BuiltInFunctionDefinitions.LEAST =>
operands.foreach { operand =>
requireComparable(operand)
}
generateGreatestLeast(resultType, operands, false)
generateGreatestLeast(resultType, operands, greatest = false)

case BuiltInFunctionDefinitions.JSON_STRING =>
new JsonStringCallGen(call).generate(ctx, operands, resultType)

case _ =>
new BridgingSqlFunctionCallGen(call).generate(ctx, operands, resultType)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.codegen.calls

import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, className, newName, primitiveTypeTermForType}
import org.apache.flink.table.planner.codegen.JsonGenerateUtils.createNodeTerm
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
import org.apache.flink.table.runtime.functions.SqlJsonUtils
import org.apache.flink.table.types.logical.LogicalType

import org.apache.calcite.rex.RexCall

/**
 * [[CallGenerator]] for `JSON_STRING`.
 *
 * Emits Java code that serializes the first operand into a JSON string. The result variable
 * starts out as `null` and is only assigned when the operand's null term is false.
 */
class JsonStringCallGen(call: RexCall) extends CallGenerator {
  /** Class term of [[SqlJsonUtils]] as referenced from the generated code. */
  private val jsonUtils = className[SqlJsonUtils]

  /**
   * Generates the expression for a `JSON_STRING` call.
   *
   * @param ctx the code generator context passed on to the JSON node creation
   * @param operands the already generated operand expressions; the first one is serialized
   * @param returnType the logical type of the call's result
   * @return the generated expression, sharing the null term of the first operand
   */
  override def generate(
      ctx: CodeGeneratorContext,
      operands: Seq[GeneratedExpression],
      returnType: LogicalType): GeneratedExpression = {

    val operand = operands.head
    val nodeTerm = createNodeTerm(ctx, operand, call.operands.get(0))

    val outTerm = newName("result")
    val outType = primitiveTypeTermForType(returnType)
    val operandCode = operands.map(_.code).mkString

    // The result is null exactly when the operand is null, so the operand's null term is
    // reused for the result below.
    val code = s"""
      |$operandCode
      |
      |$outType $outTerm = null;
      |if (!${operand.nullTerm}) {
      | $outTerm =
      | $BINARY_STRING.fromString($jsonUtils.serializeJson($nodeTerm));
      |}
      |""".stripMargin

    GeneratedExpression(outTerm, operand.nullTerm, code, returnType)
  }
}
Loading

0 comments on commit 6bb15f3

Please sign in to comment.