Commit

init commit: compiling and working
Abhishek Somani authored and citrusraj committed Jul 23, 2019
1 parent 1246247 commit 0d2d73a
Showing 9 changed files with 367 additions and 2 deletions.
25 changes: 25 additions & 0 deletions .gitignore
@@ -0,0 +1,25 @@
*.class
*.log
*.iml
*.ipr
*.iws
.idea
out
.cache/
.history/
.lib/
dist/*
target/
bin/
libexec/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
logs/
project/*-shim.sbt
project/project/
project/target/
target/
.scala_dependencies
.worksheet
2 changes: 0 additions & 2 deletions README.md

This file was deleted.

49 changes: 49 additions & 0 deletions acid-datasource/build.sbt
@@ -0,0 +1,49 @@
name := "acid-datasource"

version := "0.1"

scalaVersion := "2.11.12"

//assemblyShadeRules in assembly := Seq(
// ShadeRule.rename("org.apache.hadoop.hive.**" -> "com.qubole.spark.hive.@1").inAll
//)

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.4.3" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.4.3" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.4.3" % "provided"

//libraryDependencies += "org.apache.hive" % "hive-metastore" % "3.1.1"


//assemblyMergeStrategy in assembly := {
// case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
// case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
// case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
// case PathList("javax", "jdo", xs @ _*) => MergeStrategy.last
// case PathList("org", "apache", "log4j", xs @ _*) => MergeStrategy.last
// case PathList("com", "google", xs @ _*) => MergeStrategy.last
// case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
// case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
// case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
// case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last
// case PathList("com","zaxxer", xs @ _*) => MergeStrategy.last
// case PathList("org","apache", "logging", "log4j", xs @ _*) => MergeStrategy.last
// case PathList("io","netty", xs @ _*) => MergeStrategy.last
// case PathList("org","datanucleus", xs @ _*) => MergeStrategy.last
// case PathList("org", "apache", "arrow", xs @ _*) => MergeStrategy.last
// case PathList("org", "slf4j", "impl", xs @ _*) => MergeStrategy.last
// //case "about.html" => MergeStrategy.rename
// case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
// case "META-INF/mailcap" => MergeStrategy.last
// case "META-INF/mimetypes.default" => MergeStrategy.last
// case "plugin.properties" => MergeStrategy.last
// case "log4j.properties" => MergeStrategy.last
// case "Log4j2Plugins.dat" => MergeStrategy.last
// case "git.properties" => MergeStrategy.last
// case "plugin.xml" => MergeStrategy.last
// case "META-INF/io.netty.versions.properties" => MergeStrategy.last
// case "META-INF/org/apache/logging/log4j/core/config/plugins/Log4j2Plugins.dat" => MergeStrategy.last
// case x =>
// val oldStrategy = (assemblyMergeStrategy in assembly).value
// oldStrategy(x)
//}
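One thing worth flagging: the Scala sources in this commit import Hive classes under com.qubole.shaded.hive, while the commented rule above renames into com.qubole.spark.hive. A sketch of a shade rule that would match the imports, assuming shading were done by this build rather than in a pre-shaded artifact:

// Hypothetical shade rule aligned with the com.qubole.shaded.hive imports in the sources.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.apache.hadoop.hive.**" -> "com.qubole.shaded.hive.@1").inAll
)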
1 change: 1 addition & 0 deletions acid-datasource/project/assembly.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
1 change: 1 addition & 0 deletions acid-datasource/project/build.properties
@@ -0,0 +1 @@
sbt.version = 1.2.8
164 changes: 164 additions & 0 deletions acid-datasource/src/main/scala/com/qubole/spark/HiveAcidDataSource.scala
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qubole.spark

import java.util.Locale

import com.fasterxml.jackson.annotation.JsonIgnore
import com.qubole.shaded.hive.conf.HiveConf
import com.qubole.shaded.hive.metastore.HiveMetaStoreClient

import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerJobEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, TextBasedFileFormat}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.sql.hive.client.HiveClientImpl
import com.qubole.shaded.hive.metastore.api.Table
import com.qubole.shaded.hive.metastore.api.FieldSchema
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

class HiveAcidDataSource
  extends RelationProvider
    // with CreatableRelationProvider
    // with PrunedFilteredScan
    with DataSourceRegister
    with Logging {

  var _acid_state: HiveAcidFileIndex = _
  var _client: HiveMetaStoreClient = _

  var _dataSchema: StructType = _
  var _partitionSchema: StructType = _

  // Resolves the table from the metastore and exposes it as a HadoopFsRelation.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val tableName = parameters.getOrElse("table", {
      throw HiveAcidErrors.tableNotSpecifiedException
    })

    _client = new HiveMetaStoreClient(new HiveConf(), null, false)

    // Table name is expected in "db.table" form.
    val hTable: Table = _client.getTable(tableName.split('.')(0), tableName.split('.')(1))
    val cols = hTable.getSd.getCols
    val partitionCols = hTable.getPartitionKeys

    _dataSchema = StructType(cols.toList.map(fromHiveColumn).toArray)
    _partitionSchema = StructType(partitionCols.map(fromHiveColumn).toArray)

    if (_acid_state == null) {
      _acid_state = new HiveAcidFileIndex(sqlContext.sparkSession, hTable,
        sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes, _client, _partitionSchema)
    }

    registerQEListener(sqlContext)

    HadoopFsRelation(
      _acid_state,
      partitionSchema = _partitionSchema,
      dataSchema = _dataSchema,
      bucketSpec = None,
      fileFormat = new TextFileFormat(),
      options = Map.empty)(sqlContext.sparkSession)
  }

  // Closes the ACID state (committing its open transaction) when a query finishes,
  // whether it succeeded or failed.
  def registerQEListener(sqlContext: SQLContext): Unit = {
    sqlContext.sparkSession.listenerManager.register(new QueryExecutionListener {
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
        // scalastyle:off println
        println("Somani job end, closing _acid_state")
        // scalastyle:on println
        if (_acid_state != null) {
          _acid_state.close()
        }
      }

      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        // scalastyle:off println
        println("Somani job end, closing _acid_state")
        // scalastyle:on println
        if (_acid_state != null) {
          _acid_state.close()
        }
      }
    })
  }

  override def shortName(): String = {
    HiveAcidUtils.NAME
  }

  def fromHiveColumn(hc: FieldSchema): StructField = {
    val columnType = getSparkSQLDataType(hc)
    val metadata = if (hc.getType != columnType.catalogString) {
      new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
    } else {
      Metadata.empty
    }

    val field = StructField(
      name = hc.getName,
      dataType = columnType,
      nullable = true,
      metadata = metadata)
    Option(hc.getComment).map(field.withComment).getOrElse(field)
  }

  /** Get the Spark SQL native DataType from Hive's FieldSchema. */
  private def getSparkSQLDataType(hc: FieldSchema): DataType = {
    try {
      CatalystSqlParser.parseDataType(hc.getType)
    } catch {
      case e: ParseException =>
        throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
    }
  }

  // override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  //   // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
  //   JDBCRDD.scanTable(
  //     sparkSession.sparkContext,
  //     schema,
  //     requiredColumns,
  //     filters,
  //     parts,
  //     jdbcOptions).asInstanceOf[RDD[Row]]
  // }
}
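For context, a minimal usage sketch of the source defined above. The format name comes from shortName() (HiveAcidUtils.NAME, i.e. "hiveAcid"), the "table" option is required by createRelation, and the "db.table" form is needed because the name is split on '.'; the table name default.acid_tbl itself is a placeholder:

// Hypothetical read through the data source above; "default.acid_tbl" is a placeholder.
val df = spark.read
  .format("hiveAcid")                   // HiveAcidDataSource.shortName()
  .option("table", "default.acid_tbl")  // required; omitting it throws tableNotSpecifiedException
  .load()
df.show()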
29 changes: 29 additions & 0 deletions acid-datasource/src/main/scala/com/qubole/spark/HiveAcidErrors.scala
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qubole.spark

object HiveAcidErrors {
  def tableNotSpecifiedException: Throwable = {
    new IllegalArgumentException("'table' is not specified")
  }

  def tableNotAcidException: Throwable = {
    new IllegalArgumentException("The specified table is not an acid table")
  }
}
76 changes: 76 additions & 0 deletions acid-datasource/src/main/scala/com/qubole/spark/HiveAcidFileIndex.scala
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qubole.spark

import org.apache.hadoop.conf.Configuration
import com.qubole.shaded.hive.conf.HiveConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileIndex, FileStatusCache, InMemoryFileIndex, PartitionDirectory, PartitionPath, PartitionSpec, PrunedInMemoryFileIndex}
import com.qubole.shaded.hive.metastore.HiveMetaStoreClient
import com.qubole.shaded.hive.metastore.api.{MetaException, Table}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, Literal}
import org.apache.thrift.TException
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.JavaConversions._

class HiveAcidFileIndex(sparkSession: SparkSession,
                        val table: Table,
                        val sizeInBytes: Long,
                        val client: HiveMetaStoreClient,
                        val pSchema: StructType) extends FileIndex {

  // Transaction id for the snapshot this index reads; -1 means no transaction is open.
  var _txnId: Long = -1

  override def rootPaths: Seq[Path] = Seq(new Path(table.getSd.getLocation))

  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)

  override def refresh(): Unit = fileStatusCache.invalidateAll()

  override def partitionSchema: StructType = pSchema

  override def inputFiles: Array[String] = new InMemoryFileIndex(
    sparkSession, rootPaths, Option(table.getSd.getSerdeInfo.getParameters)
      .map(_.toMap).orNull, userSpecifiedSchema = None).inputFiles

  override def listFiles(
      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    // Open a transaction lazily on the first listing so the scan reads a consistent snapshot.
    if (_txnId == -1) {
      _txnId = client.openTxn("HiveAcidDataSource")
    }
    // Somani: Need to take care of partitioning here.
    // The valid txn/write-id lists are fetched but not yet used to filter files.
    val validTxns = client.getValidTxns(_txnId)
    val validWriteIds = client.getValidWriteIds(
      Seq(table.getDbName + "." + table.getTableName), validTxns.writeToString())
    new InMemoryFileIndex(
      sparkSession, rootPaths, Option(table.getSd.getSerdeInfo.getParameters)
        .map(_.toMap).orNull, userSpecifiedSchema = None).listFiles(Nil, dataFilters)
  }

  def close(): Unit = {
    if (_txnId != -1) {
      client.commitTxn(_txnId)
      _txnId = -1
    }
  }
}
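In effect, the index ties a Hive transaction to the lifetime of one scan: a transaction is opened lazily on the first listFiles call and committed in close(), which HiveAcidDataSource wires to query completion through its QueryExecutionListener. A minimal lifecycle sketch, assuming spark, hTable, client, and partitionSchema are already in scope:

// Sketch of the lifecycle HiveAcidFileIndex implements; the names in scope are assumed.
val index = new HiveAcidFileIndex(spark, hTable, sizeInBytes = 0L, client, partitionSchema)
index.listFiles(Nil, Nil) // first call opens a transaction via client.openTxn(...)
index.close()             // commits it via client.commitTxn(...) and resets _txnId to -1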
22 changes: 22 additions & 0 deletions acid-datasource/src/main/scala/com/qubole/spark/HiveAcidUtils.scala
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qubole.spark

object HiveAcidUtils {
  val NAME = "hiveAcid"
}
