diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index b21594deb709e..e5e91acf86594 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import scala.annotation.tailrec
+import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
@@ -151,8 +152,17 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     val emptyRow = new GenericInternalRow(attributes.length)
     val boundE = BindReferences.bindReference(e, attributes)
     if (boundE.exists(_.isInstanceOf[Unevaluable])) return false
-    val v = boundE.eval(emptyRow)
-    v == null || v == false
+
+    // some expressions, like map(), may throw an exception when dealing with null values.
+    // therefore, we need to handle exceptions.
+    try {
+      val v = boundE.eval(emptyRow)
+      v == null || v == false
+    } catch {
+      case NonFatal(e) =>
+        // cannot filter out null if `where` expression throws an exception with null input
+        false
+    }
   }
 
   private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index 192db596347a3..2530cfded9ec2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull}
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, If, IsNotNull, Literal, RaiseError}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
 
 class OuterJoinEliminationSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
@@ -252,4 +254,18 @@ class OuterJoinEliminationSuite extends PlanTest {
       comparePlans(optimized, originalQuery.analyze)
     }
   }
+
+  test("SPARK-38868: exception thrown from filter predicate does not propagate") {
+    val x = testRelation.subquery(Symbol("x"))
+    val y = testRelation1.subquery(Symbol("y"))
+
+    val message = Literal(UTF8String.fromString("Bad value"), StringType)
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where(If("y.d".attr > 0, true, RaiseError(message)).isNull)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    comparePlans(optimized, originalQuery.analyze)
+  }
 }
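
Note on the change above: before this patch, EliminateOuterJoin.canFilterOutNull evaluated the filter predicate against an all-null row to decide whether the predicate discards null-extended rows, and a predicate such as map(...) or raise_error(...) could throw during that probe, crashing the optimizer (SPARK-38868). The fix treats any non-fatal exception as "cannot prove the predicate filters out nulls" and keeps the outer join. Below is a minimal, self-contained sketch of that pattern; the names (NullProbeSketch, filtersOutNullRow, and the () => Any thunk standing in for Expression.eval on a GenericInternalRow of nulls) are hypothetical illustrations, not Spark APIs:

    import scala.util.control.NonFatal

    object NullProbeSketch {
      // Returns true only when the predicate provably evaluates to null or
      // false on an all-null input row; an exception proves nothing, so we
      // conservatively return false and leave the outer join in place.
      def filtersOutNullRow(evalOnNulls: () => Any): Boolean =
        try {
          val v = evalOnNulls()
          v == null || v == false
        } catch {
          case NonFatal(_) => false
        }

      def main(args: Array[String]): Unit = {
        // Like `d > 0` with a null `d`: evaluates to null, so the rule may
        // rewrite the outer join to an inner join.
        println(filtersOutNullRow(() => null)) // true
        // Like map()/raise_error() on null input: throws, so the rule must
        // leave the join untouched instead of propagating the error.
        println(filtersOutNullRow(() => throw new IllegalStateException("Bad value"))) // false
      }
    }

The regression test added in OuterJoinEliminationSuite exercises the throwing case: If("y.d".attr > 0, true, RaiseError(message)).isNull raises whenever y.d is null, so the optimized plan must equal the analyzed original (comparePlans(optimized, originalQuery.analyze)); before the fix, Optimize.execute itself threw.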