[SPARK-35155][SQL] Add rule id pruning to Analyzer rules
### What changes were proposed in this pull request?

Added rule-id-based pruning to the following Analyzer rules in fixed-point batches:

- org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns
- org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator
- org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions
- org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggAliasInGroupBy
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveBinaryArithmetic
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveEncodersInUDF
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveInsertInto
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNewInstance
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOrdinalInOrderByAndGroupBy
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolvePivot
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRandomSeed
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubqueryColumnAliases
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast
- org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUserSpecifiedColumns
- org.apache.spark.sql.catalyst.analysis.Analyzer$WindowsSubstitution
- org.apache.spark.sql.catalyst.analysis.DeduplicateRelations
- org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
- org.apache.spark.sql.catalyst.analysis.EliminateUnions
- org.apache.spark.sql.catalyst.analysis.ResolveCreateNamedStruct
- org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveCoalesceHints
- org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveJoinStrategyHints
- org.apache.spark.sql.catalyst.analysis.ResolveInlineTables
- org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables
- org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
- org.apache.spark.sql.catalyst.analysis.ResolveUnion
- org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals
- org.apache.spark.sql.catalyst.analysis.TimeWindowing

Subsequent PRs will add tree-bits-based pruning to those rules; the original large PR has been split up to reduce review load.
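
Concretely, the mechanical change in each rule is the same: a plain `resolveOperators` / `resolveOperatorsUp` / `resolveExpressions` call becomes its `...WithPruning` counterpart, taking `AlwaysProcess.fn` as the tree-pattern condition plus the rule's `ruleId`. A minimal before/after sketch (the rule object here is hypothetical; the pattern mirrors the diffs below):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.AlwaysProcess

// Hypothetical rule, for illustration only.
object MyAnalyzerRule extends Rule[LogicalPlan] {
  // Before: every fixed-point iteration re-walks the whole plan.
  //   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { ... }

  // After: subtrees already known to be unaffected by this rule are skipped.
  override def apply(plan: LogicalPlan): LogicalPlan =
    plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
      case p: LogicalPlan => p // rule-specific rewrites go here
    }
}
```

For the pruning to engage, the rule's fully qualified name must also be registered in RuleIdCollection, which the last diff in this commit does for each rule listed above.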

### Why are the changes needed?

Reduce the number of tree traversals and hence improve query compilation latency.
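
The saving comes from per-node bookkeeping: when a `...WithPruning` traversal applies a rule to a subtree and nothing changes, the node can be marked so later fixed-point iterations skip that subtree for that rule id. A deliberately simplified sketch of the idea (names and structure are illustrative, not Spark's exact TreeNode internals):

```scala
import java.util.BitSet

// Illustrative model of rule-id pruning; Spark's TreeNode keeps comparable
// bookkeeping internally, with different names and considerably more care.
abstract class Node {
  def children: Seq[Node]
  def withNewChildren(newChildren: Seq[Node]): Node

  private val ineffectiveRuleIds = new BitSet()

  def transformUpWithPruning(ruleId: Int)(rule: PartialFunction[Node, Node]): Node = {
    // Prune: this rule already reached a fixed point on this subtree.
    if (ineffectiveRuleIds.get(ruleId)) return this

    val afterChildren =
      withNewChildren(children.map(_.transformUpWithPruning(ruleId)(rule)))
    val result = rule.applyOrElse(afterChildren, identity[Node])
    if (result eq afterChildren) {
      // The rule had no effect here; remember that so future passes skip us.
      result.ineffectiveRuleIds.set(ruleId)
    }
    result
  }
}
```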

### How was this patch tested?

Existing tests.

Closes apache#32425 from sigmod/analyzer.

Authored-by: Yingyi Bu <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
sigmod authored and gengliangwang committed May 6, 2021
Commit 7970318 (parent 4fe4b65)
Showing 10 changed files with 133 additions and 47 deletions.

One file's diff is too large to render by default and is omitted here; the remaining nine files appear below.

DeduplicateRelations:

@@ -22,10 +22,12 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    renewDuplicatedRelations(Nil, plan)._1.resolveOperatorsUp {
+    renewDuplicatedRelations(Nil, plan)._1.resolveOperatorsUpWithPruning(
+      AlwaysProcess.fn, ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
       // To resolve duplicate expression IDs for Join.
       case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
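
Note that the tree-pattern condition passed at this call site, and at every other one in this commit, is `AlwaysProcess.fn`, which never vetoes a node; pruning here therefore comes from rule ids alone, and the condition slot only becomes meaningful once the follow-up PRs add tree-bits-based pruning. A sketch of what such an always-true condition amounts to (illustrative; the real definition lives in `org.apache.spark.sql.catalyst.trees`):

```scala
// Illustrative stand-in for AlwaysProcess: a condition that accepts every
// node, so in this commit only rule-id pruning can skip subtrees.
object AlwaysProcessSketch {
  val fn: Any => Boolean = _ => true
}
```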
ResolveHints (ResolveJoinStrategyHints and ResolveCoalesceHints):

@@ -24,7 +24,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, IntegerLiteral, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.trees.{AlwaysProcess, CurrentOrigin}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 
@@ -143,7 +143,8 @@ object ResolveHints {
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
+      AlwaysProcess.fn, ruleId) {
       case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
           // If there is no table alias specified, apply the hint on the entire subtree.
@@ -246,7 +247,8 @@ object ResolveHints {
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      AlwaysProcess.fn, ruleId) {
       case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
         case "REPARTITION" =>
           createRepartition(shuffle = true, hint)
ResolveInlineTables:

@@ -22,13 +22,15 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
  * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
  */
 object ResolveInlineTables extends Rule[LogicalPlan] with CastSupport {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    AlwaysProcess.fn, ruleId) {
     case table: UnresolvedInlineTable if table.expressionsResolved =>
       validateInputDimension(table)
       validateInputEvaluable(table)
ResolveUnion:

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{CombineUnions, OptimizeUpdateFields}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.unsafe.types.UTF8String
@@ -249,7 +250,8 @@ object ResolveUnion extends Rule[LogicalPlan] {
       caseSensitiveAnalysis)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
+    AlwaysProcess.fn, ruleId) {
     case e if !e.childrenResolved => e
 
     case Union(children, byName, allowMissingCol) if byName =>
SubstituteUnresolvedOrdinals:

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions.{BaseGroupingSets, Expression, Literal, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Sort}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
 import org.apache.spark.sql.types.IntegerType
 
@@ -39,7 +40,8 @@ object SubstituteUnresolvedOrdinals extends Rule[LogicalPlan] {
     case e => e
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    AlwaysProcess.fn, ruleId) {
     case s: Sort if conf.orderByOrdinal && s.order.exists(o => containIntLiteral(o.child)) =>
       val newOrders = s.order.map {
         case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _, _, _) =>
ResolveLambdaVariables:

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.DataType
@@ -90,7 +91,7 @@ object ResolveLambdaVariables extends Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.resolveOperators {
+    plan.resolveOperatorsWithPruning(AlwaysProcess.fn, ruleId) {
       case q: LogicalPlan =>
         q.mapExpressions(resolve(_, Map.empty))
     }
ResolveTimeZone:

@@ -20,6 +20,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ListQuery, TimeZoneAwareExpression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -38,7 +39,7 @@ object ResolveTimeZone extends Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan =
-    plan.resolveExpressions(transformTimeZoneExprs)
+    plan.resolveExpressionsWithPruning(AlwaysProcess.fn, ruleId)(transformTimeZoneExprs)
 
   def resolveTimeZones(e: Expression): Expression = e.transform(transformTimeZoneExprs)
 }
OptimizeUpdateFields:

@@ -24,6 +24,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.catalyst.expressions.{Expression, UpdateFields, WithField}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 
 
 /**
@@ -71,7 +72,8 @@ object OptimizeUpdateFields extends Rule[LogicalPlan] {
       UpdateFields(struct, fieldOps1 ++ fieldOps2)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions(optimizeUpdateFields)
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressionsWithPruning(
+    AlwaysProcess.fn, ruleId)(optimizeUpdateFields)
 }
 
 /**
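
ResolveTimeZone and OptimizeUpdateFields above rewrite expressions rather than operators, so they use the expression-level entry point `resolveExpressionsWithPruning`, which takes the same condition and rule id but applies a `PartialFunction[Expression, Expression]`. A sketch of a rule using it (the rule and its no-op rewrite are hypothetical):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.AlwaysProcess

// Hypothetical expression-rewriting rule, mirroring the shape of the
// ResolveTimeZone change above.
object MyExpressionRule extends Rule[LogicalPlan] {
  private val rewrite: PartialFunction[Expression, Expression] = {
    case lit: Literal => lit // placeholder; a real rule would rewrite here
  }

  override def apply(plan: LogicalPlan): LogicalPlan =
    plan.resolveExpressionsWithPruning(AlwaysProcess.fn, ruleId)(rewrite)
}
```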
RuleIdCollection:

@@ -40,11 +40,51 @@ object RuleIdCollection {
   // a changing, external state. Rules here are in alphabetical order.
   private val rulesNeedingIds: Seq[String] = {
     // Catalyst Analyzer rules
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggAliasInGroupBy" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveBinaryArithmetic" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveEncodersInUDF" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveInsertInto" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences" ::
     "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNewInstance" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOrdinalInOrderByAndGroupBy" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolvePivot" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRandomSeed" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations" ::
     "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubqueryColumnAliases" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUserSpecifiedColumns" ::
     "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame" ::
     "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder" ::
+    "org.apache.spark.sql.catalyst.analysis.Analyzer$WindowsSubstitution" ::
     "org.apache.spark.sql.catalyst.analysis.ApplyCharTypePadding" ::
+    "org.apache.spark.sql.catalyst.analysis.DeduplicateRelations" ::
+    "org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases" ::
+    "org.apache.spark.sql.catalyst.analysis.EliminateUnions" ::
+    "org.apache.spark.sql.catalyst.analysis.ResolveCreateNamedStruct" ::
+    "org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveCoalesceHints" ::
+    "org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveJoinStrategyHints" ::
+    "org.apache.spark.sql.catalyst.analysis.ResolveInlineTables" ::
+    "org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables" ::
+    "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
+    "org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
+    "org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
+    "org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
     "org.apache.spark.sql.catalyst.analysis.UpdateOuterReferences" ::
     // Catalyst Optimizer rules
     "org.apache.spark.sql.catalyst.optimizer.BooleanSimplification" ::
@@ -59,6 +99,7 @@ object RuleIdCollection {
     "org.apache.spark.sql.catalyst.optimizer.OptimizeCsvJsonExprs" ::
     "org.apache.spark.sql.catalyst.optimizer.OptimizeIn" ::
     "org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries" ::
+    "org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields"::
     "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation" ::
     "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin" ::
     "org.apache.spark.sql.catalyst.optimizer.PushExtraPredicateThroughJoin" ::
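
Registration in RuleIdCollection is what gives a rule a usable id: a rule's `ruleId` is resolved by looking its name up in this list, and unlisted rules cannot be pruned. A simplified sketch of the name-to-id mapping (illustrative; the real object differs in details such as its RuleId wrapper type and error handling):

```scala
// Simplified sketch: assign each registered rule a dense integer id by its
// position in the list. The names below are examples from this commit; the
// real RuleIdCollection holds the full list shown in the diff above.
object RuleIdCollectionSketch {
  private val rulesNeedingIds: Seq[String] = Seq(
    "org.apache.spark.sql.catalyst.analysis.DeduplicateRelations",
    "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone",
    "org.apache.spark.sql.catalyst.analysis.ResolveUnion")

  private val ruleToId: Map[String, Int] = rulesNeedingIds.zipWithIndex.toMap

  def getRuleId(ruleName: String): Int =
    ruleToId.getOrElse(
      ruleName,
      throw new NoSuchElementException(s"Rule id not found for $ruleName"))
}
```

The truncated comment at the top of the hunk ("a changing, external state") hints at the soundness requirement: a rule whose output depends on mutable external state must not be registered, since a cached "no effect" mark could then go stale.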
