[SPARK-17358][SQL] Cached table (parquet/orc) should be shared between beelines

## What changes were proposed in this pull request?
A cached table (parquet/orc) couldn't be shared between beelines, because the `sameResult` method used by `CacheManager` always returns false (the `sparkSession` fields differ) when comparing two `HadoopFsRelation`s built in different beeline sessions. So we move `sparkSession` into a curried (second) parameter list of `HadoopFsRelation`, which excludes it from the generated `equals`/`hashCode`.
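
The fix relies on how Scala generates case-class equality: `equals` and `hashCode` cover only the first parameter list, so a per-session value moved into a second, curried list no longer makes two otherwise identical relations compare unequal. A standalone sketch (not Spark code; names are illustrative):

```scala
// Standalone sketch, not Spark code: case-class equality ignores parameters
// in the second ("curried") list, which is why moving sparkSession there
// lets relations built in different sessions compare equal.
case class Session(id: Long)

case class Relation(path: String, format: String)(val session: Session)

object CurriedEqualityDemo extends App {
  val r1 = Relation("hdfs://host/src_pqt", "parquet")(Session(1))
  val r2 = Relation("hdfs://host/src_pqt", "parquet")(Session(2))

  // true: `session` is excluded from the generated equals/hashCode
  println(r1 == r2)
}
```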

## How was this patch tested?

Manually, with two beeline sessions connected to the same Thrift JDBC server.

Beeline 1
```
1: jdbc:hive2://localhost:10000> CACHE TABLE src_pqt;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (5.143 seconds)
1: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
|                                                                                                                                                                                                            plan                                                                                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#49, value#50]
   +- InMemoryRelation [key#49, value#50], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
         +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:string>  |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
```

Beeline 2 (a separate session; its plan reuses the `InMemoryRelation` cached from Beeline 1)
```
0: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
|                                                                                                                                                                                                            plan                                                                                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#68, value#69]
   +- InMemoryRelation [key#68, value#69], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
         +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:string>  |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
```
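
As a supplementary check (not part of the patch), the two-beeline scenario can be approximated inside one application with two `SparkSession`s, since the Thrift server gives each JDBC connection its own session on top of a shared context; the path and table name below are illustrative only.

```scala
// Supplementary sketch, not part of the patch: approximate the two-beeline
// scenario with two SparkSessions sharing one underlying SparkContext.
import org.apache.spark.sql.SparkSession

object CacheSharingCheck extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("SPARK-17358-check")
    .getOrCreate()

  val session1 = spark.newSession()
  val session2 = spark.newSession()

  // Temp views are per-session, so register the same Parquet path in both
  // (the path is illustrative).
  session1.read.parquet("/tmp/src_pqt").createOrReplaceTempView("src_pqt")
  session2.read.parquet("/tmp/src_pqt").createOrReplaceTempView("src_pqt")

  // Cache through the first session only.
  session1.sql("CACHE TABLE src_pqt")

  // With sparkSession curried out of HadoopFsRelation's equality, the second
  // session's plan should show InMemoryTableScan rather than a fresh FileScan.
  session2.sql("SELECT * FROM src_pqt").explain()
}
```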

Author: Yadong Qi <[email protected]>

Closes apache#14913 from watermen/SPARK-17358.
watermen authored and cloud-fan committed Sep 6, 2016
1 parent afb3d5d commit 64e826f
Showing 4 changed files with 7 additions and 9 deletions.
Call sites in `DataSource`:

```diff
@@ -351,13 +351,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema,
           bucketSpec = None,
           format,
-          options)
+          options)(sparkSession)
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
@@ -409,13 +408,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
-          caseInsensitiveOptions)
+          caseInsensitiveOptions)(sparkSession)
 
       case _ =>
         throw new AnalysisException(
```

The signature change on `HadoopFsRelation` itself, moving `sparkSession` into a second parameter list:

```diff
@@ -134,13 +134,13 @@ abstract class OutputWriter {
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    sparkSession: SparkSession,
     location: FileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
-    options: Map[String, String]) extends BaseRelation with FileRelation {
+    options: Map[String, String])(val sparkSession: SparkSession)
+  extends BaseRelation with FileRelation {
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
```

The `copy` call in `FileSourceStrategySuite`, updated for the curried constructor:

```diff
@@ -508,7 +508,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       val bucketed = df.queryExecution.analyzed transform {
         case l @ LogicalRelation(r: HadoopFsRelation, _, _) =>
           l.copy(relation =
-            r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil))))
+            r.copy(bucketSpec =
+              Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil)))(r.sparkSession))
       }
       Dataset.ofRows(spark, bucketed)
     } else {
```

The call site in `HiveMetastoreCatalog`:

```diff
@@ -249,13 +249,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       }
 
       val relation = HadoopFsRelation(
-        sparkSession = sparkSession,
         location = fileCatalog,
         partitionSchema = partitionSchema,
         dataSchema = inferredSchema,
         bucketSpec = bucketSpec,
         fileFormat = defaultSource,
-        options = options)
+        options = options)(sparkSession = sparkSession)
 
       val created = LogicalRelation(relation, catalogTable = Some(metastoreRelation.catalogTable))
       cachedDataSourceTables.put(tableIdentifier, created)
```
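
For context, a much-simplified sketch (not the actual Spark source; names are illustrative) of how a `CacheManager`-style registry depends on `sameResult`: a second session only gets a cache hit if its analyzed plan compares as semantically equal to the plan cached by the first session, which is exactly what currying `sparkSession` restores for plans rooted in `HadoopFsRelation`.

```scala
// Much-simplified sketch, not the actual Spark CacheManager: cached plans are
// matched by semantic equality (sameResult), not by object identity.
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

case class CachedEntry(plan: LogicalPlan, cachedRepresentation: SparkPlan)

class SimpleCacheRegistry {
  private var entries = List.empty[CachedEntry]

  def cache(plan: LogicalPlan, physical: SparkPlan): Unit =
    entries = CachedEntry(plan, physical) :: entries

  // Before this patch, sameResult was always false across beeline sessions
  // because HadoopFsRelation's equality included the per-session sparkSession.
  def lookup(plan: LogicalPlan): Option[SparkPlan] =
    entries.find(_.plan.sameResult(plan)).map(_.cachedRepresentation)
}
```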
