Skip to content

Commit

Permalink
Merge pull request byzer-org#526 from allwefantasy/mlsql
Browse files Browse the repository at this point in the history
以RandomForest为例,增强了MLLib库的算法
  • Loading branch information
allwefantasy authored Sep 13, 2018
2 parents 63ee8e9 + 4c43d7b commit d247544
Show file tree
Hide file tree
Showing 67 changed files with 648 additions and 103 deletions.
95 changes: 95 additions & 0 deletions docs/en/mlsql-mllib-randomfores-classfication.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
## RandomForest Example

Suppose you want to use RandomForest to train a model. Here is the MLSQL script.

```sql

-- create test data
set jsonStr='''
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[4.4,2.9,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[4.7,3.2,1.3,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
''';
load jsonStr.`jsonStr` as data;
select vec_dense(features) as features ,label as label from data
as data1;

-- use RandomForest
train data1 as RandomForest.`/tmp/model` where

-- once set to true, every time you run this script, MLSQL will generate a new directory for your model
keepVersion="true"

-- specify the test dataset which will be used to feed the evaluator to generate some metrics, e.g. F1, Accuracy
and evaluateTable="data1"

-- specify group 0 parameters
and `fitParam.0.labelCol`="label"
and `fitParam.0.featuresCol`="features"
and `fitParam.0.maxDepth`="2"

-- specify group 1 parameters
and `fitParam.1.featuresCol`="features"
and `fitParam.1.labelCol`="label"
and `fitParam.1.maxDepth`="10"
;

```

When this script is executed, the following result will be shown in the web console:



```
name value
---------------------------------
modelPath /tmp/model/_model_10/model/1
algIndex 1
alg org.apache.spark.ml.classification.RandomForestClassifier
metrics f1: 0.7625000000000001 weightedPrecision: 0.8444444444444446 weightedRecall: 0.7999999999999999 accuracy: 0.8
status success
startTime 20180913 59:15:32:685
endTime 20180913 59:15:36:317
trainParams Map(maxDepth -> 10)
---------------------------------
modelPath /tmp/model/_model_10/model/0
algIndex 0
alg org.apache.spark.ml.classification.RandomForestClassifier
metrics f1:0.7625000000000001 weightedPrecision: 0.8444444444444446 weightedRecall: 0.7999999999999999 accuracy: 0.8
status success
startTime 20180913 59:1536:318
endTime 20180913 59:1538:024
trainParams Map(maxDepth -> 2, featuresCol -> features, labelCol -> label)
```

If you feel ok, register and use the model:

```sql
register RandomForest.`/tmp/model` as rf_predict;

-- you can specify which model group (by its fitParam index) you want to use:
register RandomForest.`/tmp/model` as rf_predict where
algIndex="0";

-- you can specify which metric the MLSQL should use to get best model
register RandomForest.`/tmp/model` as rf_predict where
autoSelectByMetric="f1";

select rf_predict(features) as predict_label, label from data1 as output;
```









Empty file added docs/en/mlsql-xgboost.md
Empty file.
7 changes: 2 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<module>streamingpro-tensorflow</module>
<module>streamingpro-opencv</module>
<module>streamingpro-jython</module>
<module>streamingpro-automl</module>
</modules>

<properties>
Expand All @@ -64,7 +65,6 @@
<scope>provided</scope>
</properties>


<repositories>
<repository>
<id>aliyun</id>
Expand All @@ -77,10 +77,6 @@
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>

