[SPARK-37750][SQL] ANSI mode: optionally return null result if element not exists in array/map

### What changes were proposed in this pull request?

Add a new configuration `spark.sql.ansi.strictIndexOperator` (exposed through `SQLConf.ansiFailOnElementNotExists`), which controls whether the `[]` operator on Array/Map types throws an exception or returns a null result when the element does not exist.

The default value of the new configuration is true.

### Why are the changes needed?

* Provide an alternative for Spark SQL users who rely on null results when an element does not exist in an array/map, e.g. `select .. where array[index] is not null` or `select .. where map[key] is not null` (see the sketch after this list)
* The Map type is not part of the ANSI SQL standard, so there is a reasonable argument that `map[key]` should return null when the key does not exist.
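
For illustration, here is a minimal spark-shell style sketch of the relaxed behavior (assumptions: a local session on a build with this change applied; the setup below is illustrative and not part of this diff):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session, not part of this change.
val spark = SparkSession.builder().master("local[1]").appName("strict-index-demo").getOrCreate()

spark.conf.set("spark.sql.ansi.enabled", "true")
spark.conf.set("spark.sql.ansi.strictIndexOperator", "false")

// With the strict [] operator disabled, invalid accesses yield NULL ...
spark.sql("SELECT array(1, 2, 3)[5]").show()       // NULL
spark.sql("SELECT map(1, 'a', 2, 'b')[5]").show()  // NULL

// ... while element_at still fails under ANSI mode; use try_element_at for NULL.
// spark.sql("SELECT element_at(array(1, 2, 3), 5)").show()  // throws
```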

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new option `spark.sql.ansi.strictIndexOperator`, which can optionally return a null result if an element does not exist in an array/map. However, the default behavior is unchanged.

### How was this patch tested?

Unit tests

Closes apache#35031 from gengliangwang/safeAccess.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
gengliangwang committed Jan 4, 2022
1 parent 3a2da70 commit 08fd501
Showing 9 changed files with 182 additions and 21 deletions.
9 changes: 6 additions & 3 deletions core/src/main/resources/error/error-classes.json
@@ -72,6 +72,9 @@
"message" : [ "%s" ]
},
"INVALID_ARRAY_INDEX" : {
"message" : [ "Invalid index: %s, numElements: %s. If necessary set %s to false to bypass this error." ]
},
"INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
"message" : [ "Invalid index: %s, numElements: %s. To return NULL instead, use 'try_element_at'. If necessary set %s to false to bypass this error." ]
},
"INVALID_FIELD_NAME" : {
@@ -82,9 +85,6 @@
"message" : [ "The fraction of sec must be zero. Valid range is [0, 60]. If necessary set %s to false to bypass this error. " ],
"sqlState" : "22023"
},
"INVALID_INPUT_INDEX" : {
"message" : [ "Invalid index: %s, numElements: %s. If necessary set %s to false to bypass this error." ]
},
"INVALID_INPUT_SYNTAX_FOR_NUMERIC_TYPE" : {
"message" : [ "invalid input syntax for type numeric: %s. To return NULL instead, use 'try_cast'. If necessary set %s to false to bypass this error." ],
"sqlState" : "42000"
@@ -93,6 +93,9 @@
"message" : [ "Input schema %s can only contain StringType as a key type for a MapType." ]
},
"MAP_KEY_DOES_NOT_EXIST" : {
"message" : [ "Key %s does not exist. If necessary set %s to false to bypass this error." ]
},
"MAP_KEY_DOES_NOT_EXIST_IN_ELEMENT_AT" : {
"message" : [ "Key %s does not exist. To return NULL instead, use 'try_element_at'. If necessary set %s to false to bypass this error." ]
},
"MISSING_COLUMN" : {
@@ -2068,6 +2068,8 @@ case class ElementAt(
case MapType(_, valueType, _) => valueType
}

override val isElementAtFunction: Boolean = true

override def inputTypes: Seq[AbstractDataType] = {
(left.dataType, right.dataType) match {
case (arr: ArrayType, e2: IntegralType) if (e2 != LongType) =>
@@ -2129,7 +2131,7 @@
val index = ordinal.asInstanceOf[Int]
if (array.numElements() < math.abs(index)) {
if (failOnError) {
throw QueryExecutionErrors.invalidArrayIndexError(index, array.numElements())
throw QueryExecutionErrors.invalidElementAtIndexError(index, array.numElements())
} else {
null
}
@@ -2168,7 +2170,7 @@
}

val indexOutOfBoundBranch = if (failOnError) {
s"throw QueryExecutionErrors.invalidArrayIndexError($index, $eval1.numElements());"
s"throw QueryExecutionErrors.invalidElementAtIndexError($index, $eval1.numElements());"
} else {
s"${ev.isNull} = true;"
}
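
To make the branches above concrete, here is a hedged, self-contained Scala sketch of the element_at lookup semantics after this change (plain Scala rather than the actual Spark source; the error messages are abbreviated):

```scala
// Sketch of element_at semantics: 1-based indexing, negative indexes count from
// the end, and an out-of-bounds index either throws (failOnError) or yields
// None, standing in for SQL NULL.
def elementAt[T](array: Seq[T], index: Int, failOnError: Boolean): Option[T] = {
  if (index == 0) {
    throw new ArrayIndexOutOfBoundsException("SQL array indices start at 1")
  } else if (math.abs(index) > array.length) {
    if (failOnError) {
      // Stands in for QueryExecutionErrors.invalidElementAtIndexError(index, numElements)
      throw new ArrayIndexOutOfBoundsException(
        s"Invalid index: $index, numElements: ${array.length}")
    } else None // SQL NULL
  } else {
    val idx = if (index > 0) index - 1 else array.length + index
    Some(array(idx))
  }
}
```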
@@ -233,7 +233,7 @@ case class GetArrayStructFields(
case class GetArrayItem(
child: Expression,
ordinal: Expression,
failOnError: Boolean = SQLConf.get.ansiEnabled)
failOnError: Boolean = SQLConf.get.ansiFailOnElementNotExists)
extends BinaryExpression with GetArrayItemUtil with ExpectsInputTypes with ExtractValue {

def this(child: Expression, ordinal: Expression) = this(child, ordinal, SQLConf.get.ansiEnabled)
@@ -341,6 +341,8 @@ trait GetArrayItemUtil {
*/
trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {

protected val isElementAtFunction: Boolean = false

// todo: current search is O(n), improve it.
def getValueEval(
value: Any,
@@ -365,7 +367,7 @@ trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {

if (!found) {
if (failOnError) {
throw QueryExecutionErrors.mapKeyNotExistError(ordinal)
throw QueryExecutionErrors.mapKeyNotExistError(ordinal, isElementAtFunction)
} else {
null
}
@@ -400,7 +402,7 @@
val keyJavaType = CodeGenerator.javaType(keyType)
nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
val keyNotFoundBranch = if (failOnError) {
s"throw QueryExecutionErrors.mapKeyNotExistError($eval2);"
s"throw QueryExecutionErrors.mapKeyNotExistError($eval2, $isElementAtFunction);"
} else {
s"${ev.isNull} = true;"
}
@@ -439,7 +441,7 @@
case class GetMapValue(
child: Expression,
key: Expression,
failOnError: Boolean = SQLConf.get.ansiEnabled)
failOnError: Boolean = SQLConf.get.ansiFailOnElementNotExists)
extends GetMapValueUtil with ExtractValue {

def this(child: Expression, key: Expression) = this(child, key, SQLConf.get.ansiEnabled)
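
A hedged sketch of the O(n) key scan in GetMapValueUtil with the new error routing (plain Scala rather than Spark's InternalRow-based code; the helper below is hypothetical and only mirrors the control flow):

```scala
// Sketch: linear key search; a missing key either throws (picking the
// element_at-specific message when isElementAtFunction is set) or returns
// None, standing in for SQL NULL.
def getMapValue[K, V](
    keys: Seq[K],
    values: Seq[V],
    ordinal: K,
    failOnError: Boolean,
    isElementAtFunction: Boolean): Option[V] = {
  var i = 0
  while (i < keys.length) {
    if (keys(i) == ordinal) return Some(values(i))
    i += 1
  }
  if (failOnError) {
    val hint =
      if (isElementAtFunction) " To return NULL instead, use 'try_element_at'." else ""
    throw new NoSuchElementException(s"Key $ordinal does not exist.$hint")
  } else None // SQL NULL
}
```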
@@ -164,18 +164,36 @@ object QueryExecutionErrors {
}

def invalidArrayIndexError(index: Int, numElements: Int): ArrayIndexOutOfBoundsException = {
invalidArrayIndexErrorInternal(index, numElements, SQLConf.ANSI_STRICT_INDEX_OPERATOR.key)
}

def invalidInputIndexError(index: Int, numElements: Int): ArrayIndexOutOfBoundsException = {
invalidArrayIndexErrorInternal(index, numElements, SQLConf.ANSI_ENABLED.key)
}

private def invalidArrayIndexErrorInternal(
index: Int,
numElements: Int,
key: String): ArrayIndexOutOfBoundsException = {
new SparkArrayIndexOutOfBoundsException(errorClass = "INVALID_ARRAY_INDEX",
messageParameters = Array(index.toString, numElements.toString, SQLConf.ANSI_ENABLED.key))
messageParameters = Array(index.toString, numElements.toString, key))
}

def invalidInputIndexError(index: Int, stringLength: Int): ArrayIndexOutOfBoundsException = {
new SparkArrayIndexOutOfBoundsException(errorClass = "INVALID_INPUT_INDEX",
messageParameters = Array(index.toString, stringLength.toString, SQLConf.ANSI_ENABLED.key))
def invalidElementAtIndexError(
index: Int,
numElements: Int): ArrayIndexOutOfBoundsException = {
new SparkArrayIndexOutOfBoundsException(errorClass = "INVALID_ARRAY_INDEX_IN_ELEMENT_AT",
messageParameters = Array(index.toString, numElements.toString, SQLConf.ANSI_ENABLED.key))
}

def mapKeyNotExistError(key: Any): NoSuchElementException = {
new SparkNoSuchElementException(errorClass = "MAP_KEY_DOES_NOT_EXIST",
messageParameters = Array(key.toString, SQLConf.ANSI_ENABLED.key))
def mapKeyNotExistError(key: Any, isElementAtFunction: Boolean): NoSuchElementException = {
if (isElementAtFunction) {
new SparkNoSuchElementException(errorClass = "MAP_KEY_DOES_NOT_EXIST_IN_ELEMENT_AT",
messageParameters = Array(key.toString, SQLConf.ANSI_ENABLED.key))
} else {
new SparkNoSuchElementException(errorClass = "MAP_KEY_DOES_NOT_EXIST",
messageParameters = Array(key.toString, SQLConf.ANSI_STRICT_INDEX_OPERATOR.key))
}
}

def rowFromCSVParserNotExpectedError(): Throwable = {
@@ -2665,6 +2665,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val ANSI_STRICT_INDEX_OPERATOR = buildConf("spark.sql.ansi.strictIndexOperator")
.doc(s"When true and '${ANSI_ENABLED.key}' is true, accessing complex SQL types via [] " +
"operator will throw an exception if array index is out of bound, or map key does not " +
"exist. Otherwise, Spark will return a null result when accessing an invalid index.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)

val SORT_BEFORE_REPARTITION =
buildConf("spark.sql.execution.sortBeforeRepartition")
.internal()
@@ -4130,6 +4138,8 @@ class SQLConf extends Serializable with Logging {

def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)

def ansiFailOnElementNotExists: Boolean = ansiEnabled && getConf(ANSI_STRICT_INDEX_OPERATOR)

def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
case "TIMESTAMP_LTZ" =>
// For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
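
A minimal sketch of how the two flags combine in `ansiFailOnElementNotExists` above (the helper name is made up for illustration):

```scala
// Mirrors: def ansiFailOnElementNotExists = ansiEnabled && getConf(ANSI_STRICT_INDEX_OPERATOR)
def bracketOperatorFails(ansiEnabled: Boolean, strictIndexOperator: Boolean): Boolean =
  ansiEnabled && strictIndexOperator

// ANSI mode off: [] always returns NULL, regardless of the operator flag.
assert(!bracketOperatorFails(ansiEnabled = false, strictIndexOperator = true))
// ANSI mode on, default strict operator: [] throws on a bad index or missing key.
assert(bracketOperatorFails(ansiEnabled = true, strictIndexOperator = true))
// ANSI mode on, strict operator disabled: [] returns NULL (element_at still throws).
assert(!bracketOperatorFails(ansiEnabled = true, strictIndexOperator = false))
```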
16 changes: 16 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/ansi/array.sql
@@ -1 +1,17 @@
--IMPORT array.sql

-- index out of range for array elements
-- return null results if array index in [] operator is out of bound
set spark.sql.ansi.strictIndexOperator=false;
select array(1, 2, 3)[5];
select array(1, 2, 3)[-1];

-- the configuration spark.sql.ansi.strictIndexOperator doesn't affect the function element_at
select element_at(array(1, 2, 3), 5);
select element_at(array(1, 2, 3), -5);
select element_at(array(1, 2, 3), 0);

-- the configuration spark.sql.ansi.strictIndexOperator doesn't affect the function elt
select elt(4, '123', '456');
select elt(0, '123', '456');
select elt(-1, '123', '456');
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/ansi/map.sql
@@ -1 +1,8 @@
--IMPORT map.sql

-- key does not exist
-- return null results if the map key in [] operator doesn't exist
set spark.sql.ansi.strictIndexOperator=false;
select map(1, 'a', 2, 'b')[5];
-- the configuration spark.sql.ansi.strictIndexOperator doesn't affect the function element_at
select element_at(map(1, 'a', 2, 'b'), 5);
84 changes: 81 additions & 3 deletions sql/core/src/test/resources/sql-tests/results/ansi/array.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 20
-- Number of queries: 29


-- !query
@@ -222,13 +222,91 @@ select array(1, 2, 3)[5]
struct<>
-- !query output
org.apache.spark.SparkArrayIndexOutOfBoundsException
Invalid index: 5, numElements: 3. To return NULL instead, use 'try_element_at'. If necessary set spark.sql.ansi.enabled to false to bypass this error.
Invalid index: 5, numElements: 3. If necessary set spark.sql.ansi.strictIndexOperator to false to bypass this error.


-- !query
select array(1, 2, 3)[-1]
-- !query schema
struct<>
-- !query output
org.apache.spark.SparkArrayIndexOutOfBoundsException
Invalid index: -1, numElements: 3. If necessary set spark.sql.ansi.strictIndexOperator to false to bypass this error.


-- !query
set spark.sql.ansi.strictIndexOperator=false
-- !query schema
struct<key:string,value:string>
-- !query output
spark.sql.ansi.strictIndexOperator false


-- !query
select array(1, 2, 3)[5]
-- !query schema
struct<array(1, 2, 3)[5]:int>
-- !query output
NULL


-- !query
select array(1, 2, 3)[-1]
-- !query schema
struct<array(1, 2, 3)[-1]:int>
-- !query output
NULL


-- !query
select element_at(array(1, 2, 3), 5)
-- !query schema
struct<>
-- !query output
org.apache.spark.SparkArrayIndexOutOfBoundsException
Invalid index: 5, numElements: 3. To return NULL instead, use 'try_element_at'. If necessary set spark.sql.ansi.enabled to false to bypass this error.


-- !query
select element_at(array(1, 2, 3), -5)
-- !query schema
struct<>
-- !query output
org.apache.spark.SparkArrayIndexOutOfBoundsException
Invalid index: -5, numElements: 3. To return NULL instead, use 'try_element_at'. If necessary set spark.sql.ansi.enabled to false to bypass this error.


-- !query
select element_at(array(1, 2, 3), 0)
-- !query schema
struct<>
-- !query output
java.lang.ArrayIndexOutOfBoundsException
SQL array indices start at 1


-- !query
select elt(4, '123', '456')
-- !query schema
struct<>
-- !query output
org.apache.spark.SparkArrayIndexOutOfBoundsException
Invalid index: 4, numElements: 2. If necessary set spark.sql.ansi.enabled to false to bypass this error.


-- !query
select elt(0, '123', '456')
-- !query schema
struct<>
-- !query output
org.apache.spark.SparkArrayIndexOutOfBoundsException
Invalid index: 0, numElements: 2. If necessary set spark.sql.ansi.enabled to false to bypass this error.


-- !query
select elt(-1, '123', '456')
-- !query schema
struct<>
-- !query output
org.apache.spark.SparkArrayIndexOutOfBoundsException
Invalid index: -1, numElements: 3. To return NULL instead, use 'try_element_at'. If necessary set spark.sql.ansi.enabled to false to bypass this error.
Invalid index: -1, numElements: 2. If necessary set spark.sql.ansi.enabled to false to bypass this error.
29 changes: 27 additions & 2 deletions sql/core/src/test/resources/sql-tests/results/ansi/map.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 10
-- Number of queries: 13


-- !query
@@ -17,7 +17,7 @@ select map(1, 'a', 2, 'b')[5]
struct<>
-- !query output
org.apache.spark.SparkNoSuchElementException
Key 5 does not exist. To return NULL instead, use 'try_element_at'. If necessary set spark.sql.ansi.enabled to false to bypass this error.
Key 5 does not exist. If necessary set spark.sql.ansi.strictIndexOperator to false to bypass this error.


-- !query
Expand Down Expand Up @@ -84,3 +84,28 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'array_contains(map_keys(map(1, 'a', 2, 'b')), '1')' due to data type mismatch: Input to function array_contains should have been array followed by a value with same element type, but it's [array<int>, string].; line 1 pos 7


-- !query
set spark.sql.ansi.strictIndexOperator=false
-- !query schema
struct<key:string,value:string>
-- !query output
spark.sql.ansi.strictIndexOperator false


-- !query
select map(1, 'a', 2, 'b')[5]
-- !query schema
struct<map(1, a, 2, b)[5]:string>
-- !query output
NULL


-- !query
select element_at(map(1, 'a', 2, 'b'), 5)
-- !query schema
struct<>
-- !query output
org.apache.spark.SparkNoSuchElementException
Key 5 does not exist. To return NULL instead, use 'try_element_at'. If necessary set spark.sql.ansi.enabled to false to bypass this error.
