[SPARK-28702][SQL] Display useful error message (instead of NPE) for invalid Dataset operations

### What changes were proposed in this pull request?
Added a proper error message instead of an NPE for invalid Dataset operations (e.g. calling actions inside transformations), similar to the SPARK-5063 fix for RDDs.

### Why are the changes needed?
To report the exact issue to the user instead of a NullPointerException.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?

Manually tested with the following snippet:

```scala
import spark.implicits._

val ds1 = spark.sparkContext.parallelize(1 to 100, 100).toDS()
val ds2 = spark.sparkContext.parallelize(1 to 100, 100).toDS()
ds1.map(x => {
  // scalastyle:off
  println(ds2.count + x)
  x
}).collect()
```
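For reference, the supported pattern is to run the second Dataset's action on the driver first and capture the result as a plain local value, so that only the value (not the Dataset itself) is captured by the closure. A minimal sketch (not part of this patch; it assumes an existing `spark` session):

```scala
import spark.implicits._

val ds1 = spark.sparkContext.parallelize(1 to 100, 100).toDS()
val ds2 = spark.sparkContext.parallelize(1 to 100, 100).toDS()

// Evaluate the action on the driver; the resulting Long is serialized
// into the map closure instead of the Dataset reference.
val ds2Count = ds2.count()
ds1.map(x => ds2Count + x).collect()
```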

Closes apache#25503 from shivusondur/jira28702.

Authored-by: shivusondur <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
shivusondur authored and JoshRosen committed Aug 23, 2019
1 parent 33e45ec commit 23bed0d
Showing 1 changed file with 14 additions and 2 deletions.
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (14 additions, 2 deletions)

```diff
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal

 import org.apache.commons.lang3.StringUtils

-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Evolving, Experimental, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
@@ -184,11 +184,23 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @transient val sparkSession: SparkSession,
+    @transient private val _sparkSession: SparkSession,
     @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {

+  @transient lazy val sparkSession: SparkSession = {
+    if (_sparkSession == null) {
+      throw new SparkException(
+        "Dataset transformations and actions can only be invoked by the driver, not inside of" +
+        " other Dataset transformations; for example, dataset1.map(x => dataset2.values.count()" +
+        " * x) is invalid because the values transformation and count action cannot be " +
+        "performed inside of the dataset1.map transformation. For more information," +
+        " see SPARK-28702.")
+    }
+    _sparkSession
+  }
+
   // A globally unique id of this Dataset.
   private val id = Dataset.curId.getAndIncrement()
```
