
Commit

remove conda mirror
allwefantasy committed May 29, 2019
1 parent 46cf151 commit 471e45c
Showing 6 changed files with 114 additions and 70 deletions.
8 changes: 5 additions & 3 deletions dev/docker/Dockerfile
@@ -25,9 +25,9 @@ ENV TINI_VERSION v0.16.1
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /usr/bin/tini
RUN chmod +x /usr/bin/tini

RUN conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
RUN conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
RUN conda config --set show_channel_urls yes
#RUN conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
#RUN conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
#RUN conda config --set show_channel_urls yes

RUN mkdir ~/.pip

@@ -71,13 +71,15 @@ ENV SPARK_HOME /work/spark-${SPARK_VERSION}-bin-hadoop2.7
RUN mkdir -p /home/deploy
RUN mkdir -p /home/deploy/mlsql
RUN mkdir -p /home/deploy/mlsql-console
RUN mkdir -p /tmp/__mlsql__/logs

RUN mkdir -p /home/deploy/mlsql/libs
ENV MLSQL_DISTRIBUTIOIN_URL="streamingpro-mlsql-spark_${MLSQL_SPARK_VERSION}-${MLSQL_VERSION}.jar"
ADD ${MLSQL_DISTRIBUTIOIN_URL} /home/deploy/mlsql/libs
ADD lib/ansj_seg-5.1.6.jar /home/deploy/mlsql/libs
ADD lib/nlp-lang-1.7.8.jar /home/deploy/mlsql/libs
ADD start-local.sh /home/deploy/mlsql
ADD log4j.properties /work/${FILENAME}/conf
WORKDIR /home/deploy/mlsql

#ENTRYPOINT [ "/usr/bin/tini", "--" ]
45 changes: 45 additions & 0 deletions dev/docker/log4j.properties
@@ -0,0 +1,45 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the console
log4j.rootCategory=INFO, console,file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.appender.file=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.file.File=/tmp/__mlsql__/logs/mlsql_engine.log
log4j.appender.file.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.file.rollingPolicy.fileNamePattern=/tmp/__mlsql__/logs/mlsql_engine.%d.gz
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.appender.file.MaxBackupIndex=5
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
#log4j.logger.org.apache.spark=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
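
The configuration above routes engine output both to stderr and to a time-based rolling file under /tmp/__mlsql__/logs, the directory the Dockerfile change now creates. Note that org.apache.log4j.rolling.RollingFileAppender and TimeBasedRollingPolicy come from the log4j 1.2 "extras" companion jar rather than log4j core, so that jar must be on the engine classpath. A minimal Scala sketch of a log call against this configuration; the logger name used here is only illustrative:

import org.apache.log4j.Logger

object LoggingSketch {
  // Any log4j logger inherits the root appenders configured above, so this line
  // shows up on stderr and in /tmp/__mlsql__/logs/mlsql_engine.log, and old
  // entries roll over into mlsql_engine.<date>.gz via the TimeBasedRollingPolicy.
  private val logger = Logger.getLogger("streaming.core.StreamingApp")

  def main(args: Array[String]): Unit = {
    logger.info("MLSQL engine started")
  }
}
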
2 changes: 2 additions & 0 deletions dev/docker/start-local.sh
@@ -28,6 +28,8 @@ $SPARK_HOME/bin/spark-submit --class streaming.core.StreamingApp \
${MLSQL_HOME}/libs/${MAIN_JAR} \
-streaming.name mlsql \
-streaming.platform spark \
-streaming.ps.enable true \
-streaming.udf.clzznames "streaming.crawler.udf.Functions" \
-streaming.rest true \
-streaming.driver.port 9003 \
-streaming.spark.service true \
Expand Up @@ -120,6 +120,7 @@ object BackendService {
val counter = active_task_meta.get(ins.meta)
try {
counter.incrementAndGet()
logger.info(s"Visit backend tagged with ${tags}. Finally we found ${ins.meta.getUrl} with tags:${ins.meta.getTags.mkString(",")}")
Option(f(ins.instance))
} finally {
counter.decrementAndGet()
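
The added log line sits inside the existing bookkeeping around each backend call: the per-instance counter is incremented before the request and released in a finally block, so the active-task count stays correct even when the call throws. A self-contained sketch of that pattern with simplified stand-in types (the names below are illustrative, not the actual BackendService API):

import java.util.concurrent.atomic.AtomicLong

object BackendVisitSketch {
  // Simplified stand-in for the instance metadata tracked by BackendService.
  final case class InstanceMeta(url: String, tags: Seq[String])

  def visit[T](meta: InstanceMeta, counter: AtomicLong, tags: String)(f: => T): Option[T] = {
    try {
      counter.incrementAndGet() // mark the chosen instance as busy
      println(s"Visit backend tagged with $tags. Finally we found ${meta.url} with tags:${meta.tags.mkString(",")}")
      Option(f) // run the actual request against the instance
    } finally {
      counter.decrementAndGet() // always release, even when f throws
    }
  }

  def main(args: Array[String]): Unit = {
    val counter = new AtomicLong(0)
    val meta = InstanceMeta("http://127.0.0.1:9003", Seq("gpu"))
    println(visit(meta, counter, "gpu")(42)) // prints Some(42); the counter is back to 0
  }
}
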
@@ -1,80 +1,59 @@
package streaming.core.datasource

import java.io.File
import java.net.{URL, URLClassLoader, URLEncoder}
import java.net.{URL, URLClassLoader}
import java.nio.file.Files

import net.csdn.ServiceFramwork
import net.csdn.common.path.Url
import net.csdn.modules.http.RestRequest
import net.csdn.modules.transport.HttpTransportService
import net.sf.json.{JSONArray, JSONObject}
import org.apache.http.client.fluent.Request
import org.apache.http.client.fluent.{Form, Request}
import streaming.common.{JSONTool, PathFun}
import streaming.dsl.ScriptSQLExec

import scala.collection.JavaConverters._
import streaming.log.{Logging, WowLog}

/**
* 2019-01-14 WilliamZhu([email protected])
*/
class DataSourceRepository(url: String) {
def httpClient = ServiceFramwork.injector.getInstance[HttpTransportService](classOf[HttpTransportService])
class DataSourceRepository(url: String) extends Logging with WowLog {


//"http://respository.datasource.mlsql.tech"
def getOrDefaultUrl = {
if (url == null || url.isEmpty) {
val context = ScriptSQLExec.contextGetOrForTest()
require(context.userDefinedParam.contains("__datasource_repository_url__"), "context.__datasource_repository_url__ should be configure if you want use connect DataSourceRepository")
context.userDefinedParam.get("__datasource_repository_url__")
context.userDefinedParam.getOrElse("__datasource_repository_url__", "http://datasource.repository.mlsql.tech")
} else {
url
}
}

def listCommand = {
val res = httpClient.http(new Url(s"${getOrDefaultUrl}/jar/manager/source/mapper"), "{}", RestRequest.Method.POST)

JSONObject.fromObject(res.getContent).asScala.flatMap { kv =>
kv._2.asInstanceOf[JSONArray].asScala.map { _item =>
val item = _item.asInstanceOf[JSONObject]
item.put("name", kv._1)
val temp = new JSONArray()
temp.add(item)
val versionList = versionCommand(temp)
val versionArray = new JSONArray
versionList.foreach { v => versionArray.add(v) }
item.put("versions", versionArray)
item.toString
}
}.toSeq
def versionCommand(name: String, sparkV: String) = {
val res = Request.Post(s"${getOrDefaultUrl}/repo/versions").connectTimeout(60 * 1000)
.socketTimeout(10 * 60 * 1000).bodyForm(Form.form().add("name", name).
add("scalaV", "2.11").add("sparkV", sparkV).build()
).execute().returnContent().asString()
JSONTool.parseJson[List[String]](res)
}

def versionCommand(items: JSONArray) = {
val request = new JSONArray
items.asScala.map { _item =>
val item = _item.asInstanceOf[JSONObject]
val json = new JSONObject()
json.put("jarname", item.getString("groupid") + "/" + item.getString("artifactId"))
request.add(json)
}

val res = httpClient.http(new Url(s"${getOrDefaultUrl}/jar/manager/versions"), request.toString(), RestRequest.Method.POST)
JSONArray.fromObject(res.getContent).get(0).asInstanceOf[JSONObject].asScala.map { kv =>
val version = kv._1.asInstanceOf[String].split("/").last
version
}.toSeq
def list() = {
val res = Request.Get(s"${getOrDefaultUrl}/repo/list").connectTimeout(60 * 1000)
.socketTimeout(10 * 60 * 1000).execute().returnContent().asString()
JSONTool.parseJson[List[String]](res)
}

def addCommand(format: String, groupid: String, artifactId: String, version: String) = {
val url = s"http://central.maven.org/maven2/${groupid.replaceAll("\\.", "/")}/${artifactId}/${version}"
def addCommand(name: String, version: String, sparkV: String, scalaV: String = "2.11") = {

// fileName format e.g es, mongodb
val finalUrl = s"${getOrDefaultUrl}/jar/manager/http?fileName=${URLEncoder.encode(artifactId, "utf-8")}&url=${URLEncoder.encode(url, "utf-8")}"
val inputStream = Request.Get(finalUrl).execute().returnContent().asStream()
logInfo(s"downloading ${name} from ${getOrDefaultUrl}")
val response = Request.Post(s"${getOrDefaultUrl}/repo/download").connectTimeout(60 * 1000)
.socketTimeout(10 * 60 * 1000).bodyForm(Form.form().add("name", name).
add("scalaV", scalaV).add("sparkV", sparkV).add("version", version).build()).execute().returnResponse()
var fieldValue = response.getFirstHeader("Content-Disposition").getValue
val inputStream = response.getEntity.getContent
fieldValue = fieldValue.substring(fieldValue.indexOf("filename=\"") + 10, fieldValue.length() - 1);
val tmpLocation = new File("./dataousrce_upjars")
if (!tmpLocation.exists()) {
tmpLocation.mkdirs()
}
val jarFile = new File(tmpLocation.getPath + s"/${artifactId}-${version}.jar")
val jarFile = new File(PathFun(tmpLocation.getPath).add(fieldValue).toPath)
if (!jarFile.exists()) {
Files.copy(inputStream, jarFile.toPath)
} else {
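
Taken together, the rewritten client drops the csdn HttpTransportService and the Maven-central jar resolution in favor of three plain HTTP calls against the repository service: GET /repo/list for the available datasources, POST /repo/versions for the versions of one datasource on a given Spark version, and POST /repo/download for the jar itself, whose local file name is taken from the Content-Disposition response header (for a header like attachment; filename="es-bin.jar" the substring call extracts es-bin.jar). A rough usage sketch against the class as defined above; the repository URL and the "es" name are illustrative, and addCommand is assumed to return the local path of the downloaded jar, which is how SQLDataSourceExt uses it below:

import streaming.core.datasource.DataSourceRepository

object DataSourceRepositorySketch {
  def main(args: Array[String]): Unit = {
    val repo = new DataSourceRepository("http://datasource.repository.mlsql.tech")

    val available = repo.list()                      // GET  /repo/list
    val versions  = repo.versionCommand("es", "2.4") // POST /repo/versions
    // POST /repo/download; the jar lands under ./dataousrce_upjars with the
    // file name announced by the server's Content-Disposition header.
    val jarPath = repo.addCommand("es", versions.head, "2.4")
    println(s"available=$available versions=$versions jar=$jarPath")
  }
}
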
@@ -2,6 +2,7 @@ package streaming.dsl.mmlib.algs

import org.apache.spark.ml.param.Param
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.mlsql.session.MLSQLException
import org.apache.spark.sql.{DataFrame, SparkSession}
import streaming.core.datasource.DataSourceRepository
import streaming.dsl.mmlib.SQLAlg
@@ -14,6 +15,9 @@ import streaming.log.{Logging, WowLog}
class SQLDataSourceExt(override val uid: String) extends SQLAlg with WowParams with Logging with WowLog {


override def skipPathPrefix: Boolean = true

// path: es
override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = {

params.get(command.name).map { item =>
@@ -27,32 +31,41 @@ class SQLDataSourceExt(override val uid: String) extends SQLAlg with WowParams w
set(repository, item)
item
}

params.get(sparkV.name).map { item =>
set(sparkV, item)
item
}.getOrElse {
throw new MLSQLException("please set spark version")
}

params.get(scalaV.name).map { item =>
set(scalaV, item)
item
}.getOrElse {
set(scalaV, "2.11")
}

val rep = if (isDefined(repository)) $(repository) else ""
val dataSourceRepository = new DataSourceRepository(rep)

val spark = df.sparkSession
import spark.implicits._
$(command) match {
case "list" =>
spark.read.json(spark.createDataset(dataSourceRepository.listCommand))
case "list" => spark.createDataset[String](dataSourceRepository.list()).toDF("value")
case "version" =>
val res = dataSourceRepository.versionCommand(path, $(sparkV))
spark.createDataset[String](res).toDF("value")
case "add" =>
if (!path.contains("/")) {
// spark.table(path).collect().map{row=>
// row.get()
//
// }
Seq[Seq[String]](Seq()).toDF("desc")
} else {
val Array(dsFormat, groupid, artifactId, version) = path.split("/")
val url = dataSourceRepository.addCommand(dsFormat, groupid, artifactId, version)
val logMsg = format(s"Datasource is loading jar from ${url}")
logInfo(logMsg)
dataSourceRepository.loadJarInDriver(url)
spark.sparkContext.addJar(url)

//FileUtils.deleteQuietly(new File(url))
Seq[Seq[String]](Seq(logMsg)).toDF("desc")
}
val Array(name, version) = path.split("/")
val url = dataSourceRepository.addCommand(name, version, $(sparkV), $(scalaV))
val logMsg = format(s"Datasource is loading jar from ${url}")
logInfo(logMsg)
dataSourceRepository.loadJarInDriver(url)
spark.sparkContext.addJar(url)

//FileUtils.deleteQuietly(new File(url))
Seq[Seq[String]](Seq(logMsg)).toDF("desc")

}
}
@@ -75,6 +88,8 @@ class SQLDataSourceExt(override val uid: String) extends SQLAlg with WowParams w
})

final val repository: Param[String] = new Param[String](this, "repository", "repository url")
final val sparkV: Param[String] = new Param[String](this, "sparkV", "2.3/2.4")
final val scalaV: Param[String] = new Param[String](this, "scalaV", "2.11/2.12")

def this() = this(BaseParams.randomUID())
}
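
With these changes the extension takes its target as name/version (for example es/7.0) instead of format/groupid/artifactId/version, requires sparkV for every command, and defaults scalaV to 2.11 before delegating to DataSourceRepository. A sketch of how the three commands could be driven through train; the Spark session setup, the "es" name, and the version string are illustrative, and it assumes the command Param referenced in the diff is registered under the key "command":

import org.apache.spark.sql.SparkSession
import streaming.dsl.mmlib.algs.SQLDataSourceExt

object DataSourceExtSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ds-ext-sketch").getOrCreate()
    val ext = new SQLDataSourceExt()
    val empty = spark.emptyDataFrame

    // sparkV is mandatory; a missing value raises MLSQLException("please set spark version").
    ext.train(empty, "", Map("command" -> "list", "sparkV" -> "2.4")).show()
    // versions of the datasource named by `path`, built for the given Spark version
    ext.train(empty, "es", Map("command" -> "version", "sparkV" -> "2.4")).show()
    // download name/version, load it into the driver classloader and ship it to the executors
    ext.train(empty, "es/7.0", Map("command" -> "add", "sparkV" -> "2.4", "scalaV" -> "2.11")).show()

    spark.stop()
  }
}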