<profiles>
Expand Down Expand Up @@ -256,6 +252,7 @@
<module>streamingpro-dsl</module>
<module>streamingpro-mlsql</module>
<module>streamingpro-crawler</module>
<module>streamingpro-xgboost</module>
</modules>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
/**
* Created by allwefantasy on 13/1/2018.
*/
trait SQLAlg extends Serializable{
def train(df: DataFrame, path: String, params: Map[String, String]): Unit
trait SQLAlg extends Serializable {
def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame

def load(sparkSession: SparkSession, path: String, params: Map[String, String]): Any

Expand Down
41 changes: 41 additions & 0 deletions streamingpro-automl/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module descriptor for streamingpro-automl.
     Introduced to host AutoML experiments built on Salesforce's TransmogrifAI
     library on top of the project's Spark stack. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>streamingpro</artifactId>
        <groupId>streaming.king</groupId>
        <version>1.1.3</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>streamingpro-automl</artifactId>
    <dependencies>
        <!-- TransmogrifAI core: AutoML feature engineering / model selection.
             NOTE(review): version is pinned to 0.3.4 and hard-codes the _2.11
             Scala suffix instead of ${scala.binary.version} — confirm this
             matches the build's Scala version before upgrading. -->
        <dependency>
            <groupId>com.salesforce.transmogrifai</groupId>
            <artifactId>transmogrifai-core_2.11</artifactId>
            <version>0.3.4</version>
        </dependency>
        <!-- Spark dependencies come from the parent's properties; ${scope} is
             presumably "provided" in cluster builds — verify in the parent POM. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${scope}</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${scope}</scope>
        </dependency>
    </dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package streaming.example

import com.salesforce.op._
import com.salesforce.op.evaluators.Evaluators
import com.salesforce.op.readers._
import com.salesforce.op.features._
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification._
import com.salesforce.op.test.Passenger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._

/**
* Created by allwefantasy on 12/9/2018.
*/
object AutoMLExample {
  // Placeholder entry point for a TransmogrifAI (com.salesforce.op) AutoML demo.
  // NOTE(review): the body is intentionally empty — the imports above (readers,
  // feature builders, classification stages) suggest a classification workflow
  // was planned but never implemented; confirm before relying on this class.
  def main(args: Array[String]): Unit = {



  }
}


Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import streaming.dsl.mmlib.SQLAlg
* Created by allwefantasy on 15/1/2018.
*/
class SQLDL4J extends SQLAlg with Dl4jFunctions {
override def train(df: DataFrame, path: String, params: Map[String, String]): Unit = {
override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = {

require(params.contains("featureSize"), "featureSize is required")
require(params.contains("labelSize"), "labelSize is required")
Expand All @@ -35,6 +35,8 @@ class SQLDL4J extends SQLAlg with Dl4jFunctions {
--conf spark.yarn.executor.memoryOverhead=6144
*/
Class.forName("streaming.dsl.mmlib.algs.dl4j." + params("alg")).newInstance().asInstanceOf[SQLAlg].train(df, path, params)
import df.sparkSession.implicits._
Seq.empty[String].toDF("name")
}

override def load(sparkSession: SparkSession, path: String, params: Map[String, String]): Any = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import streaming.dsl.mmlib.SQLAlg
* Created by allwefantasy on 23/2/2018.
*/
class FCClassify extends SQLAlg with Dl4jFunctions {
def train(df: DataFrame, path: String, params: Map[String, String]): Unit = {
def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = {
dl4jClassificationTrain(df, path, params, () => {

val featureSize = params.getOrElse("featureSize", "-1").toInt
Expand Down Expand Up @@ -55,6 +55,8 @@ class FCClassify extends SQLAlg with Dl4jFunctions {
netConf

})
import df.sparkSession.implicits._
Seq.empty[String].toDF("name")
}

override def load(sparkSession: SparkSession, path: String, params: Map[String, String]): Any = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import streaming.dsl.mmlib.SQLAlg
*/
class SDAutoencoder extends SQLAlg with Dl4jFunctions {

override def train(df: DataFrame, path: String, params: Map[String, String]): Unit = {
override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = {
dl4jClassificationTrain(df, path, params, () => {

val featureSize = params.getOrElse("featureSize", "-1").toInt
Expand Down Expand Up @@ -57,6 +57,8 @@ class SDAutoencoder extends SQLAlg with Dl4jFunctions {
netConf

})
import df.sparkSession.implicits._
Seq.empty[String].toDF("name")
}

override def load(sparkSession: SparkSession, path: String, params: Map[String, String]): Any = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import streaming.dsl.mmlib.SQLAlg
* Created by allwefantasy on 26/2/2018.
*/
class VanillaLSTMClassify extends SQLAlg with Dl4jFunctions {
override def train(df: DataFrame, path: String, params: Map[String, String]): Unit = {
override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = {
def train(df: DataFrame, path: String, params: Map[String, String]): Unit = {
dl4jClassificationTrain(df, path, params, () => {

Expand Down Expand Up @@ -58,11 +58,12 @@ class VanillaLSTMClassify extends SQLAlg with Dl4jFunctions {

})
}

import df.sparkSession.implicits._
Seq.empty[String].toDF("name")

}

override def load(sparkSession: SparkSession, path: String,params: Map[String, String]): Any = null
override def load(sparkSession: SparkSession, path: String, params: Map[String, String]): Any = null

override def predict(sparkSession: SparkSession, _model: Any, name: String, params: Map[String, String]): UserDefinedFunction = null
}
11 changes: 11 additions & 0 deletions streamingpro-mlsql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@
</dependencies>
</profile>

<profile>
<id>xgboost</id>
<dependencies>
<dependency>
<groupId>streaming.king</groupId>
<artifactId>streamingpro-xgboost</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</profile>

<profile>
<id>streamingpro-spark-2.2.0-adaptor</id>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package streaming.dsl

import org.apache.spark.sql.SparkSession
import java.util.UUID

import streaming.dsl.mmlib.SQLAlg
import streaming.dsl.parser.DSLSQLParser._
import streaming.dsl.template.TemplateMerge
Expand Down Expand Up @@ -41,8 +42,10 @@ class TrainAdaptor(scriptSQLExecListener: ScriptSQLExecListener) extends DslAdap
if (!sqlAlg.skipPathPrefix) {
path = withPathPrefix(scriptSQLExecListener.pathPrefix(owner), path)
}
sqlAlg.train(df, path, options)
scriptSQLExecListener.setLastSelectTable(null)
val newdf = sqlAlg.train(df, path, options)
val tempTable = UUID.randomUUID().toString.replace("-", "")
newdf.createOrReplaceTempView(tempTable)
scriptSQLExecListener.setLastSelectTable(tempTable)
}
}

Expand Down Expand Up @@ -99,7 +102,7 @@ object MLMapping {
case Some(clzz) =>
Class.forName(clzz).newInstance().asInstanceOf[SQLAlg]
case None =>
if (!name.contains(".") && name.endsWith("InPlace")) {
if (!name.contains(".") && (name.endsWith("InPlace") || name.endsWith("Ext"))) {
Class.forName(s"streaming.dsl.mmlib.algs.SQL${name}").newInstance().asInstanceOf[SQLAlg]
} else {
throw new RuntimeException(s"${name} is not found")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package streaming.dsl.load.batch

/**
* Created by allwefantasy on 12/9/2018.
*/
/**
 * Placeholder batch-load source for model metadata.
 *
 * Currently a stub: `output()` performs no work and yields no result.
 */
class ModelSource {

  /** No-op for now; returns Unit. */
  def output(): Unit = ()
}
Loading

0 comments on commit d247544

Please sign in to comment.