
Commit

update
allwefantasy committed Mar 14, 2019
1 parent 7e1d439 commit f1d7fdd
Showing 11 changed files with 41 additions and 28 deletions.
5 changes: 3 additions & 2 deletions dev/build-docker.sh
@@ -4,6 +4,7 @@ SELF=$(cd $(dirname $0) && pwd)
cd $SELF

RELEASE=${RELEASE:-false}
+MLSQL_VERSION=${MLSQL_VERSION:-1.2.0-SNAPSHOT}

if [[ "${RELEASE}" != "true" ]];then
cd ..
@@ -15,8 +16,8 @@ if [[ "${RELEASE}" != "true" ]];then
exec:exec)
fi

-export MLSQL_SPARK_VERSION=${MLSQL_SPARK_VERSION:-2.3}
-export SPARK_VERSION=${SPARK_VERSION:-2.3.2}
+export MLSQL_SPARK_VERSION=${MLSQL_SPARK_VERSION:-2.4}
+export SPARK_VERSION=${SPARK_VERSION:-2.4.0}
export MLSQL_DISTRIBUTIOIN_URL="streamingpro-mlsql-spark_${MLSQL_SPARK_VERSION}-${MLSQL_VERSION}.jar"
export DISTRIBUTION=${MLSQL_SPARK_VERSION:-false}

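A minimal usage sketch (an assumed invocation from the repository root; the variables and their defaults are exactly the ones defined in the script above):

MLSQL_VERSION=1.2.0-SNAPSHOT MLSQL_SPARK_VERSION=2.4 SPARK_VERSION=2.4.0 ./dev/build-docker.sh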
4 changes: 3 additions & 1 deletion dev/start-local.sh
@@ -14,7 +14,9 @@ fi

JARS=$(echo ${MLSQL_HOME}/libs/*.jar | tr ' ' ',')
MAIN_JAR=$(ls ${MLSQL_HOME}/libs|grep 'streamingpro-mlsql')
+export DRIVER_MEMORY=${DRIVER_MEMORY:-2g}
$SPARK_HOME/bin/spark-submit --class streaming.core.StreamingApp \
+--driver-memory ${DRIVER_MEMORY} \
--jars ${JARS} \
--master local[*] \
--name mlsql \
@@ -30,4 +32,4 @@ $SPARK_HOME/bin/spark-submit --class streaming.core.StreamingApp \
-streaming.driver.port 9003 \
-streaming.spark.service true \
-streaming.thrift false \
--streaming.enableHiveSupport true
+-streaming.enableHiveSupport true
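A minimal launch sketch, assuming SPARK_HOME and MLSQL_HOME are already exported as the script requires; the new DRIVER_MEMORY variable defaults to 2g and can be overridden per run:

DRIVER_MEMORY=4g ./dev/start-local.sh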
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ object OperateType extends Enumeration {
object TableType {
val HIVE = TableTypeMeta("hive", Set("hive"))
val HBASE = TableTypeMeta("hbase", Set("hbase"))
-val HDFS = TableTypeMeta("hdfs", Set("parquet", "json", "csv", "image", "text", "xml"))
+val HDFS = TableTypeMeta("hdfs", Set("parquet", "json", "csv", "image", "text", "xml", "excel"))
val HTTP = TableTypeMeta("http", Set("http"))
val JDBC = TableTypeMeta("jdbc", Set("jdbc"))
val ES = TableTypeMeta("es", Set("es"))
@@ -68,7 +68,7 @@ object TableType {
val API = TableTypeMeta("api", Set("mlsqlAPI", "mlsqlConf"))
val WEB = TableTypeMeta("web", Set("crawlersql"))
val GRAMMAR = TableTypeMeta("grammar", Set("grammar"))
-val SYSTEM = TableTypeMeta("system", Set("_mlsql_"))
+val SYSTEM = TableTypeMeta("system", Set("_mlsql_", "model", "modelList", "modelParams", "modelExample", "modelExplain"))

def from(str: String) = {
List(HIVE, HBASE, HDFS, HTTP, JDBC, ES, MONGO, SOLR, TEMP, API, WEB, GRAMMAR, SYSTEM).filter(f => f.includes.contains(str)).headOption
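A short sketch of what the widened sets mean for format resolution, assuming the TableType object above is on the classpath; the results follow directly from its definitions:

TableType.from("excel")     // Some(HDFS): excel sources are now treated as HDFS tables
TableType.from("modelList") // Some(SYSTEM): model-inspection commands now map to the system type
TableType.from("hbase")     // Some(HBASE), unchanged
TableType.from("unknown")   // None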
Original file line number Diff line number Diff line change
@@ -2,31 +2,38 @@ package streaming.core.datasource.impl

import _root_.streaming.core.datasource._
import _root_.streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}
+import _root_.streaming.dsl.{DslTool, ScriptSQLExec}
import org.apache.spark.ml.param.Param
import org.apache.spark.sql._

/**
* 2019-02-19 WilliamZhu([email protected])
*/
-class MLSQLExcel(override val uid: String) extends MLSQLSource with MLSQLSink with MLSQLSourceInfo with MLSQLRegistry with WowParams {
+class MLSQLExcel(override val uid: String) extends MLSQLSource with MLSQLSink with MLSQLSourceInfo with MLSQLRegistry with WowParams with DslTool {
def this() = this(BaseParams.randomUID())

override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
+val context = ScriptSQLExec.contextGetOrForTest()
val format = config.config.getOrElse("implClass", fullFormat)
-reader.options(rewriteConfig(config.config)).format(format).load(config.config("_filePath_"))
+val owner = config.config.get("owner").getOrElse(context.owner)
+reader.options(rewriteConfig(config.config)).format(format).load(resourceRealPath(context.execListener, Option(owner), config.path))
}

override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {
+val context = ScriptSQLExec.contextGetOrForTest()
val format = config.config.getOrElse("implClass", fullFormat)

-writer.options(rewriteConfig(config.config)).format(format).save(config.config("_filePath_"))
+writer.options(rewriteConfig(config.config)).format(format).save(resourceRealPath(context.execListener, Option(context.owner), config.path))
}

def rewriteConfig(config: Map[String, String]) = {
config ++ Map("useHeader" -> config.getOrElse("useHeader", "false"))
}

-override def sourceInfo(config: DataAuthConfig): SourceInfo = SourceInfo(shortFormat, "", "")
+override def sourceInfo(config: DataAuthConfig): SourceInfo = {
+val context = ScriptSQLExec.contextGetOrForTest()
+val owner = config.config.get("owner").getOrElse(context.owner)
+SourceInfo(shortFormat, "", resourceRealPath(context.execListener, Option(owner), config.path))
+}

override def explainParams(spark: SparkSession) = {
_explainParams(spark)
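For orientation, a hedged sketch of the kind of read MLSQLExcel delegates to once resourceRealPath has rewritten the user path; the format name com.crealytics.spark.excel is an assumption about the underlying implClass, and the path is illustrative only:

// Assumes an active SparkSession named spark.
val df = spark.read
  .format("com.crealytics.spark.excel")   // assumed default implClass
  .option("useHeader", "false")           // default injected by rewriteConfig above
  .load("/user/someOwner/data/test.xlsx") // owner-resolved path, illustrative
df.printSchema()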
Original file line number Diff line number Diff line change
@@ -84,17 +84,14 @@ class BatchLoadAdaptor(scriptSQLExecListener: ScriptSQLExecListener,
path = TemplateMerge.merge(path, scriptSQLExecListener.env().toMap)
val resourceOwner = option.get("owner")

-// calculate resource real absolute path
-val filePath = resourceRealPath(scriptSQLExecListener, resourceOwner, path)

DataSourceRegistry.fetch(format, option).map { datasource =>
def emptyDataFrame = {
import sparkSession.implicits._
Seq.empty[String].toDF("name")
}

table = datasource.asInstanceOf[ {def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame}].
-load(reader, DataSourceConfig(cleanStr(path), option ++ Map("_filePath_" -> filePath), Option(emptyDataFrame)))
+load(reader, DataSourceConfig(cleanStr(path), option, Option(emptyDataFrame)))
}.getOrElse {
format match {
case "crawlersql" =>
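The adaptor dispatches to load through a structural type rather than a shared trait. A self-contained sketch of that pattern, with illustrative names that are not the project's classes:

import scala.language.reflectiveCalls

class CsvSource {
  def load(path: String): String = s"loaded csv from $path"
}

// Any object exposing a matching load method can be called without a common
// interface, which is what the asInstanceOf[{ def load(...) }] cast above relies on.
val source: AnyRef = new CsvSource
val table = source.asInstanceOf[ {def load(path: String): String}].load("/tmp/a.csv")
println(table) // prints: loaded csv from /tmp/a.csv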
Original file line number Diff line number Diff line change
@@ -116,7 +116,6 @@ class SaveAdaptor(scriptSQLExecListener: ScriptSQLExecListener) extends DslAdapt
}

var streamQuery: StreamingQuery = null
-option = option ++ Map("_filePath_" -> TemplateMerge.merge(withPathPrefix(scriptSQLExecListener.pathPrefix(owner), cleanStr(path)), scriptSQLExecListener.env().toMap))
if (scriptSQLExecListener.env().contains("stream")) {
streamQuery = new StreamSaveAdaptor(scriptSQLExecListener, option, oldDF, final_path, tableName, format, mode, partitionByCol.toArray).parse
} else {
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ object ScriptSQLExec extends Logging with WowLog {
def contextGetOrForTest(): MLSQLExecuteContext = {
if (context() == null) {
val exec = new ScriptSQLExecListener(null, "/tmp/william", Map())
-setContext(new MLSQLExecuteContext("testUser", exec.pathPrefix(None), "", Map()))
+setContext(new MLSQLExecuteContext(exec,"testUser", exec.pathPrefix(None), "", Map()))
}
context()
}
@@ -325,7 +325,7 @@ object ConnectMeta {
}
}

-case class MLSQLExecuteContext(owner: String, home: String, groupId: String, userDefinedParam: Map[String, String] = Map())
+case class MLSQLExecuteContext(execListener: ScriptSQLExecListener, owner: String, home: String, groupId: String, userDefinedParam: Map[String, String] = Map())

case class DBMappingKey(format: String, db: String)

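A brief sketch of how call sites construct the context after this signature change, mirroring the updated test helpers later in this commit; it assumes an active SparkSession named spark:

val listener = new ScriptSQLExecListener(spark, "/tmp/william", Map())
listener.addEnv("HOME", listener.pathPrefix(None))
ScriptSQLExec.setContext(
  new MLSQLExecuteContext(listener, "william", listener.pathPrefix(None), "job-group-1", Map())
)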
Original file line number Diff line number Diff line change
@@ -86,7 +86,13 @@ class SQLDownloadExt(override val uid: String) extends SQLAlg with WowParams {
}

val auth_secret = context.userDefinedParam("__auth_secret__")
-val stream = Request.Get(fromUrl + s"?userName=${URLEncoder.encode(context.owner, "utf-8")}&fileName=${URLEncoder.encode($(from), "utf-8")}&auth_secret=${URLEncoder.encode(auth_secret, "utf-8")}")
+
+def urlencode(name: String) = {
+URLEncoder.encode(name, "utf-8")
+}
+
+val getUrl = fromUrl + s"?userName=${urlencode(context.owner)}&fileName=${urlencode($(from))}&auth_secret=${urlencode(auth_secret)}"
+val stream = Request.Get(getUrl)
.connectTimeout(60 * 1000)
.socketTimeout(10 * 60 * 1000)
.execute().returnContent().asStream()
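A tiny self-contained sketch of what the extracted helper produces; the endpoint and parameter values are illustrative and not taken from this diff:

import java.net.URLEncoder

def urlencode(name: String) = URLEncoder.encode(name, "utf-8")

val fromUrl = "http://127.0.0.1:9002/api/download" // hypothetical download endpoint
val getUrl = fromUrl + s"?userName=${urlencode("william")}&fileName=${urlencode("/tmp/model")}&auth_secret=${urlencode("secret")}"
// getUrl == "http://127.0.0.1:9002/api/download?userName=william&fileName=%2Ftmp%2Fmodel&auth_secret=secret"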
Original file line number Diff line number Diff line change
@@ -20,6 +20,11 @@ package streaming.rest

import java.lang.reflect.Modifier

+import _root_.streaming.common.JarUtil
+import _root_.streaming.core._
+import _root_.streaming.core.strategy.platform.{PlatformManager, SparkRuntime}
+import _root_.streaming.dsl.mmlib.algs.tf.cluster.{ClusterSpec, ClusterStatus, ExecutorInfo}
+import _root_.streaming.dsl.{MLSQLExecuteContext, ScriptSQLExec, ScriptSQLExecListener}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import net.csdn.annotation.rest.{At, _}
@@ -28,15 +33,11 @@ import net.csdn.common.path.Url
import net.csdn.modules.http.RestRequest.Method._
import net.csdn.modules.http.{ApplicationController, ViewType}
import net.csdn.modules.transport.HttpTransportService
-import org.apache.spark.{MLSQLConf, SparkInstanceService}
import org.apache.spark.ps.cluster.Message
import org.apache.spark.sql._
+import org.apache.spark.{MLSQLConf, SparkInstanceService}
import org.joda.time.format.ISODateTimeFormat
-import _root_.streaming.common.JarUtil
-import _root_.streaming.core._
-import _root_.streaming.core.strategy.platform.{PlatformManager, SparkRuntime}
-import _root_.streaming.dsl.mmlib.algs.tf.cluster.{ClusterSpec, ClusterStatus, ExecutorInfo}
-import _root_.streaming.dsl.{MLSQLExecuteContext, ScriptSQLExec, ScriptSQLExecListener}

import scala.collection.JavaConversions._
import scala.util.control.NonFatal

@@ -149,7 +150,7 @@ class RestController extends ApplicationController {
val context = new ScriptSQLExecListener(sparkSession, defaultPathPrefix, allPathPrefix)
val ownerOption = if (params.containsKey("owner")) Some(param("owner")) else None
val userDefineParams = params.toMap.filter(f => f._1.startsWith("context.")).map(f => (f._1.substring("context.".length), f._2)).toMap
-ScriptSQLExec.setContext(new MLSQLExecuteContext(param("owner"), context.pathPrefix(None), groupId, userDefineParams))
+ScriptSQLExec.setContext(new MLSQLExecuteContext(context, param("owner"), context.pathPrefix(None), groupId, userDefineParams))
context.addEnv("HOME", context.pathPrefix(None))
context.addEnv("OWNER", ownerOption.getOrElse("anonymous"))
context
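As a side note on the unchanged filter line above, a self-contained sketch of how "context."-prefixed request parameters become user-defined params, using plain Scala collections and illustrative values:

val params = Map("context.__auth_secret__" -> "abc", "sql" -> "select 1 as a")
val userDefineParams = params
  .filter { case (k, _) => k.startsWith("context.") }
  .map { case (k, v) => k.substring("context.".length) -> v }
// userDefineParams == Map("__auth_secret__" -> "abc")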
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ class RestPredictController extends ApplicationController {

def createContext = {
val userDefineParams = params.toMap.filter(f => f._1.startsWith("context.")).map(f => (f._1.substring("context.".length), f._2)).toMap
-ScriptSQLExec.setContext(new MLSQLExecuteContext(param("owner"), "", "", userDefineParams))
+ScriptSQLExec.setContext(new MLSQLExecuteContext(null,param("owner"), "", "", userDefineParams))
}

def getSQL = {
Original file line number Diff line number Diff line change
@@ -68,9 +68,9 @@ trait SpecFunctions {
StreamingproJobManager.addJobManually(StreamingproJobInfo(
"william", StreamingproJobType.SCRIPT, "", "", groupId, System.currentTimeMillis(), -1
))
-ScriptSQLExec.setContext(new MLSQLExecuteContext("william", "/tmp/william", groupId, Map()))
val context = new ScriptSQLExecListener(spark, defaultPathPrefix, Map())
context.addEnv("HOME", context.pathPrefix(None))
+ScriptSQLExec.setContext(new MLSQLExecuteContext(context, "william", "/tmp/william", groupId, Map()))
context
}

@@ -79,9 +79,9 @@ trait SpecFunctions {
StreamingproJobManager.addJobManually(StreamingproJobInfo(
"william", StreamingproJobType.SCRIPT, "", "", groupId, System.currentTimeMillis(), -1
))
-ScriptSQLExec.setContext(new MLSQLExecuteContext("william", "/tmp/william", groupId, Map()))
val context = new ScriptSQLExecListener(spark, "/tmp/william", Map())
context.addEnv("HOME", context.pathPrefix(None))
+ScriptSQLExec.setContext(new MLSQLExecuteContext(context, "william", "/tmp/william", groupId, Map()))
context
}

