Merge pull request byzer-org#981 from allwefantasy/TRY
Try
allwefantasy authored Mar 21, 2019
2 parents 469c0c1 + baf59a4 commit ab61da0
Showing 42 changed files with 1,346 additions and 668 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -153,3 +153,4 @@ dev/sync-doc.sh
docs/gitbook/zh/_book/
contri/streamingpro-automl/logs/
/tmp
streamingpro-mlsql/src/main/java/streaming/core/WilliamLocalSparkServiceApp.scala
12 changes: 9 additions & 3 deletions dev/package.sh
@@ -6,9 +6,10 @@ usage: package
run package command based on different spark version.
Inputs are specified with the following environment variables:
-MLSQL_SPARK_VERSION - the spark version, 2.2/2.3/2.4
-DRY_RUN true|false
-DISTRIBUTION true|false
+MLSQL_SPARK_VERSION - the spark version, 2.2/2.3/2.4 default 2.3
+DRY_RUN true|false default false
+DISTRIBUTION true|false default false
+DATASOURCE_INCLUDED true|false default false
EOF
exit 1
}
@@ -29,6 +30,7 @@ MLSQL_SPARK_VERSION=${MLSQL_SPARK_VERSION:-2.3}
DRY_RUN=${DRY_RUN:-false}
DISTRIBUTION=${DISTRIBUTION:-false}
OSS_ENABLE=${OSS_ENABLE:-false}
DATASOURCE_INCLUDED=${DATASOURCE_INCLUDED:-false}
COMMAND=${COMMAND:-package}

for env in MLSQL_SPARK_VERSION DRY_RUN DISTRIBUTION; do
@@ -82,6 +84,10 @@ if [[ "${OSS_ENABLE}" == "true" ]];then
BASE_PROFILES="$BASE_PROFILES -Poss-support"
fi

if [[ "$DATASOURCE_INCLUDED" == "true" ]];then
BASE_PROFILES="$BASE_PROFILES -Punit-test"
fi

if [[ ${DRY_RUN} == "true" ]];then

cat << EOF
4 changes: 2 additions & 2 deletions docs/gitbook/zh/stream/datasource.md
@@ -39,7 +39,7 @@ and password="----"
as mysql1;
save append table21
-as jdbc.`mysql1.test1`
+as streamJDBC.`mysql1.test1`
options mode="Complete"
and `driver-statement-0`="create table if not exists test1(k TEXT,c BIGINT)"
and `statement-0`="insert into wow.test1(k,c) values(?,?)"
@@ -136,7 +136,7 @@ as table21;

-- save the data to mysql.
save append table21
-as jdbc.`mysql1.test1`
+as streamJDBC.`mysql1.test1`
options mode="Complete"
and `driver-statement-0`="create table if not exists test1(k TEXT,c BIGINT)"
and `statement-0`="insert into wow.test1(k,c) values(?,?)"
4 changes: 2 additions & 2 deletions docs/gitbook/zh/stream/stream_mysql_update.md
@@ -89,7 +89,7 @@ as groupTable;

```sql
save append groupTable
-as jdbc.`mydb.patient`
+as streamJDBC.`mydb.patient`
options mode="update"
-- call procedure (invoke the stored procedure)
and `statement-0`="call test_proc(?,?,?)"
@@ -103,7 +103,7 @@ and checkpointLocation="/streamingpro-test/kafka/patient/mysql/update";
select dt,addr,num, dt as dt1, addr as addr2 from groupTable as outputTable;
save append outputTable
-as jdbc.`mydb.patient`
+as streamJDBC.`mydb.patient`
options mode="update"
and `statement-0`="insert into patientUpdate(dt,addr,num) value(?,?,?) ON DUPLICATE KEY UPDATE dt=?,addr=?,num=?;"
and duration="5"
@@ -85,23 +85,25 @@ object TableType {
val HBASE = TableTypeMeta("hbase", Set("hbase"))
val HDFS = TableTypeMeta("hdfs", Set("parquet", "json", "csv", "image", "text", "xml", "excel"))
val HTTP = TableTypeMeta("http", Set("http"))
-val JDBC = TableTypeMeta("jdbc", Set("jdbc"))
+val JDBC = TableTypeMeta("jdbc", Set("jdbc", "streamJDBC"))
val ES = TableTypeMeta("es", Set("es"))
val REDIS = TableTypeMeta("redis", Set("redis"))
val KAFKA = TableTypeMeta("kafka", Set("kafka", "kafka8", "kafka9"))
val SOCKET = TableTypeMeta("socket", Set("socket"))
val MONGO = TableTypeMeta("mongo", Set("mongo"))
val SOLR = TableTypeMeta("solr", Set("solr"))
val TEMP = TableTypeMeta("temp", Set("temp", "jsonStr", "script", "csvStr", "mockStream", "console"))
val API = TableTypeMeta("api", Set("mlsqlAPI", "mlsqlConf"))
val WEB = TableTypeMeta("web", Set("crawlersql"))
val GRAMMAR = TableTypeMeta("grammar", Set("grammar"))
val SYSTEM = TableTypeMeta("system", Set("_mlsql_", "model", "modelList", "modelParams", "modelExample", "modelExplain"))
val UNKNOW = TableTypeMeta("unknow", Set("unknow"))

def from(str: String) = {
-List(REDIS, HIVE, HBASE, HDFS, HTTP, JDBC, ES, MONGO, SOLR, TEMP, API, WEB, GRAMMAR, SYSTEM).filter(f => f.includes.contains(str)).headOption
+List(UNKNOW, KAFKA, SOCKET, REDIS, HIVE, HBASE, HDFS, HTTP, JDBC, ES, MONGO, SOLR, TEMP, API, WEB, GRAMMAR, SYSTEM).filter(f => f.includes.contains(str)).headOption
}

def toList = {
-List(REDIS, HIVE, HBASE, HDFS, HTTP, JDBC, ES, MONGO, SOLR, TEMP, API, WEB, GRAMMAR, SYSTEM).flatMap(f => f.includes.toSeq)
+List(UNKNOW, KAFKA, SOCKET, REDIS, HIVE, HBASE, HDFS, HTTP, JDBC, ES, MONGO, SOLR, TEMP, API, WEB, GRAMMAR, SYSTEM).flatMap(f => f.includes.toSeq)
}
}
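For reference, a small sketch (not part of this commit) of what the widened lookup lists give you: `streamJDBC` now resolves to the jdbc table type, and the KAFKA, SOCKET and UNKNOW metas participate in `from`/`toList`. The object name `TableTypeLookupExample` and the import path are assumptions for illustration.

```scala
// Illustrative only; assumes TableType lives in streaming.core.datasource.
import streaming.core.datasource.TableType

object TableTypeLookupExample {
  def main(args: Array[String]): Unit = {
    // the new "streamJDBC" alias maps to the jdbc table type
    println(TableType.from("streamJDBC")) // Some(TableTypeMeta(jdbc, ...))
    // kafka/socket/unknow are now reachable through from() as well
    println(TableType.from("socket"))     // Some(TableTypeMeta(socket, ...))
    // toList flattens every registered alias, including the new one
    println(TableType.toList.contains("streamJDBC")) // true
  }
}
```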
@@ -19,10 +19,21 @@ abstract class MLSQLBaseFileSource extends MLSQLSource with MLSQLSink with MLSQL
}


-override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {
+override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Any = {
val context = ScriptSQLExec.contextGetOrForTest()
val format = config.config.getOrElse("implClass", fullFormat)
writer.options(rewriteConfig(config.config)).format(format).save(resourceRealPath(context.execListener, Option(context.owner), config.path))
}

override def register(): Unit = {
DataSourceRegistry.register(MLSQLDataSourceKey(fullFormat, MLSQLSparkDataSourceType), this)
DataSourceRegistry.register(MLSQLDataSourceKey(shortFormat, MLSQLSparkDataSourceType), this)
}

override def sourceInfo(config: DataAuthConfig): SourceInfo = {
val context = ScriptSQLExec.contextGetOrForTest()
val owner = config.config.get("owner").getOrElse(context.owner)
SourceInfo(shortFormat, "", resourceRealPath(context.execListener, Option(owner), config.path))
}

}
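With `register` and `sourceInfo` pulled up into the base class, a concrete file-backed source can shrink to little more than its format names. A minimal sketch under that assumption (not part of this commit; the class name `MLSQLParquetExample` is hypothetical, and it relies on the unchanged `load` implementation at the top of this file, which is not shown in the hunk):

```scala
package streaming.core.datasource.impl

import streaming.core.datasource.MLSQLBaseFileSource
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}

// Hypothetical example: load/save/register/sourceInfo all come from
// MLSQLBaseFileSource, so the subclass only declares its formats.
class MLSQLParquetExample(override val uid: String) extends MLSQLBaseFileSource with WowParams {
  def this() = this(BaseParams.randomUID())

  override def fullFormat: String = "parquet"

  override def shortFormat: String = fullFormat
}
```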
@@ -0,0 +1,85 @@
package streaming.core.datasource

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
import org.apache.spark.sql.{DataFrameWriter, Row}
import streaming.dsl.{DslTool, ScriptSQLExec}

/**
* 2019-03-20 WilliamZhu([email protected])
*/
abstract class MLSQLBaseStreamSource extends MLSQLSource with MLSQLSink with MLSQLSourceInfo with MLSQLRegistry with DslTool {

def rewriteConfig(config: Map[String, String]) = {
config
}


override def save(batchWriter: DataFrameWriter[Row], config: DataSinkConfig): Any = {
val oldDF = config.df.get
var option = config.config
if (option.contains("fileNum")) {
option -= "fileNum"
}

val writer: DataStreamWriter[Row] = oldDF.writeStream
var path = config.path

val Array(db, table) = parseRef(aliasFormat, path, (options: Map[String, String]) => {
writer.options(options)
})

path = table

require(option.contains("checkpointLocation"), "checkpointLocation is required")
require(option.contains("duration"), "duration is required")
require(option.contains("mode"), "mode is required")

if (option.contains("partitionByCol")) {
val cols = option("partitionByCol").split(",").filterNot(f => f.isEmpty)
if (cols.size != 0) {
writer.partitionBy(option("partitionByCol").split(","): _*)
}
option -= "partitionByCol"
}

val duration = option("duration").toInt
option -= "duration"


val mode = option("mode")
option -= "mode"

val format = config.config.getOrElse("implClass", fullFormat)

writer.format(format).outputMode(mode).options(option)

val dbtable = if (option.contains("dbtable")) option("dbtable") else path

if (dbtable != null && dbtable != "-") {
writer.option("path", dbtable)
}

ScriptSQLExec.contextGetOrForTest().execListener.env().get("streamName") match {
case Some(name) => writer.queryName(name)
case None =>
}
writer.trigger(Trigger.ProcessingTime(duration, TimeUnit.SECONDS)).start()
}


override def register(): Unit = {
DataSourceRegistry.register(MLSQLDataSourceKey(fullFormat, MLSQLSparkDataSourceType), this)
DataSourceRegistry.register(MLSQLDataSourceKey(shortFormat, MLSQLSparkDataSourceType), this)
}

override def sourceInfo(config: DataAuthConfig): SourceInfo = {

val Array(db, table) = config.path.split("\\.") match {
case Array(db, table) => Array(db, table)
case Array(table) => Array("", table)
}
SourceInfo(shortFormat, db, table)
}
}
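As a usage sketch (again, not part of this commit), a streaming sink built on this base class mostly declares its format names; the inherited `save` drives `writeStream` with the required `checkpointLocation`/`duration`/`mode` options. The class name `MLSQLStreamParquetExample` and the alias/format strings are illustrative assumptions, loosely mirroring how a streamJDBC-style sink piggybacks on an existing batch format:

```scala
package streaming.core.datasource.impl

import org.apache.spark.sql.{DataFrame, DataFrameReader}
import streaming.core.datasource.{DataSourceConfig, MLSQLBaseStreamSource}
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}

// Hypothetical example of a write-only streaming sink.
class MLSQLStreamParquetExample(override val uid: String) extends MLSQLBaseStreamSource with WowParams {
  def this() = this(BaseParams.randomUID())

  // sink-only, like the console sink: reading through this format is rejected
  override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
    throw new RuntimeException(s"load is not supported with ${shortFormat}")
  }

  // parseRef(aliasFormat, ...) in the inherited save resolves db.table
  // references against the aliased batch format
  override def aliasFormat: String = "parquet"

  override def fullFormat: String = "parquet"

  override def shortFormat: String = "streamParquet"
}
```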
@@ -18,8 +18,8 @@

package streaming.core.datasource

-import org.apache.spark.sql._
import _root_.streaming.dsl.MLSQLExecuteContext
+import org.apache.spark.sql._

/**
* 2018-12-20 WilliamZhu([email protected])
@@ -34,6 +34,10 @@ trait MLSQLDataSource {

def shortFormat: String

def aliasFormat: String = {
shortFormat
}

}

trait MLSQLSource extends MLSQLDataSource with MLSQLSourceInfo {
@@ -48,15 +52,15 @@ trait RewriteableSource {
}

trait MLSQLSink extends MLSQLDataSource {
-def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit
+def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Any
}

trait MLSQLDirectSource extends MLSQLDataSource {
def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame
}

trait MLSQLDirectSink extends MLSQLDataSource {
-def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit
+def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Any
}

case class SourceInfo(sourceType: String, db: String, table: String)
@@ -0,0 +1,44 @@
package streaming.core.datasource.impl

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
import streaming.core.datasource._
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}

/**
* 2019-03-20 WilliamZhu([email protected])
*/
class MLSQLCarbondata(override val uid: String) extends MLSQLBaseFileSource with WowParams {
def this() = this(BaseParams.randomUID())


override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
val format = config.config.getOrElse("implClass", fullFormat)
reader.options(config.config).format(format).table(config.path)
}


override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {
val Array(db, table) = parseRef(shortFormat, config.path, (options: Map[String, String]) => {
writer.options(options)
})

if (db.isEmpty) {
writer.option("tableName", table)
} else {
writer.option("tableName", db).option("dbName", table)
}

val format = config.config.getOrElse("implClass", fullFormat)
writer.options(rewriteConfig(config.config)).format(format).save()
}

override def sourceInfo(config: DataAuthConfig): SourceInfo = {
val Array(db, table) = parseRef(shortFormat, config.path, (options: Map[String, String]) => {
})
SourceInfo(shortFormat, db, table)
}

override def fullFormat: String = "org.apache.spark.sql.CarbonSource"

override def shortFormat: String = "carbondata"
}
@@ -0,0 +1,38 @@
package streaming.core.datasource.impl

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
import streaming.core.datasource.{DataSinkConfig, DataSourceConfig, MLSQLBaseStreamSource}
import streaming.dsl.ScriptSQLExec
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}

/**
* 2019-03-20 WilliamZhu([email protected])
*/
class MLSQLConsole(override val uid: String) extends MLSQLBaseStreamSource with WowParams {
def this() = this(BaseParams.randomUID())

override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
throw new RuntimeException(s"load is not support with ${shortFormat} ")
}

def isStream = {
val context = ScriptSQLExec.contextGetOrForTest()
context.execListener.env().contains("streamName")
}


override def save(batchWriter: DataFrameWriter[Row], config: DataSinkConfig): Any = {

if (isStream) {
return super.save(batchWriter, config)

}
throw new RuntimeException(s"save is not support with ${shortFormat} in batch mode")

}

override def fullFormat: String = "console"

override def shortFormat: String = "console"

}
@@ -0,0 +1,27 @@
package streaming.core.datasource.impl

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
import streaming.core.datasource._
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}

/**
* 2019-03-20 WilliamZhu([email protected])
*/
class MLSQLCrawlerSql(override val uid: String) extends MLSQLBaseFileSource with WowParams {
def this() = this(BaseParams.randomUID())


override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
reader.option("path", config.path).options(rewriteConfig(config.config)).format("org.apache.spark.sql.execution.datasources.crawlersql").load()
}

override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {
throw new RuntimeException(s"save is not supported in ${shortFormat}")
}

override def fullFormat: String = "crawlersql"

override def shortFormat: String = fullFormat

}

@@ -0,0 +1,32 @@
package streaming.core.datasource.impl

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
import streaming.core.datasource._
import streaming.dsl.ScriptSQLExec
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}

/**
* 2019-03-20 WilliamZhu([email protected])
*/
class MLSQLCsvStr(override val uid: String) extends MLSQLBaseFileSource with WowParams {
def this() = this(BaseParams.randomUID())


override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
val context = ScriptSQLExec.contextGetOrForTest()
val items = cleanBlockStr(context.execListener.env()(cleanStr(config.path))).split("\n")
val spark = config.df.get.sparkSession
import spark.implicits._
reader.options(rewriteConfig(config.config)).csv(spark.createDataset[String](items))
}

override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {
throw new RuntimeException(s"save is not supported in ${shortFormat}")
}

override def fullFormat: String = "csvStr"

override def shortFormat: String = fullFormat

}
