Skip to content

Commit

Permalink
[SPARK-7551][DataFrame] support backticks for DataFrame attribute res…
Browse files Browse the repository at this point in the history
…olution

Author: Wenchen Fan <[email protected]>

Closes apache#6074 from cloud-fan/7551 and squashes the following commits:

e6f579e [Wenchen Fan] allow space
2b86699 [Wenchen Fan] handle blank
e218d99 [Wenchen Fan] address comments
54c4209 [Wenchen Fan] fix 7551
  • Loading branch information
cloud-fan authored and rxin committed May 13, 2015
1 parent 7ff16e8 commit 213a6f3
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
}

/**
* Optionally resolves the given string to a [[NamedExpression]] using the input from all child
* Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan. The attribute is expressed as
* as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
*/
Expand All @@ -116,7 +116,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
resolve(nameParts, children.flatMap(_.output), resolver, throwErrors)

/**
* Optionally resolves the given string to a [[NamedExpression]] based on the output of this
* Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
* LogicalPlan. The attribute is expressed as string in the following form:
* `[scope].AttributeName.[nested].[fields]...`.
*/
Expand All @@ -126,6 +126,57 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
throwErrors: Boolean = false): Option[NamedExpression] =
resolve(nameParts, output, resolver, throwErrors)

/**
* Given an attribute name, split it to name parts by dot, but
* don't split the name parts quoted by backticks, for example,
* `ab.cd`.`efg` should be split into two parts "ab.cd" and "efg".
*/
def resolveQuoted(
name: String,
resolver: Resolver): Option[NamedExpression] = {
resolve(parseAttributeName(name), resolver, true)
}

/**
* Internal method, used to split attribute name by dot with backticks rule.
* Backticks must appear in pairs, and the quoted string must be a complete name part,
* which means `ab..c`e.f is not allowed.
* Escape character is not supported now, so we can't use backtick inside name part.
*/
private def parseAttributeName(name: String): Seq[String] = {
val e = new AnalysisException(s"syntax error in attribute name: $name")
val nameParts = scala.collection.mutable.ArrayBuffer.empty[String]
val tmp = scala.collection.mutable.ArrayBuffer.empty[Char]
var inBacktick = false
var i = 0
while (i < name.length) {
val char = name(i)
if (inBacktick) {
if (char == '`') {
inBacktick = false
if (i + 1 < name.length && name(i + 1) != '.') throw e
} else {
tmp += char
}
} else {
if (char == '`') {
if (tmp.nonEmpty) throw e
inBacktick = true
} else if (char == '.') {
if (tmp.isEmpty) throw e
nameParts += tmp.mkString
tmp.clear()
} else {
tmp += char
}
}
i += 1
}
if (tmp.isEmpty || inBacktick) throw e
nameParts += tmp.mkString
nameParts.toSeq
}

/**
* Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
*
Expand Down
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ class DataFrame private[sql](
}

protected[sql] def resolve(colName: String): NamedExpression = {
queryExecution.analyzed.resolve(colName.split("\\."), sqlContext.analyzer.resolver).getOrElse {
queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
}
}

protected[sql] def numericColumns: Seq[Expression] = {
schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
queryExecution.analyzed.resolve(n.name.split("\\."), sqlContext.analyzer.resolver).get
queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
}
}

Expand Down
27 changes: 27 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,33 @@ class DataFrameSuite extends QueryTest {
assert(complexData.filter(complexData("m")(complexData("s")("value")) === 1).count() == 1)
}

test("SPARK-7551: support backticks for DataFrame attribute resolution") {
val df = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
"""{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
checkAnswer(
df.select(df("`a.b`.c.`d..e`.`f`")),
Row(1)
)

val df2 = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
"""{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
checkAnswer(
df2.select(df2("`a b`.c.d e.f")),
Row(1)
)

def checkError(testFun: => Unit): Unit = {
val e = intercept[org.apache.spark.sql.AnalysisException] {
testFun
}
assert(e.getMessage.contains("syntax error in attribute name:"))
}
checkError(df("`abc.`c`"))
checkError(df("`abc`..d"))
checkError(df("`a`.b."))
checkError(df("`a.b`.c.`d"))
}

test("SPARK-7324 dropDuplicates") {
val testData = TestSQLContext.sparkContext.parallelize(
(2, 1, 2) :: (1, 1, 1) ::
Expand Down

0 comments on commit 213a6f3

Please sign in to comment.