diff --git a/streamingpro-mlsql/pom.xml b/streamingpro-mlsql/pom.xml
index 45eda67c2..6fe9de80d 100644
--- a/streamingpro-mlsql/pom.xml
+++ b/streamingpro-mlsql/pom.xml
@@ -213,6 +213,17 @@
+        <profile>
+            <id>unit-test</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.elasticsearch</groupId>
+                    <artifactId>elasticsearch-spark-20_2.11</artifactId>
+                    <version>6.5.0</version>
+                </dependency>
+            </dependencies>
+        </profile>
+
             <id>streamingpro-spark-2.2.0-adaptor</id>
diff --git a/streamingpro-mlsql/src/main/java/streaming/core/datasource/DataSourceRegistry.scala b/streamingpro-mlsql/src/main/java/streaming/core/datasource/DataSourceRegistry.scala
new file mode 100644
index 000000000..fff5b1983
--- /dev/null
+++ b/streamingpro-mlsql/src/main/java/streaming/core/datasource/DataSourceRegistry.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package streaming.core.datasource
+
+import com.google.common.reflect.ClassPath
+import org.apache.spark.sql.SaveMode
+import streaming.log.Logging
+
+import scala.collection.JavaConverters._
+
+/**
+ * 2018-12-20 WilliamZhu(allwefantasy@gmail.com)
+ */
+object DataSourceRegistry extends Logging {
+ private val registry = new java.util.concurrent.ConcurrentHashMap[String, MLSQLDataSource]()
+
+  def register(name: String, obj: MLSQLDataSource): Unit = {
+    registry.put(name, obj)
+  }
+
+ def fetch(name: String): Option[MLSQLDataSource] = {
+ if (registry.containsKey(name)) {
+ Option(registry.get(name))
+ } else None
+ }
+
+ private def registerFromPackage(name: String) = {
+ ClassPath.from(getClass.getClassLoader).getTopLevelClasses(name).asScala.foreach { clzz =>
+ val dataSource = Class.forName(clzz.getName).newInstance()
+ if (dataSource.isInstanceOf[MLSQLRegistry]) {
+ dataSource.asInstanceOf[MLSQLRegistry].register()
+ } else {
+ logWarning(
+ s"""
+ |${clzz.getName} does not implement MLSQLRegistry,
+ |we cannot register it automatically.
+ """.stripMargin)
+ }
+ }
+ }
+
+ registerFromPackage("streaming.core.datasource.impl")
+ registerFromPackage("streaming.contri.datasource.impl")
+}
+
+trait MLSQLRegistry {
+ def register(): Unit
+}
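+
+// A minimal lookup sketch (illustrative only): adaptors ask the registry for the format used
+// in the script and fall back to the built-in handling when nothing is registered, e.g.
+//
+//   DataSourceRegistry.fetch("es") match {
+//     case Some(source: MLSQLSource) => // delegate reader/writer work to the plugin
+//     case _                         => // keep the original hard-coded behaviour
+//   }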
+
+case class DataSourceConfig(path: String, config: Map[String, String])
+
+case class DataSinkConfig(path: String, config: Map[String, String], mode: SaveMode)
diff --git a/streamingpro-mlsql/src/main/java/streaming/core/datasource/MLSQLSource.scala b/streamingpro-mlsql/src/main/java/streaming/core/datasource/MLSQLSource.scala
new file mode 100644
index 000000000..8abdaf2b3
--- /dev/null
+++ b/streamingpro-mlsql/src/main/java/streaming/core/datasource/MLSQLSource.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package streaming.core.datasource
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
+
+/**
+ * 2018-12-20 WilliamZhu(allwefantasy@gmail.com)
+ */
+
+trait MLSQLDataSource {
+ def dbSplitter = {
+ "\\."
+ }
+
+ def fullFormat: String
+
+ def shortFormat: String
+
+}
+
+trait MLSQLSource extends MLSQLDataSource {
+ def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame
+}
+
+trait MLSQLSink extends MLSQLDataSource {
+ def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit
+}
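+
+// A minimal implementation sketch (the names below are hypothetical, not part of this patch):
+//
+//   class MLSQLMyStore extends MLSQLSource with MLSQLSink with MLSQLRegistry {
+//     override def fullFormat: String = "org.example.spark.mystore"
+//     override def shortFormat: String = "mystore"
+//     override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame =
+//       reader.options(config.config).format(fullFormat).load(config.path)
+//     override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit =
+//       writer.mode(config.mode).options(config.config).format(fullFormat).save(config.path)
+//     override def register(): Unit = DataSourceRegistry.register(shortFormat, this)
+//   }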
+
diff --git a/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLElasticSearch.scala b/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLElasticSearch.scala
new file mode 100644
index 000000000..fcfead555
--- /dev/null
+++ b/streamingpro-mlsql/src/main/java/streaming/core/datasource/impl/MLSQLElasticSearch.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package streaming.core.datasource.impl
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
+import streaming.core.datasource._
+import streaming.dsl.ScriptSQLExec
+
+class MLSQLElasticSearch extends MLSQLSource with MLSQLSink with MLSQLRegistry {
+
+
+ override def fullFormat: String = "org.elasticsearch.spark.sql"
+
+ override def shortFormat: String = "es"
+
+ override def dbSplitter: String = "/"
+
+ override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
+ var dbtable = config.path
+    // If the path contains the splitter, try to resolve the connection name via dbMapping
+    // (populated by the `connect` statement). Otherwise do nothing: Elasticsearch paths
+    // look like index/type, so leaving the path untouched does no harm.
+ if (config.path.contains(dbSplitter)) {
+ val Array(_dbname, _dbtable) = config.path.split(dbSplitter, 2)
+ if (ScriptSQLExec.dbMapping.containsKey(_dbname)) {
+ dbtable = _dbtable
+ ScriptSQLExec.dbMapping.get(_dbname).foreach { f =>
+ reader.option(f._1, f._2)
+ }
+ }
+ }
+    // options given on the load statement should overwrite options from connect
+ reader.options(config.config)
+ reader.format(config.config.getOrElse("implClass", fullFormat)).load(dbtable)
+ }
+
+ override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {
+ var dbtable = config.path
+    // Same as load: if the path contains the splitter, resolve the connection name via
+    // dbMapping (populated by `connect`); otherwise leave the Elasticsearch index/type
+    // path untouched, which does no harm.
+ if (config.path.contains(dbSplitter)) {
+ val Array(_dbname, _dbtable) = config.path.split(dbSplitter, 2)
+ if (ScriptSQLExec.dbMapping.containsKey(_dbname)) {
+ dbtable = _dbtable
+ ScriptSQLExec.dbMapping.get(_dbname).foreach { f =>
+ writer.option(f._1, f._2)
+ }
+ }
+ }
+ writer.mode(config.mode)
+    // options given on the save statement should overwrite options from connect
+ writer.options(config.config)
+    // partitionByCol may arrive as an empty string (SaveAdaptor always passes the key),
+    // so only apply partitionBy when columns are actually provided.
+    config.config.get("partitionByCol").filterNot(_.isEmpty).foreach { item =>
+      writer.partitionBy(item.split(","): _*)
+    }
+ writer.format(config.config.getOrElse("implClass", fullFormat)).save(dbtable)
+ }
+
+ override def register(): Unit = {
+ DataSourceRegistry.register(fullFormat, this)
+ DataSourceRegistry.register(shortFormat, this)
+ }
+}
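+
+// Example MLSQL usage (a sketch mirroring DataSourceSpec; host and index names are illustrative):
+//
+//   connect es where `es.nodes`="127.0.0.1" and `es.nodes.wan.only`="true" as es_instance;
+//   load es.`es_instance/twitter/cool` as table1;
+//   save overwrite table1 as es.`twitter/cool` where `es.index.auto.create`="true";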
diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/LoadAdaptor.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/LoadAdaptor.scala
index c55599540..baedfee95 100644
--- a/streamingpro-mlsql/src/main/java/streaming/dsl/LoadAdaptor.scala
+++ b/streamingpro-mlsql/src/main/java/streaming/dsl/LoadAdaptor.scala
@@ -20,6 +20,7 @@ package streaming.dsl
import net.sf.json.JSONObject
import org.apache.spark.sql.{DataFrame, functions => F}
+import streaming.core.datasource.{DataSourceConfig, DataSourceRegistry, MLSQLSource}
import streaming.dsl.load.batch.{AutoWorkflowSelfExplain, MLSQLAPIExplain, MLSQLConfExplain, ModelSelfExplain}
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
@@ -82,69 +83,68 @@ class BatchLoadAdaptor(scriptSQLExecListener: ScriptSQLExecListener,
reader.options(option)
path = TemplateMerge.merge(path, scriptSQLExecListener.env().toMap)
val resourceOwner = option.get("owner")
- format match {
- case "jdbc" =>
- val (dbname, dbtable) = parseDBAndTableFromStr(path)
- if (ScriptSQLExec.dbMapping.containsKey(dbname)) {
- ScriptSQLExec.dbMapping.get(dbname).foreach { f =>
- reader.option(f._1, f._2)
+
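+    // Try the pluggable datasource registry first; if the format has no registered
+    // implementation, fall back to the original built-in handling below.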
+ DataSourceRegistry.fetch(format).map { datasource =>
+ table = datasource.asInstanceOf[MLSQLSource].load(reader, DataSourceConfig(cleanStr(path), option))
+ }.getOrElse {
+ format match {
+ case "jdbc" =>
+ val (dbname, dbtable) = parseDBAndTableFromStr(path)
+ if (ScriptSQLExec.dbMapping.containsKey(dbname)) {
+ ScriptSQLExec.dbMapping.get(dbname).foreach { f =>
+ reader.option(f._1, f._2)
+ }
}
- }
- reader.option("dbtable", dbtable)
- table = reader.format("jdbc").load()
-
- case "es" | "org.elasticsearch.spark.sql" =>
- val (dbname, dbtable) = parseDBAndTableFromStr(path)
- if (ScriptSQLExec.dbMapping.containsKey(dbname)) {
- ScriptSQLExec.dbMapping.get(dbname).foreach { f =>
- reader.option(f._1, f._2)
+ reader.option("dbtable", dbtable)
+ table = reader.format("jdbc").load()
+
+ case "hbase" | "org.apache.spark.sql.execution.datasources.hbase" =>
+ table = reader.format("org.apache.spark.sql.execution.datasources.hbase").load()
+ case "crawlersql" =>
+ table = reader.option("path", cleanStr(path)).format("org.apache.spark.sql.execution.datasources.crawlersql").load()
+ case "image" =>
+ val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
+ table = reader.option("path", resourcePath).format("streaming.dsl.mmlib.algs.processing.image").load()
+ case "jsonStr" =>
+ val items = cleanBlockStr(scriptSQLExecListener.env()(cleanStr(path))).split("\n")
+ import sparkSession.implicits._
+ table = reader.json(sparkSession.createDataset[String](items))
+ case "script" =>
+ val items = List(cleanBlockStr(scriptSQLExecListener.env()(cleanStr(path)))).map { f =>
+ val obj = new JSONObject()
+ obj.put("content", f)
+ obj.toString()
}
- }
- table = reader.format("org.elasticsearch.spark.sql").load(dbtable)
- case "hbase" | "org.apache.spark.sql.execution.datasources.hbase" =>
- table = reader.format("org.apache.spark.sql.execution.datasources.hbase").load()
- case "crawlersql" =>
- table = reader.option("path", cleanStr(path)).format("org.apache.spark.sql.execution.datasources.crawlersql").load()
- case "image" =>
- val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
- table = reader.option("path", resourcePath).format("streaming.dsl.mmlib.algs.processing.image").load()
- case "jsonStr" =>
- val items = cleanBlockStr(scriptSQLExecListener.env()(cleanStr(path))).split("\n")
- import sparkSession.implicits._
- table = reader.json(sparkSession.createDataset[String](items))
- case "script" =>
- val items = List(cleanBlockStr(scriptSQLExecListener.env()(cleanStr(path)))).map { f =>
- val obj = new JSONObject()
- obj.put("content", f)
- obj.toString()
- }
- import sparkSession.implicits._
- table = reader.json(sparkSession.createDataset[String](items))
- case "hive" =>
- table = reader.table(cleanStr(path))
- case "text" =>
- val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
- table = reader.text(resourcePath.split(","): _*)
- case "xml" =>
- val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
- table = reader.option("path", resourcePath).format("com.databricks.spark.xml").load()
- case "mlsqlAPI" =>
- table = new MLSQLAPIExplain(sparkSession).explain
- case "mlsqlConf" =>
- table = new MLSQLConfExplain(sparkSession).explain
- case _ =>
+ import sparkSession.implicits._
+ table = reader.json(sparkSession.createDataset[String](items))
+ case "hive" =>
+ table = reader.table(cleanStr(path))
+ case "text" =>
+ val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
+ table = reader.text(resourcePath.split(","): _*)
+ case "xml" =>
+ val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
+ table = reader.option("path", resourcePath).format("com.databricks.spark.xml").load()
+ case "mlsqlAPI" =>
+ table = new MLSQLAPIExplain(sparkSession).explain
+ case "mlsqlConf" =>
+ table = new MLSQLConfExplain(sparkSession).explain
+ case _ =>
- // calculate resource real absolute path
- val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
+ // calculate resource real absolute path
+ val resourcePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)
- table = ModelSelfExplain(format, cleanStr(path), option, sparkSession).isMatch.thenDo.orElse(() => {
+ table = ModelSelfExplain(format, cleanStr(path), option, sparkSession).isMatch.thenDo.orElse(() => {
- AutoWorkflowSelfExplain(format, cleanStr(path), option, sparkSession).isMatch.thenDo().orElse(() => {
- reader.format(format).load(resourcePath)
- }).get()
+ AutoWorkflowSelfExplain(format, cleanStr(path), option, sparkSession).isMatch.thenDo().orElse(() => {
+ reader.format(format).load(resourcePath)
+ }).get()
- }).get
+ }).get
+ }
}
+
+
table.createOrReplaceTempView(tableName)
}
}
diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/SaveAdaptor.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/SaveAdaptor.scala
index 5bfb89722..504565564 100644
--- a/streamingpro-mlsql/src/main/java/streaming/dsl/SaveAdaptor.scala
+++ b/streamingpro-mlsql/src/main/java/streaming/dsl/SaveAdaptor.scala
@@ -20,11 +20,12 @@ package streaming.dsl
import java.util.concurrent.TimeUnit
-import _root_.streaming.dsl.parser.DSLSQLParser._
-import _root_.streaming.dsl.template.TemplateMerge
-import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import streaming.core.datasource.{DataSinkConfig, DataSourceRegistry, MLSQLSink}
+import streaming.dsl.parser.DSLSQLParser._
+import streaming.dsl.template.TemplateMerge
import scala.collection.mutable.ArrayBuffer
@@ -119,81 +120,88 @@ class BatchSaveAdaptor(val scriptSQLExecListener: ScriptSQLExecListener,
if (option.contains("fileNum")) {
oldDF = oldDF.repartition(option.getOrElse("fileNum", "").toString.toInt)
}
-
var writer = oldDF.write
- val dbAndTable = final_path.split("\\.")
- var connect_provied = false
- if (dbAndTable.length == 2 && ScriptSQLExec.dbMapping.containsKey(dbAndTable(0))) {
- ScriptSQLExec.dbMapping.get(dbAndTable(0)).foreach {
- f =>
- writer.option(f._1, f._2)
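+    // Delegate to a registered MLSQLSink when the format is known to the registry;
+    // otherwise fall back to the original built-in save logic below.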
+ DataSourceRegistry.fetch(format).map { datasource =>
+ datasource.asInstanceOf[MLSQLSink].save(
+ writer,
+ DataSinkConfig(final_path, option ++ Map("partitionByCol" -> partitionByCol.mkString(",")),
+ mode))
+ }.getOrElse {
+ val dbAndTable = final_path.split("\\.")
+ var connect_provied = false
+ if (dbAndTable.length == 2 && ScriptSQLExec.dbMapping.containsKey(dbAndTable(0))) {
+ ScriptSQLExec.dbMapping.get(dbAndTable(0)).foreach {
+ f =>
+ writer.option(f._1, f._2)
+ }
+ connect_provied = true
}
- connect_provied = true
- }
- if (connect_provied) {
- final_path = dbAndTable(1)
- }
+ if (connect_provied) {
+ final_path = dbAndTable(1)
+ }
- writer = writer.format(format).mode(mode).partitionBy(partitionByCol: _*).options(option)
+ writer = writer.format(format).mode(mode).partitionBy(partitionByCol: _*).options(option)
- def getKafkaBrokers = {
- "metadata.broker.list" -> option.getOrElse("metadata.broker.list", "kafka.bootstrap.servers")
- }
+ def getKafkaBrokers = {
+ "metadata.broker.list" -> option.getOrElse("metadata.broker.list", "kafka.bootstrap.servers")
+ }
- format match {
- case "es" =>
- writer.format(
- option.getOrElse("implClass", "org.elasticsearch.spark.sql")).save(final_path)
- case "hive" =>
- writer.format(option.getOrElse("file_format", "parquet"))
- writer.saveAsTable(final_path)
-
- case "kafka8" | "kafka9" =>
-
- writer.option("topics", final_path).
- option(getKafkaBrokers._1, getKafkaBrokers._2).
- format("com.hortonworks.spark.sql.kafka08").save()
-
- case "kafka" =>
- writer.option("topic", final_path).
- option(getKafkaBrokers._1, getKafkaBrokers._2).format("kafka").save()
-
- case "hbase" =>
- writer.option("outputTableName", final_path).format(
- option.getOrElse("implClass", "org.apache.spark.sql.execution.datasources.hbase")).save()
- case "redis" =>
- writer.option("outputTableName", final_path).format(
- option.getOrElse("implClass", "org.apache.spark.sql.execution.datasources.redis")).save()
- case "jdbc" =>
- if (option.contains("idCol")) {
- import org.apache.spark.sql.jdbc.DataFrameWriterExtensions._
- val extraOptionsField = writer.getClass.getDeclaredField("extraOptions")
- extraOptionsField.setAccessible(true)
- val extraOptions = extraOptionsField.get(writer).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
- val jdbcOptions = new JDBCOptions(extraOptions.toMap + ("dbtable" -> final_path))
- writer.upsert(option.get("idCol"), jdbcOptions, oldDF)
- } else {
- writer.option("dbtable", final_path).save()
- }
+ format match {
+ case "es" =>
+ writer.format(
+ option.getOrElse("implClass", "org.elasticsearch.spark.sql")).save(final_path)
+ case "hive" =>
+ writer.format(option.getOrElse("file_format", "parquet"))
+ writer.saveAsTable(final_path)
+
+ case "kafka8" | "kafka9" =>
+
+ writer.option("topics", final_path).
+ option(getKafkaBrokers._1, getKafkaBrokers._2).
+ format("com.hortonworks.spark.sql.kafka08").save()
+
+ case "kafka" =>
+ writer.option("topic", final_path).
+ option(getKafkaBrokers._1, getKafkaBrokers._2).format("kafka").save()
+
+ case "hbase" =>
+ writer.option("outputTableName", final_path).format(
+ option.getOrElse("implClass", "org.apache.spark.sql.execution.datasources.hbase")).save()
+ case "redis" =>
+ writer.option("outputTableName", final_path).format(
+ option.getOrElse("implClass", "org.apache.spark.sql.execution.datasources.redis")).save()
+ case "jdbc" =>
+ if (option.contains("idCol")) {
+ import org.apache.spark.sql.jdbc.DataFrameWriterExtensions._
+ val extraOptionsField = writer.getClass.getDeclaredField("extraOptions")
+ extraOptionsField.setAccessible(true)
+ val extraOptions = extraOptionsField.get(writer).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
+ val jdbcOptions = new JDBCOptions(extraOptions.toMap + ("dbtable" -> final_path))
+ writer.upsert(option.get("idCol"), jdbcOptions, oldDF)
+ } else {
+ writer.option("dbtable", final_path).save()
+ }
- case "carbondata" =>
- if (dbAndTable.size == 2) {
- writer.option("tableName", dbAndTable(1)).option("dbName", dbAndTable(0))
- }
- if (dbAndTable.size == 1 && dbAndTable(0) != "-") {
- writer.option("tableName", dbAndTable(0))
- }
- writer.format(option.getOrElse("implClass", "org.apache.spark.sql.CarbonSource")).save()
- case _ =>
- if (final_path == "-" || final_path.isEmpty) {
- writer.format(option.getOrElse("implClass", format)).save()
- } else {
- writer.format(option.getOrElse("implClass", format)).save(final_path)
- }
+ case "carbondata" =>
+ if (dbAndTable.size == 2) {
+ writer.option("tableName", dbAndTable(1)).option("dbName", dbAndTable(0))
+ }
+ if (dbAndTable.size == 1 && dbAndTable(0) != "-") {
+ writer.option("tableName", dbAndTable(0))
+ }
+ writer.format(option.getOrElse("implClass", "org.apache.spark.sql.CarbonSource")).save()
+ case _ =>
+ if (final_path == "-" || final_path.isEmpty) {
+ writer.format(option.getOrElse("implClass", format)).save()
+ } else {
+ writer.format(option.getOrElse("implClass", format)).save(final_path)
+ }
+ }
}
+
}
}
diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/datasource/DataSourceSpec.scala b/streamingpro-mlsql/src/test/scala/streaming/test/datasource/DataSourceSpec.scala
new file mode 100644
index 000000000..25d67c400
--- /dev/null
+++ b/streamingpro-mlsql/src/test/scala/streaming/test/datasource/DataSourceSpec.scala
@@ -0,0 +1,65 @@
+package streaming.test.datasource
+
+import org.apache.spark.streaming.BasicSparkOperation
+import org.scalatest.BeforeAndAfterAll
+import streaming.core.strategy.platform.SparkRuntime
+import streaming.core.{BasicMLSQLConfig, SpecFunctions}
+import streaming.dsl.ScriptSQLExec
+import streaming.log.Logging
+
+/**
+ * 2018-12-20 WilliamZhu(allwefantasy@gmail.com)
+ */
+class DataSourceSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConfig with BeforeAndAfterAll with Logging {
+ "load es" should "work fine" in {
+
+ withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { runtime: SparkRuntime =>
+      // execute the MLSQL script
+ implicit val spark = runtime.sparkSession
+
+ var sq = createSSEL
+ ScriptSQLExec.parse(
+ s"""
+ |
+ |set data='''
+ |{"jack":"cool"}
+ |''';
+ |
+ |load jsonStr.`data` as data1;
+ |
+ |save overwrite data1 as es.`twitter/cool` where
+ |`es.index.auto.create`="true"
+ |and es.nodes.wan.only="true"
+ |and es.nodes="127.0.0.1";
+ |
+ |load es.`twitter/cool` where
+          |es.nodes="127.0.0.1"
+ |and es.nodes.wan.only="true"
+ |as table1;
+ |select * from table1 as output1;
+ |
+ |connect es where `es.index.auto.create`="true"
+ |and es.nodes.wan.only="true"
+ |and es.nodes="127.0.0.1" as es_instance;
+ |
+ |load es.`es_instance/twitter/cool`
+ |as table1;
+ |select * from table1 as output2;
+ |
+ |
+ """.stripMargin, sq)
+ assume(spark.sql("select * from output1").collect().last.get(0) == "cool")
+ assume(spark.sql("select * from output2").collect().last.get(0) == "cool")
+ }
+ }
+
+ val server = new streaming.test.servers.ElasticSearchServer("6.5.3")
+
+ override protected def beforeAll(): Unit = {
+ server.startServer
+ }
+
+ override protected def afterAll(): Unit = {
+ server.stopServer
+ }
+}
diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/servers/ElasticSearchServer.scala b/streamingpro-mlsql/src/test/scala/streaming/test/servers/ElasticSearchServer.scala
new file mode 100644
index 000000000..e7ca6935b
--- /dev/null
+++ b/streamingpro-mlsql/src/test/scala/streaming/test/servers/ElasticSearchServer.scala
@@ -0,0 +1,27 @@
+package streaming.test.servers
+
+/**
+ * 2018-12-19 WilliamZhu(allwefantasy@gmail.com)
+ */
+class ElasticSearchServer(version: String) extends WowBaseTestServer {
+
+ override def composeYaml: String =
+ s"""
+ |version: '2'
+ |services:
+ | elasticsearch:
+ | image: library/elasticsearch:${version}
+ | ports:
+ | - "9200:9200"
+ | - "9300:9300"
+ | environment:
+ | ES_JAVA_OPTS: "-Xms512m -Xmx512m"
+ | discovery.type: single-node
+ """.stripMargin
+
+ override def waitToServiceReady: Boolean = {
+    // wait for the Elasticsearch HTTP port (9200) to be ready; this check runs on the host
+ val shellCommand = s"nc -z 127.0.0.1 9200"
+ readyCheck("", shellCommand, false)
+ }
+}
diff --git a/streamingpro-mlsql/src/test/scala/streaming/test/servers/WowBaseTestServer.scala b/streamingpro-mlsql/src/test/scala/streaming/test/servers/WowBaseTestServer.scala
index fe1f9402b..fd909b21c 100644
--- a/streamingpro-mlsql/src/test/scala/streaming/test/servers/WowBaseTestServer.scala
+++ b/streamingpro-mlsql/src/test/scala/streaming/test/servers/WowBaseTestServer.scala
@@ -91,5 +91,6 @@ trait WowBaseTestServer extends Logging {
logInfo(s"command=[${dockerComposeCommand}] status=${status} out=[${out}] err=[${err}]")
require(status == 0, "Fail to start server")
ShellCommand.execCmd(s"rm -rf ${pjDir}")
+ ShellCommand.execWithExitValue("docker network prune --force")
}
}