Skip to content

Commit

Permalink
[SPARK-9512][SQL] Revert SPARK-9251, Allow evaluation while sorting
Browse files Browse the repository at this point in the history
The analysis rule has a bug and we ended up making the sorter still capable of doing evaluation, so lets revert this for now.

Author: Michael Armbrust <[email protected]>

Closes apache#7906 from marmbrus/revertSortProjection and squashes the following commits:

2da6972 [Michael Armbrust] unrevert unrelated changes
4f2b00c [Michael Armbrust] Revert "[SPARK-9251][SQL] do not order by expressions which still need evaluation"
  • Loading branch information
marmbrus committed Aug 4, 2015
1 parent 6a0f8b9 commit 34a0eb2
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
UnresolvedHavingClauseAttributes ::
RemoveEvaluationFromSort ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
Expand Down Expand Up @@ -955,63 +954,6 @@ class Analyzer(
Project(p.output, newPlan.withNewChildren(newChild :: Nil))
}
}

/**
* Removes all still-need-evaluate ordering expressions from sort and use an inner project to
* materialize them, finally use a outer project to project them away to keep the result same.
* Then we can make sure we only sort by [[AttributeReference]]s.
*
* As an example,
* {{{
* Sort('a, 'b + 1,
* Relation('a, 'b))
* }}}
* will be turned into:
* {{{
* Project('a, 'b,
* Sort('a, '_sortCondition,
* Project('a, 'b, ('b + 1).as("_sortCondition"),
* Relation('a, 'b))))
* }}}
*/
object RemoveEvaluationFromSort extends Rule[LogicalPlan] {
private def hasAlias(expr: Expression) = {
expr.find {
case a: Alias => true
case _ => false
}.isDefined
}

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// The ordering expressions have no effect to the output schema of `Sort`,
// so `Alias`s in ordering expressions are unnecessary and we should remove them.
case s @ Sort(ordering, _, _) if ordering.exists(hasAlias) =>
val newOrdering = ordering.map(_.transformUp {
case Alias(child, _) => child
}.asInstanceOf[SortOrder])
s.copy(order = newOrdering)

case s @ Sort(ordering, global, child)
if s.expressions.forall(_.resolved) && s.childrenResolved && !s.hasNoEvaluation =>

val (ref, needEval) = ordering.partition(_.child.isInstanceOf[AttributeReference])

val namedExpr = needEval.map(_.child match {
case n: NamedExpression => n
case e => Alias(e, "_sortCondition")()
})

val newOrdering = ref ++ needEval.zip(namedExpr).map { case (order, ne) =>
order.copy(child = ne.toAttribute)
}

// Add still-need-evaluate ordering expressions into inner project and then project
// them away after the sort.
Project(child.output,
Sort(newOrdering, global,
Project(child.output ++ namedExpr, child)))
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
}.nonEmpty
)

expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions
!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
}
}

Expand Down Expand Up @@ -68,7 +68,7 @@ case class Generate(
generator.resolved &&
childrenResolved &&
generator.elementTypes.length == generatorOutput.length &&
generatorOutput.forall(_.resolved)
!generatorOutput.exists(!_.resolved)
}

// we don't want the gOutput to be taken as part of the expressions
Expand Down Expand Up @@ -188,7 +188,7 @@ case class WithWindowDefinition(
}

/**
* @param order The ordering expressions, should all be [[AttributeReference]]
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
* False means sorting only apply within the partition.
* @param child Child logical plan
Expand All @@ -198,11 +198,6 @@ case class Sort(
global: Boolean,
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

def hasNoEvaluation: Boolean = order.forall(_.child.isInstanceOf[AttributeReference])

override lazy val resolved: Boolean =
expressions.forall(_.resolved) && childrenResolved && hasNoEvaluation
}

case class Aggregate(
Expand All @@ -217,7 +212,7 @@ case class Aggregate(
}.nonEmpty
)

expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions
!expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
}

lazy val newAggregation: Option[Aggregate] = Utils.tryConvert(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,39 +165,11 @@ class AnalysisSuite extends AnalysisTest {

test("pull out nondeterministic expressions from Sort") {
val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation)
val analyzed = caseSensitiveAnalyzer.execute(plan)
analyzed.transform {
case s: Sort if s.expressions.exists(!_.deterministic) =>
fail("nondeterministic expressions are not allowed in Sort")
}
}

test("remove still-need-evaluate ordering expressions from sort") {
val a = testRelation2.output(0)
val b = testRelation2.output(1)

def makeOrder(e: Expression): SortOrder = SortOrder(e, Ascending)

val noEvalOrdering = makeOrder(a)
val noEvalOrderingWithAlias = makeOrder(Alias(Alias(b, "name1")(), "name2")())

val needEvalExpr = Coalesce(Seq(a, Literal("1")))
val needEvalExpr2 = Coalesce(Seq(a, b))
val needEvalOrdering = makeOrder(needEvalExpr)
val needEvalOrdering2 = makeOrder(needEvalExpr2)

val plan = Sort(
Seq(noEvalOrdering, noEvalOrderingWithAlias, needEvalOrdering, needEvalOrdering2),
false, testRelation2)

val evaluatedOrdering = makeOrder(AttributeReference("_sortCondition", StringType)())
val materializedExprs = Seq(needEvalExpr, needEvalExpr2).map(e => Alias(e, "_sortCondition")())

val projected = Alias(Rand(33), "_nondeterministic")()
val expected =
Project(testRelation2.output,
Sort(Seq(makeOrder(a), makeOrder(b), evaluatedOrdering, evaluatedOrdering), false,
Project(testRelation2.output ++ materializedExprs, testRelation2)))

Project(testRelation.output,
Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false,
Project(testRelation.output :+ projected, testRelation)))
checkAnalysis(plan, expected)
}
}

0 comments on commit 34a0eb2

Please sign in to comment.