Commit 77e4b87
Merge pull request byzer-org#815 from allwefantasy/ISSUE-814
Refactor datasource in LoadAdaptor and SaveAdaptor
allwefantasy authored Dec 20, 2018
2 parents 2ef9cb7 + e4c467f commit 77e4b87
Showing 9 changed files with 430 additions and 124 deletions.
11 changes: 11 additions & 0 deletions streamingpro-mlsql/pom.xml
@@ -213,6 +213,17 @@
  </dependencies>
</profile>

<profile>
  <id>unit-test</id>
  <dependencies>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>6.5.0</version>
    </dependency>
  </dependencies>
</profile>

<profile>
  <id>streamingpro-spark-2.2.0-adaptor</id>
  <dependencies>
New file (69 additions): object DataSourceRegistry in package streaming.core.datasource
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streaming.core.datasource

import com.google.common.reflect.ClassPath
import org.apache.spark.sql.SaveMode
import streaming.log.Logging

import scala.collection.JavaConverters._

/**
* 2018-12-20 WilliamZhu([email protected])
*/
object DataSourceRegistry extends Logging {
  // format name (e.g. "es" or "org.elasticsearch.spark.sql") -> data source implementation
  private val registry = new java.util.concurrent.ConcurrentHashMap[String, MLSQLDataSource]()

  def register(name: String, obj: MLSQLDataSource) = {
    registry.put(name, obj)
  }

  def fetch(name: String): Option[MLSQLDataSource] = {
    if (registry.containsKey(name)) {
      Option(registry.get(name))
    } else None
  }

  // Scan the given package and let every class that implements MLSQLRegistry register itself.
  private def registerFromPackage(name: String) = {
    ClassPath.from(getClass.getClassLoader).getTopLevelClasses(name).asScala.foreach { clzz =>
      val dataSource = Class.forName(clzz.getName).newInstance()
      if (dataSource.isInstanceOf[MLSQLRegistry]) {
        dataSource.asInstanceOf[MLSQLRegistry].register()
      } else {
        logWarning(
          s"""
             |${clzz.getName} does not implement MLSQLRegistry,
             |we cannot register it automatically.
          """.stripMargin)
      }
    }
  }

  registerFromPackage("streaming.core.datasource.impl")
  registerFromPackage("streaming.contri.datasource.impl")
}

trait MLSQLRegistry {
def register(): Unit
}

case class DataSourceConfig(path: String, config: Map[String, String])

case class DataSinkConfig(path: String, config: Map[String, String], mode: SaveMode)
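
For orientation, here is a minimal caller-side sketch (not part of this commit) of how the registry is meant to be consumed; LoadAdaptor's actual use of fetch appears in the diff further down, and format, reader, path and option below stand for values a caller such as BatchLoadAdaptor already has in scope.

import org.apache.spark.sql.{DataFrame, DataFrameReader}
import streaming.core.datasource.{DataSourceConfig, DataSourceRegistry, MLSQLSource}

object RegistryUsageSketch {
  // Returns Some(df) when a data source is registered for `format` and can act as a source,
  // None otherwise, so the caller can fall back to its legacy dispatch.
  def loadViaRegistry(format: String, reader: DataFrameReader,
                      path: String, option: Map[String, String]): Option[DataFrame] =
    DataSourceRegistry.fetch(format).collect {
      case ds: MLSQLSource => ds.load(reader, DataSourceConfig(path, option))
    }
}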
New file (45 additions): traits MLSQLDataSource, MLSQLSource and MLSQLSink in package streaming.core.datasource
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streaming.core.datasource

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}

/**
* 2018-12-20 WilliamZhu([email protected])
*/

trait MLSQLDataSource {
  // default splitter between db/connection name and table ("\\." is a regex-escaped dot)
  def dbSplitter = {
    "\\."
  }

  def fullFormat: String

  def shortFormat: String
}

trait MLSQLSource extends MLSQLDataSource {
  def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame
}

trait MLSQLSink extends MLSQLDataSource {
  def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit
}
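
As an illustration of the extension point these traits create, a new data source could be written as below; the class name MLSQLParquetExample and the choice of format are hypothetical and not part of this commit.

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
import streaming.core.datasource._

class MLSQLParquetExample extends MLSQLSource with MLSQLSink with MLSQLRegistry {
  override def fullFormat: String = "parquet"
  override def shortFormat: String = "parquet"

  // delegate to Spark's built-in parquet reader/writer, applying the statement's options
  override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame =
    reader.options(config.config).format(fullFormat).load(config.path)

  override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit =
    writer.options(config.config).mode(config.mode).format(fullFormat).save(config.path)

  override def register(): Unit = {
    DataSourceRegistry.register(fullFormat, this)
    DataSourceRegistry.register(shortFormat, this)
  }
}

To be picked up automatically, such a class would have to live in one of the packages DataSourceRegistry scans (streaming.core.datasource.impl or streaming.contri.datasource.impl) and have a no-argument constructor, since registration goes through Class.forName(...).newInstance().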

New file (80 additions): class MLSQLElasticSearch in package streaming.core.datasource.impl
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streaming.core.datasource.impl

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
import streaming.core.datasource._
import streaming.dsl.ScriptSQLExec

class MLSQLElasticSearch extends MLSQLSource with MLSQLSink with MLSQLRegistry {

  override def fullFormat: String = "org.elasticsearch.spark.sql"

  override def shortFormat: String = "es"

  override def dbSplitter: String = "/"

  override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
    var dbtable = config.path
    // If the path contains the splitter, try to resolve the db name in dbMapping.
    // Otherwise do nothing: Elasticsearch uses paths like index/type, so this does no harm.
    if (config.path.contains(dbSplitter)) {
      val Array(_dbname, _dbtable) = config.path.split(dbSplitter, 2)
      if (ScriptSQLExec.dbMapping.containsKey(_dbname)) {
        dbtable = _dbtable
        ScriptSQLExec.dbMapping.get(_dbname).foreach { f =>
          reader.option(f._1, f._2)
        }
      }
    }
    // load configs should overwrite connect configs
    reader.options(config.config)
    reader.format(config.config.getOrElse("implClass", fullFormat)).load(dbtable)
  }

  override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {
    var dbtable = config.path
    // If the path contains the splitter, try to resolve the db name in dbMapping.
    // Otherwise do nothing: Elasticsearch uses paths like index/type, so this does no harm.
    if (config.path.contains(dbSplitter)) {
      val Array(_dbname, _dbtable) = config.path.split(dbSplitter, 2)
      if (ScriptSQLExec.dbMapping.containsKey(_dbname)) {
        dbtable = _dbtable
        ScriptSQLExec.dbMapping.get(_dbname).foreach { f =>
          writer.option(f._1, f._2)
        }
      }
    }
    writer.mode(config.mode)
    // save configs should overwrite connect configs
    writer.options(config.config)
    config.config.get("partitionByCol").map { item =>
      writer.partitionBy(item.split(","): _*)
    }
    writer.format(config.config.getOrElse("implClass", fullFormat)).save(dbtable)
  }

  override def register(): Unit = {
    DataSourceRegistry.register(fullFormat, this)
    DataSourceRegistry.register(shortFormat, this)
  }
}
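
To make the path handling above concrete: because the split uses a limit of 2, only the first occurrence of the "/" splitter is treated as the connection-name separator, so an Elasticsearch index/type pair survives intact. A tiny illustration with a hypothetical connection name:

// "esConn" would be a key present in ScriptSQLExec.dbMapping; "twitter/tweet" is the ES index/type
val Array(dbname, dbtable) = "esConn/twitter/tweet".split("/", 2)
// dbname == "esConn"; dbtable == "twitter/tweet" is what gets passed on to load()/save()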
112 changes: 56 additions & 56 deletions streamingpro-mlsql/src/main/java/streaming/dsl/LoadAdaptor.scala
@@ -20,6 +20,7 @@ package streaming.dsl

import net.sf.json.JSONObject
import org.apache.spark.sql.{DataFrame, functions => F}
import streaming.core.datasource.{DataSourceConfig, DataSourceRegistry, MLSQLSource}
import streaming.dsl.load.batch.{AutoWorkflowSelfExplain, MLSQLAPIExplain, MLSQLConfExplain, ModelSelfExplain}
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
In this hunk the old top-level format match, including its inline case "es" | "org.elasticsearch.spark.sql" branch whose Elasticsearch handling now lives in MLSQLElasticSearch, is replaced by a registry lookup that falls back to the remaining cases. After the change the dispatch reads:

@@ -82,69 +83,68 @@ class BatchLoadAdaptor(scriptSQLExecListener: ScriptSQLExecListener,
    reader.options(option)
    path = TemplateMerge.merge(path, scriptSQLExecListener.env().toMap)
    val resourceOwner = option.get("owner")

    DataSourceRegistry.fetch(format).map { datasource =>
      table = datasource.asInstanceOf[MLSQLSource].load(reader, DataSourceConfig(cleanStr(path), option))
    }.getOrElse {
      format match {
        case "jdbc" =>
          val (dbname, dbtable) = parseDBAndTableFromStr(path)
          if (ScriptSQLExec.dbMapping.containsKey(dbname)) {
            ScriptSQLExec.dbMapping.get(dbname).foreach { f =>
              reader.option(f._1, f._2)
            }
          }
          reader.option("dbtable", dbtable)
          table = reader.format("jdbc").load()

        case "hbase" | "org.apache.spark.sql.execution.datasources.hbase" =>
          table = reader.format("org.apache.spark.sql.execution.datasources.hbase").load()
        case "crawlersql" =>
          table = reader.option("path", cleanStr(path)).format("org.apache.spark.sql.execution.datasources.crawlersql").load()
        case "image" =>
          val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
          table = reader.option("path", resourcePath).format("streaming.dsl.mmlib.algs.processing.image").load()
        case "jsonStr" =>
          val items = cleanBlockStr(scriptSQLExecListener.env()(cleanStr(path))).split("\n")
          import sparkSession.implicits._
          table = reader.json(sparkSession.createDataset[String](items))
        case "script" =>
          val items = List(cleanBlockStr(scriptSQLExecListener.env()(cleanStr(path)))).map { f =>
            val obj = new JSONObject()
            obj.put("content", f)
            obj.toString()
          }
          import sparkSession.implicits._
          table = reader.json(sparkSession.createDataset[String](items))
        case "hive" =>
          table = reader.table(cleanStr(path))
        case "text" =>
          val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
          table = reader.text(resourcePath.split(","): _*)
        case "xml" =>
          val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
          table = reader.option("path", resourcePath).format("com.databricks.spark.xml").load()
        case "mlsqlAPI" =>
          table = new MLSQLAPIExplain(sparkSession).explain
        case "mlsqlConf" =>
          table = new MLSQLConfExplain(sparkSession).explain
        case _ =>
          // calculate resource real absolute path
          val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)

          table = ModelSelfExplain(format, cleanStr(path), option, sparkSession).isMatch.thenDo.orElse(() => {
            AutoWorkflowSelfExplain(format, cleanStr(path), option, sparkSession).isMatch.thenDo().orElse(() => {
              reader.format(format).load(resourcePath)
            }).get()
          }).get
      }
    }

    table.createOrReplaceTempView(tableName)
  }
}
(The remaining changed files, including the SaveAdaptor changes named in the commit title, are not rendered on this page.)
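
Since that part of the diff is not visible here, the following is only a hedged sketch, based on the MLSQLSink trait above and not on the actual change, of how the write-side dispatch in SaveAdaptor would presumably mirror the load path:

// Hedged sketch only; `format`, `writer`, `path`, `option` and `mode` are assumed to be the
// values SaveAdaptor has in scope, and cleanStr is the same helper LoadAdaptor uses.
DataSourceRegistry.fetch(format).map { datasource =>
  datasource.asInstanceOf[MLSQLSink].save(writer, DataSinkConfig(cleanStr(path), option, mode))
}.getOrElse {
  // fall back to SaveAdaptor's pre-existing format match
}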
