[LIVY-299][REPL] Output multiple lines in one statement block
## What changes were proposed in this pull request?

Livy's Scala interpreter returns only the last line's result for a multi-line code block, so the output of earlier lines is silently dropped. This change updates the interpreter logic to merge the text/plain output of every line in the block.
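
For illustration, the change to the multiple-statements test in `ScalaInterpreterSpec` (part of this patch, shown below) captures the behavioral difference; the Before/After comments are annotations, not test code:

```scala
val response = interpreter.execute(
  """
    |val x = 1
    |val y = 2
    |x + y
  """.stripMargin)
// Before this patch: ExecuteSuccess(TEXT_PLAIN -> "res2: Int = 3")
// After this patch:  ExecuteSuccess(TEXT_PLAIN -> "x: Int = 1\ny: Int = 2\nres2: Int = 3\n")
```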

## How was this patch tested?

Add new unit tests.

Author: jerryshao <[email protected]>

Closes apache#66 from jerryshao/LIVY-299.
jerryshao committed Nov 24, 2017
1 parent 1f59e10 commit f893d19
Showing 8 changed files with 95 additions and 37 deletions.
@@ -34,8 +34,8 @@ class InteractiveIT extends BaseIntegrationTestSuite {
test("basic interactive session") {
withNewSession(Spark) { s =>
s.run("val sparkVersion = sc.version").result().left.foreach(info(_))
s.run("1+1").verifyResult("res0: Int = 2")
s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1")
s.run("1+1").verifyResult("res0: Int = 2\n")
s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1\n")
s.run("val sql = new org.apache.spark.sql.SQLContext(sc)").verifyResult(
".*" + Pattern.quote(
"sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext") + ".*")
@@ -46,8 +46,8 @@ class InteractiveIT extends BaseIntegrationTestSuite {
// Verify Livy internal configurations are not exposed.
// TODO separate all these checks to different sub tests after merging new IT code.
s.run("""sc.getConf.getAll.exists(_._1.startsWith("spark.__livy__."))""")
.verifyResult(".*false")
s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false")
.verifyResult(".*false\n")
s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false\n")
s.run("""val str = "str"""")
s.complete("str.", "scala", 4).verifyContaining(List("compare", "contains"))
s.complete("str2.", "scala", 5).verifyNone()
@@ -137,16 +137,16 @@ class InteractiveIT extends BaseIntegrationTestSuite {
// Check that the library is loaded into the JVM by the proper class loader.
s.run("Thread.currentThread.getContextClassLoader.loadClass" +
"""("org.codehaus.plexus.util.FileUtils")""")
.verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils")
.verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils\n")

// Check that the Scala interpreter sees the library.
s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._")
s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._\n")

// Check that SparkContext sees classes defined by the Scala interpreter.
s.run("case class Item(i: Int)").verifyResult("defined class Item")
s.run("case class Item(i: Int)").verifyResult("defined class Item\n")
s.run("val rdd = sc.parallelize(Array.fill(10){new Item(scala.util.Random.nextInt(1000))})")
.verifyResult("rdd.*")
s.run("rdd.count()").verifyResult(".*= 10")
s.run("rdd.count()").verifyResult(".*= 10\n")
}
}

@@ -164,15 +164,15 @@ class InteractiveIT extends BaseIntegrationTestSuite {
test("recover interactive session") {
withNewSession(Spark) { s =>
val stmt1 = s.run("1")
- stmt1.verifyResult("res0: Int = 1")
+ stmt1.verifyResult("res0: Int = 1\n")

restartLivy()

// Verify session still exists.
s.verifySessionIdle()
s.run("2").verifyResult("res1: Int = 2")
s.run("2").verifyResult("res1: Int = 2\n")
// Verify statement result is preserved.
- stmt1.verifyResult("res0: Int = 1")
+ stmt1.verifyResult("res0: Int = 1\n")

s.stop()

@@ -262,8 +262,34 @@ abstract class AbstractSparkInterpreter extends Interpreter with Logging {
case Interpreter.ExecuteError(_, _, _) =>
result

- case _ =>
- executeLines(tail, result)
+ case Interpreter.ExecuteAborted(_) =>
+ result
+
+ case Interpreter.ExecuteSuccess(e) =>
+ val mergedRet = resultFromLastLine match {
+ case Interpreter.ExecuteSuccess(s) =>
+ // Because of SparkMagic-specific logic, we only merge text/plain
+ // results; any magic-related output still follows the old code path.
+ if (s.values.contains(TEXT_PLAIN) && e.values.contains(TEXT_PLAIN)) {
+ val lastRet = s.values.getOrElse(TEXT_PLAIN, "").asInstanceOf[String]
+ val currRet = e.values.getOrElse(TEXT_PLAIN, "").asInstanceOf[String]
+ if (lastRet.nonEmpty && currRet.nonEmpty) {
+ Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"$lastRet$currRet")
+ } else if (lastRet.nonEmpty) {
+ Interpreter.ExecuteSuccess(TEXT_PLAIN -> lastRet)
+ } else if (currRet.nonEmpty) {
+ Interpreter.ExecuteSuccess(TEXT_PLAIN -> currRet)
+ } else {
+ result
+ }
+ } else {
+ result
+ }
+
+ case _ => result
+ }
+
+ executeLines(tail, mergedRet)
}
}
}
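
The merge rule above concatenates the text/plain payloads of consecutive successful lines, while any non-text/plain (magic) output keeps the old last-result-wins behavior. A minimal standalone sketch of that rule, using plain `Map`s in place of Livy's JSON-backed `ExecuteSuccess` payload (the object and method names here are illustrative, not part of the patch):

```scala
object MergeSketch {
  val TEXT_PLAIN = "text/plain"

  // Merge the current line's output into the running result of the block.
  def mergeTextPlain(last: Map[String, Any], curr: Map[String, Any]): Map[String, Any] = {
    if (last.contains(TEXT_PLAIN) && curr.contains(TEXT_PLAIN)) {
      val lastRet = last(TEXT_PLAIN).asInstanceOf[String]
      val currRet = curr(TEXT_PLAIN).asInstanceOf[String]
      if (lastRet.nonEmpty && currRet.nonEmpty) Map(TEXT_PLAIN -> (lastRet + currRet))
      else if (lastRet.nonEmpty) Map(TEXT_PLAIN -> lastRet)
      else if (currRet.nonEmpty) Map(TEXT_PLAIN -> currRet)
      else curr
    } else {
      curr // magic (non-text/plain) output: keep the current result unchanged
    }
  }

  def main(args: Array[String]): Unit = {
    val merged = mergeTextPlain(
      Map(TEXT_PLAIN -> "x: Int = 1\n"),
      Map(TEXT_PLAIN -> "y: Int = 2\n"))
    assert(merged(TEXT_PLAIN) == "x: Int = 1\ny: Int = 2\n")
  }
}
```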
@@ -318,7 +344,7 @@ abstract class AbstractSparkInterpreter extends Interpreter with Logging {
}

private def readStdout() = {
val output = outputStream.toString("UTF-8").trim
val output = outputStream.toString("UTF-8")
outputStream.reset()

output
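
Dropping `.trim` here is what preserves each line's trailing newline, so merged outputs stay separated; compare the new `capture stdout` expectations in `ScalaInterpreterSpec` below, where `println(1)\nprintln(2)` yields `"1\n2\n"`. A small self-contained illustration of the difference (a hypothetical snippet, not part of the patch):

```scala
import java.io.ByteArrayOutputStream

object TrimDemo extends App {
  val out = new ByteArrayOutputStream()
  Console.withOut(out) { println(1) }
  val first = out.toString("UTF-8")   // "1\n" -- trailing newline preserved
  out.reset()
  Console.withOut(out) { println(2) }
  val second = out.toString("UTF-8")  // "2\n"
  assert(first + second == "1\n2\n")        // untrimmed: lines stay separated
  assert(first.trim + second.trim == "12")  // trimmed: outputs run together
}
```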
@@ -65,6 +65,17 @@ abstract class PythonBaseInterpreterSpec extends BaseInterpreterSpec {
))
}

it should "get multiple outputs in one block" in withInterpreter { interpreter =>
val response = interpreter.execute(
"""
|print("1")
|print("2")
""".stripMargin)
response should equal(Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "1\n2"
))
}

it should "parse a class" in withInterpreter { interpreter =>
val response = interpreter.execute(
"""
38 changes: 24 additions & 14 deletions repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
@@ -33,24 +33,24 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
val response = interpreter.execute("1 + 2")
response should equal (Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "res0: Int = 3"
TEXT_PLAIN -> "res0: Int = 3\n"
))
}

it should "execute multiple statements" in withInterpreter { interpreter =>
var response = interpreter.execute("val x = 1")
response should equal (Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "x: Int = 1"
TEXT_PLAIN -> "x: Int = 1\n"
))

response = interpreter.execute("val y = 2")
response should equal (Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "y: Int = 2"
TEXT_PLAIN -> "y: Int = 2\n"
))

response = interpreter.execute("x + y")
response should equal (Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "res0: Int = 3"
TEXT_PLAIN -> "res0: Int = 3\n"
))
}

@@ -64,7 +64,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
|x + y
""".stripMargin)
response should equal(Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "res2: Int = 3"
TEXT_PLAIN -> "x: Int = 1\ny: Int = 2\nres2: Int = 3\n"
))
}

@@ -96,14 +96,24 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
""".stripMargin)

response should equal(Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "res0: Int = 3"
TEXT_PLAIN -> "res0: Int = 3\n"
))
}

it should "capture stdout" in withInterpreter { interpreter =>
val response = interpreter.execute("println(\"Hello World\")")
response should equal(Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "Hello World"
TEXT_PLAIN -> "Hello World\n"
))

+ val resp1 = interpreter.execute("print(1)\nprint(2)")
+ resp1 should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "12"
+ ))
+
+ val resp2 = interpreter.execute("println(1)\nprintln(2)")
+ resp2 should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "1\n2\n"
+ ))
}

@@ -123,7 +133,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
"""sc.parallelize(0 to 1).map { i => i+1 }.collect""".stripMargin)

response should equal(Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "res0: Array[Int] = Array(1, 2)"
TEXT_PLAIN -> "res0: Array[Int] = Array(1, 2)\n"
))
}

@@ -144,7 +154,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
"""val r = 1
|// comment
""".stripMargin)
response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1\n"))

response = interpreter.execute(
"""val r = 1
Expand All @@ -153,7 +163,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
|comment
|*/
""".stripMargin)
response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1\n"))

// Test statements ending with a mix of single line and multi-line comments
response = interpreter.execute(
@@ -165,7 +175,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
|*/
|// comment
""".stripMargin)
response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1\n"))

response = interpreter.execute(
"""val r = 1
Expand All @@ -175,7 +185,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
|comment
|*/
""".stripMargin)
response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1\n"))

// Make sure incomplete statement is still returned as incomplete statement.
response = interpreter.execute("sc.")
@@ -195,12 +205,12 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {

try {
response should equal(
Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String = \n$stringWithComment"))
Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String = \n$stringWithComment\n"))
} catch {
case _: Exception =>
response should equal(
// Scala 2.11 doesn't have a " " after "="
Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String =\n$stringWithComment"))
Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String =\n$stringWithComment\n"))
}
}

@@ -48,7 +48,7 @@ class SharedSessionSpec extends BaseSessionSpec(Shared) {
"status" -> "ok",
"execution_count" -> 0,
"data" -> Map(
"text/plain" -> "res0: Int = 3"
"text/plain" -> "res0: Int = 3\n"
)
))

@@ -82,7 +82,7 @@ class SharedSessionSpec extends BaseSessionSpec(Shared) {
"status" -> "ok",
"execution_count" -> 0,
"data" -> Map(
"text/plain" -> "res0: Array[Int] = Array(1, 2)"
"text/plain" -> "res0: Array[Int] = Array(1, 2)\n"
)
))

@@ -77,6 +77,17 @@ class SparkRInterpreterSpec extends BaseInterpreterSpec {
))
}

it should "get multiple outputs in one block" in withInterpreter { interpreter =>
val response = interpreter.execute(
"""
|print("1")
|print("2")
""".stripMargin)
response should equal(Interpreter.ExecuteSuccess(
TEXT_PLAIN -> "[1] \"1\"\n[1] \"2\""
))
}

it should "capture stdout" in withInterpreter { interpreter =>
val response = interpreter.execute("cat(3)")
response should equal(Interpreter.ExecuteSuccess(
12 changes: 6 additions & 6 deletions repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
@@ -39,7 +39,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
"status" -> "ok",
"execution_count" -> 0,
"data" -> Map(
"text/plain" -> "res0: Int = 3"
"text/plain" -> "res0: Int = 3\n"
)
))

@@ -56,7 +56,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
"status" -> "ok",
"execution_count" -> 0,
"data" -> Map(
"text/plain" -> "x: Int = 1"
"text/plain" -> "x: Int = 1\n"
)
))

@@ -70,7 +70,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
"status" -> "ok",
"execution_count" -> 1,
"data" -> Map(
"text/plain" -> "y: Int = 2"
"text/plain" -> "y: Int = 2\n"
)
))

@@ -84,7 +84,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
"status" -> "ok",
"execution_count" -> 2,
"data" -> Map(
"text/plain" -> "res0: Int = 3"
"text/plain" -> "res0: Int = 3\n"
)
))

@@ -100,7 +100,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
"status" -> "ok",
"execution_count" -> 0,
"data" -> Map(
"text/plain" -> "Hello World"
"text/plain" -> "Hello World\n"
)
))

@@ -169,7 +169,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
"status" -> "ok",
"execution_count" -> 0,
"data" -> Map(
"text/plain" -> "res0: Array[Int] = Array(1, 2)"
"text/plain" -> "res0: Array[Int] = Array(1, 2)\n"
)
))

@@ -185,7 +185,7 @@ class InteractiveSessionSpec extends FunSpec
scalaResult should equal (Extraction.decompose(Map(
"status" -> "ok",
"execution_count" -> 1,
"data" -> Map("text/plain" -> "res0: Int = 3")))
"data" -> Map("text/plain" -> "res0: Int = 3\n")))
)

val rResult = executeStatement("1 + 2", Some("sparkr"))
