Commit ca60562

update

allwefantasy committed Mar 21, 2019
1 parent 4d8a6c0
Showing 7 changed files with 257 additions and 336 deletions.
12 changes: 9 additions & 3 deletions dev/package.sh
@@ -6,9 +6,10 @@ usage: package
run package command based on different spark version.
Inputs are specified with the following environment variables:
-MLSQL_SPARK_VERSION - the spark version, 2.2/2.3/2.4
-DRY_RUN true|false
-DISTRIBUTION true|false
+MLSQL_SPARK_VERSION - the spark version, 2.2/2.3/2.4 default 2.3
+DRY_RUN true|false default false
+DISTRIBUTION true|false default false
+DATASOURCE_INCLUDED true|false default false
EOF
exit 1
}
@@ -29,6 +30,7 @@ MLSQL_SPARK_VERSION=${MLSQL_SPARK_VERSION:-2.3}
DRY_RUN=${DRY_RUN:-false}
DISTRIBUTION=${DISTRIBUTION:-false}
OSS_ENABLE=${OSS_ENABLE:-false}
+DATASOURCE_INCLUDED=${DATASOURCE_INCLUDED:-false}
COMMAND=${COMMAND:-package}

for env in MLSQL_SPARK_VERSION DRY_RUN DISTRIBUTION; do
@@ -82,6 +84,10 @@ if [[ "${OSS_ENABLE}" == "true" ]];then
BASE_PROFILES="$BASE_PROFILES -Poss-support"
fi

if [[ "$DATASOURCE_INCLUDED" == "true" ]];then
BASE_PROFILES="$BASE_PROFILES -Punit-test"
fi

if [[ ${DRY_RUN} == "true" ]];then

cat << EOF
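For reference, a minimal sketch of invoking the updated script with the new flag (a sketch, assuming a checkout where dev/package.sh is executable; every variable falls back to the defaults listed above when unset):

    # include the datasource profile in the build (DATASOURCE_INCLUDED defaults to false)
    MLSQL_SPARK_VERSION=2.4 DATASOURCE_INCLUDED=true ./dev/package.sh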
@@ -85,7 +85,7 @@ object TableType {
val HBASE = TableTypeMeta("hbase", Set("hbase"))
val HDFS = TableTypeMeta("hdfs", Set("parquet", "json", "csv", "image", "text", "xml", "excel"))
val HTTP = TableTypeMeta("http", Set("http"))
val JDBC = TableTypeMeta("jdbc", Set("jdbc"))
val JDBC = TableTypeMeta("jdbc", Set("jdbc", "streamJDBC"))
val ES = TableTypeMeta("es", Set("es"))
val REDIS = TableTypeMeta("redis", Set("redis"))
val KAFKA = TableTypeMeta("kafka", Set("kafka", "kafka8", "kafka9"))
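With this change, tables loaded through the streamJDBC short format are classified under the jdbc table type for authorization. A minimal sketch of the format-to-type resolution this enables (the fromFormat helper is an assumption here, mirroring the TableTypeMeta definitions above):

    case class TableTypeMeta(name: String, includes: Set[String])

    val metas = Seq(
      TableTypeMeta("jdbc", Set("jdbc", "streamJDBC")),
      TableTypeMeta("es", Set("es"))
    )

    // resolve a short format name to its table type; streamJDBC now maps to jdbc
    def fromFormat(format: String): Option[TableTypeMeta] =
      metas.find(_.includes.contains(format))

    assert(fromFormat("streamJDBC").map(_.name).contains("jdbc"))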
@@ -92,12 +92,34 @@ class MLSQLElasticSearch(override val uid: String) extends MLSQLSource with MLSQ
val newOptions = scala.collection.mutable.HashMap[String, String]() ++ config.config
ConnectMeta.options(DBMappingKey(shortFormat, _dbname)) match {
case Some(option) =>
dbName = ""
newOptions ++= option

+table.split(dbSplitter) match {
+  case Array(_db, _table) =>
+    dbName = _db
+    table = _table
+  case _ =>
+}

case None =>
-dbName = ""
+//dbName = ""
}

newOptions.filter(f => f._1 == "es.resource").map { f => table == f._2 }

newOptions.filter(f => f._1 == "es.resource").map { f =>
if (f._2.contains(dbSplitter)) {
f._2.split(dbSplitter, 2) match {
case Array(_db, _table) =>
dbName = _db
table = _table
case Array(_db) =>
dbName = _db
}
} else {
dbName = f._2
}
}

SourceInfo(shortFormat, dbName, table)
}
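The intent of the new block above: when a connect alias supplies an es.resource option, derive dbName and table from that value by splitting on the first dbSplitter. A standalone sketch of the resolution rule (resolveResource is a hypothetical helper; it assumes dbSplitter is "/" and, unlike the committed code, returns an empty table rather than keeping a previously parsed one):

    // resolve es.resource into (db, table); "a/b" -> ("a", "b"), ".idx/" -> (".idx", "")
    def resolveResource(resource: String, dbSplitter: String = "/"): (String, String) =
      if (resource.contains(dbSplitter)) {
        resource.split(dbSplitter, 2) match {
          case Array(db, tbl) => (db, tbl)
          // unreachable with limit 2; kept to mirror the committed code
          case Array(db) => (db, "")
        }
      } else (resource, "")

    assert(resolveResource("test_index/test_type") == ("test_index", "test_type"))
    assert(resolveResource(".test.index/") == (".test.index", ""))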
@@ -5,8 +5,8 @@ import org.scalatest.BeforeAndAfterAll
import streaming.core.strategy.platform.SparkRuntime
import streaming.core.{BasicMLSQLConfig, SpecFunctions}
import streaming.dsl.ScriptSQLExec
-import streaming.dsl.auth.OperateType
import streaming.dsl.auth.meta.client.DefaultConsoleClient
+import streaming.dsl.auth.{OperateType, TableType}
import streaming.log.Logging
import streaming.test.datasource.help.MLSQLTableEnhancer._

@@ -146,4 +146,225 @@ class AuthSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLCon

}
}


"auth-mongo" should "work fine" in {

withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { implicit runtime: SparkRuntime =>
//execute SQL
val spark = runtime.sparkSession

val mlsql =
"""
|connect mongo where
| partitioner="MongoPaginateBySizePartitioner"
|and uri="mongodb://127.0.0.1:27017/twitter" as mongo_instance;
|
|load mongo.`mongo_instance/cool`
|as test_table;
|
|load mongo.`cool_1` where
| partitioner="MongoPaginateBySizePartitioner"
|and uri="mongodb://127.0.0.1:27017/twitter_1"
|as test_table_2;
|
|save overwrite test_table as mongo.`mongo_instance/cool_2` where
| partitioner="MongoPaginateBySizePartitioner";
""".stripMargin
executeScript(mlsql)
var tables = DefaultConsoleClient.get

val loadMLSQLTable = tables
.filter(f => (f.tableType == TableType.MONGO && f.operateType == OperateType.LOAD))

var table = loadMLSQLTable.map(f => f.table.get).toSet
assume(table == Set("cool", "cool_1"))
var db = loadMLSQLTable.map(f => f.db.get).toSet
assume(db == Set("twitter", "twitter_1"))
var sourceType = loadMLSQLTable.map(f => f.sourceType.get).toSet
assume(sourceType == Set("mongo"))

val saveMLSQLTable = tables
.filter(f => (f.tableType == TableType.MONGO && f.operateType == OperateType.SAVE))
table = saveMLSQLTable.map(f => f.table.get).toSet
assume(table == Set("cool_2"))
db = saveMLSQLTable.map(f => f.db.get).toSet
assume(db == Set("twitter"))
sourceType = saveMLSQLTable.map(f => f.sourceType.get).toSet
assume(sourceType == Set("mongo"))
}
}

"auth-es" should "work fine" in {

withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { implicit runtime: SparkRuntime =>
//execute SQL
implicit val spark = runtime.sparkSession


val mlsql =
"""
|connect es where
|`es.nodes`="127.0.0.1"
|and `es.port`="9200"
|and `es.resource`="test_index"
|as es_conn;
|
|load es.`es_conn/test_type`
|as es_test;
|
|load es.`test_type_1` options `es.nodes`="127.0.0.1"
|and `es.resource`="test_index_1"
|and `es.port`="9200"
|as es_test_1;
|
|load es.`.test.index/` options `es.nodes`="172.16.1.159"
|and `es.port`="9200"
|as es_test;
|
|save overwrite data1 as es.`es_conn/test_index_2` where
|`es.index.auto.create`="true"
|and `es.port`="9200"
|and `es.nodes`="127.0.0.1";
|
|connect es where
|`es.nodes`="127.0.0.1"
|and `es.port`="9200"
|as es_conn1;
|
|load es.`es_conn1/index/ttype` options `es.nodes`="172.16.1.159"
|and `es.port`="9200"
|as es_test2;
""".stripMargin

executeScript(mlsql)

var tables = DefaultConsoleClient.get

val loadMLSQLTable = tables
.filter(f => (f.tableType == TableType.ES && f.operateType == OperateType.LOAD))

var table = loadMLSQLTable.map(f => f.table.get).toSet
assume(table == Set("test_type", "test_type_1", "", "ttype"))
var db = loadMLSQLTable.map(f => f.db.get).toSet
assume(db == Set("test_index", "test_index_1", ".test.index", "index"))
var sourceType = loadMLSQLTable.map(f => f.sourceType.get).toSet
assume(sourceType == Set("es"))

val saveMLSQLTable = tables
.filter(f => (f.tableType == TableType.ES && f.operateType == OperateType.SAVE))
table = saveMLSQLTable.map(f => f.table.get).toSet
assume(table == Set("test_index_2"))
db = saveMLSQLTable.map(f => f.db.get).toSet
assume(db == Set("test_index"))
sourceType = saveMLSQLTable.map(f => f.sourceType.get).toSet
assume(sourceType == Set("es"))
}
}

"auth-solr" should "work fine" in {
withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { implicit runtime: SparkRuntime =>
//execute SQL
implicit val spark = runtime.sparkSession

val mlsql =
"""
connect solr where `zkhost`="127.0.0.1:9983"
and `collection`="mlsql_example"
and `flatten_multivalued`="false"
as solr1
;
load solr.`solr1/mlsql_example` as mlsql_example;
save mlsql_example_data as solr.`solr1/mlsql_example`
options soft_commit_secs = "1";
""".stripMargin

executeScript(mlsql)

var tables = DefaultConsoleClient.get
val loadMLSQLTable = tables.filter(f => (f.tableType == TableType.SOLR && f.operateType == OperateType.LOAD))

var db = loadMLSQLTable.map(f => f.db.get).toSet
assume(db == Set("mlsql_example"))
var sourceType = loadMLSQLTable.map(f => f.sourceType.get).toSet
assume(sourceType == Set("solr"))

val saveMLSQLTable = tables
.filter(f => (f.tableType == TableType.SOLR && f.operateType == OperateType.SAVE))
db = saveMLSQLTable.map(f => f.db.get).toSet
assume(db == Set("mlsql_example"))
sourceType = saveMLSQLTable.map(f => f.sourceType.get).toSet
assume(sourceType == Set("solr"))
}

}

"auth-hbase" should "work fine" in {


withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { implicit runtime: SparkRuntime =>
//execute SQL
implicit val spark = runtime.sparkSession


val mlsql =
"""
|connect hbase where
| namespace="test_ns"
|and zk="127.0.0.1:2181" as hbase_instance;
|
|load hbase.`hbase_instance:test_tb` options
|family="cf"
|as test_table;
|
|load hbase.`test_ns_1:test_tb_1`
|options zk="127.0.0.1:2181"
|and family="cf"
|as output1;
|
|save overwrite test_table as hbase.`hbase_instance:test_tb_2` where
| rowkey="rowkey"
|and family="cf";
""".stripMargin
executeScript(mlsql)

var tables = DefaultConsoleClient.get

val loadMLSQLTable = tables.filter(f => (f.tableType == TableType.HBASE && f.operateType == OperateType.LOAD))

var table = loadMLSQLTable.map(f => f.table.get).toSet
assume(table == Set("test_tb", "test_tb_1"))
var db = loadMLSQLTable.map(f => f.db.get).toSet
assume(db == Set("test_ns", "test_ns_1"))
var sourceType = loadMLSQLTable.map(f => f.sourceType.get).toSet
assume(sourceType == Set("hbase"))

val saveMLSQLTable = tables
.filter(f => (f.tableType == TableType.HBASE && f.operateType == OperateType.SAVE))
table = saveMLSQLTable.map(f => f.table.get).toSet
assume(table == Set("test_tb_2"))
db = saveMLSQLTable.map(f => f.db.get).toSet
assume(db == Set("test_ns"))
sourceType = saveMLSQLTable.map(f => f.sourceType.get).toSet
assume(sourceType == Set("hbase"))
}

}
"auth-set-statement" should "work fine" in {
withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { implicit runtime: SparkRuntime =>
//execute SQL
implicit val spark = runtime.sparkSession

val mlsql =
"""
set jack=`echo` where type="shell";
""".stripMargin
executeScript(mlsql)
val loadMLSQLTable = DefaultConsoleClient.get.filter(f => (f.tableType == TableType.GRAMMAR && f.operateType == OperateType.SET))
assert(loadMLSQLTable.size == 1)
}

}
}