Model configuration to support the class-imbalance problem

allwefantasy committed Feb 5, 2018
1 parent 7452baa commit 24206fa
Showing 5 changed files with 163 additions and 25 deletions.
22 changes: 11 additions & 11 deletions docs/mlsql.md
@@ -26,21 +26,21 @@ train data as RandomForest.`/tmp/model` where inputCol="featrues" and maxDepth="
If you want to know an algorithm's input format and its parameters, see [Spark MLlib](https://spark.apache.org/docs/latest/ml-guide.html).
In MLSQL, the input format and the algorithm parameters are kept consistent with Spark MLlib.

Generally, most classification and regression algorithms support the libsvm format. You can generate libsvm files with SQL or with a program, and then load them via
### The class-imbalance problem

```sql
load libsvm.`/data/mllib/sample_libsvm_data.txt`
as sample_table;
```
To deal with imbalanced training data, every model (currently only NaiveBayes is supported) offers a special training mode. Suppose we have a binary classification problem with classes A and B: class A has 100 samples and class B has 1000,
a ten-fold gap. To get a better training result, we train ten models (max sample count / min sample count).

Here sample_table is a table with two columns, label and features. Once loaded, it can be fed to an algorithm.
The first model:

```sql
train sample_table as RandomForest.`/tmp/zhuwl_rf_model` where maxDepth="3";
```
Class A keeps all 100 samples, class B is randomly sampled at 10% (100/1000), and a model is trained on the result.

Repeat this step ten times, once per model.

This mode is enabled by setting multiModels="true" in the where clause.

In the predict function, the model with the highest confidence is automatically chosen to produce the prediction.
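
Putting this together, a minimal sketch (the model path `/tmp/bayes_model` is a placeholder, and the register/select usage below is assumed to follow MLSQL's usual pattern for prediction functions):

```sql
-- enable the imbalance-aware multi-model training described above
train sample_table as NaiveBayes.`/tmp/bayes_model` where multiModels="true";

-- assumed usage: register the trained models as a prediction function;
-- prediction automatically picks the model with the highest confidence
register NaiveBayes.`/tmp/bayes_model` as bayes_predict;
select bayes_predict(features) as predicted from sample_table as output;
```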

For Word2Vec, the input only needs to be an array of strings.
For LDA, we recommend an array of Int as input.
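
For example, a hypothetical Word2Vec call (the table name, output path, and inputCol value are placeholders, not from this commit) would train on an array of strings column:

```sql
-- doc_table is assumed to contain a column `words` of type array<string>
train doc_table as Word2Vec.`/tmp/w2v_model` where inputCol="words";
```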

### Prediction

streaming/common/HDFSOperator.scala
@@ -3,9 +3,10 @@ package streaming.common
import java.io.{BufferedReader, InputStreamReader}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.hadoop.fs.{FSDataOutputStream, FileStatus, FileSystem, Path}

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._

/**
* 5/5/16 WilliamZhu([email protected])
@@ -32,6 +33,12 @@ object HDFSOperator {
  }


  def listModelDirectory(path: String): Seq[FileStatus] = {
    val fs = FileSystem.get(new Configuration())
    fs.listStatus(new Path(path)).filter(f => f.isDirectory)
  }


  def saveFile(path: String, fileName: String, iterator: Iterator[(String, String)]) = {

    var dos: FSDataOutputStream = null
org/apache/spark/util/WowXORShiftRandom.scala
@@ -0,0 +1,24 @@
package org.apache.spark.util

import org.apache.spark.util.random.XORShiftRandom

/**
 * Created by allwefantasy on 5/2/2018.
 * Thin wrapper exposing Spark's XORShiftRandom (which is package-private to org.apache.spark)
 * so it can be used by MLSQL's sampling code.
 */
class WowXORShiftRandom {
  val random = new XORShiftRandom()

  def nextDouble = {
    random.nextDouble()
  }

}

object WowXORShiftRandom {
  def main(args: Array[String]): Unit = {
    val random = new WowXORShiftRandom()
    (0 until 1000).foreach { f =>
      println(random.nextDouble)
    }
  }
}
streaming/dsl/mmlib/algs/Functions.scala
@@ -1,11 +1,72 @@
package streaming.dsl.mmlib.algs

import net.csdn.common.logging.Loggers
import org.apache.spark.Partitioner
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param.Params
import org.apache.spark.ml.util.{MLReadable, MLWritable}
import org.apache.spark.sql.{DataFrame, Dataset, functions => F}
import org.apache.spark.util.WowXORShiftRandom
import streaming.common.HDFSOperator

import scala.collection.mutable.ArrayBuffer

/**
* Created by allwefantasy on 13/1/2018.
*/
trait Functions {
  val logger = Loggers.getLogger(getClass)

  def sampleUnbalanceWithMultiModel(df: DataFrame, path: String, params: Map[String, String], train: (DataFrame, Int) => Unit) = {
    //select count(*) as subLabelCount,label from _ group by labelCol order by subLabelCount asc
    val labelCol = params.getOrElse("labelCol", "label")
    val labelToCountSeq = df.groupBy(labelCol).agg(F.count(labelCol).as("subLabelCount")).orderBy(F.asc("subLabelCount")).
      select(labelCol, "subLabelCount").collect().map { f =>
      (f.getDouble(0), f.getLong(1))
    }
    val forLog = labelToCountSeq.map(f => s"${f._1}:${f._2}").mkString(",")
    logger.info(s"computing data stat:${forLog}")
    val labelCount = labelToCountSeq.size

    val dfWithLabelPartition = df.rdd.map { f =>
      (f.getAs[Double](labelCol).toInt, f)
    }.partitionBy(new Partitioner {
      override def numPartitions: Int = labelCount

      override def getPartition(key: Any): Int = {
        key.asInstanceOf[Int]
      }
    }).cache()

    try {
      val minLabel = labelToCountSeq.head._1
      val minCount = labelToCountSeq.head._2

      val maxLabel = labelToCountSeq.last._1
      val maxCount = labelToCountSeq.last._2

      val times = (maxCount.toDouble / minCount).ceil

      val labelToCountMapBr = df.sparkSession.sparkContext.broadcast(labelToCountSeq.map { f =>
        //sample rate
        (f._1, minCount.toDouble / f._2)
      }.toMap)
      val forLog2 = labelToCountMapBr.value.map(f => s"${f._1}:${f._2}").mkString(",")
      logger.info(s"all label sample rate:${forLog2}")

      (0 until times.toInt).foreach { time =>
        val tempRdd = dfWithLabelPartition.mapPartitionsWithIndex { (label, iter) =>
          val wow = new WowXORShiftRandom()
          iter.filter(k => wow.nextDouble <= labelToCountMapBr.value(label)).map(f => f._2)
        }
        val trainData = df.sparkSession.createDataFrame(tempRdd, df.schema)
        logger.info(s"training model :${time}")
        train(trainData, time)
      }
    } finally {
      dfWithLabelPartition.unpersist(false)
    }
  }

  def configureModel(model: Params, params: Map[String, String]) = {
    model.params.map { f =>
@@ -42,9 +103,34 @@ trait Functions {
    modelField.get(model)
  }

  def getModelField(model: Any, fieldName: String) = {
    val modelField = model.getClass.getDeclaredField(fieldName)
    modelField.setAccessible(true)
    modelField.get(model)
  }

  def loadModels(path: String, modelType: (String) => Any) = {
    val files = HDFSOperator.listModelDirectory(path)
    val models = ArrayBuffer[Any]()
    files.foreach { f =>
      val model = modelType(f.getPath.toString)
      models += model
    }
    models
  }

  def trainModels[T <: Model[T]](df: DataFrame, path: String, params: Map[String, String], modelType: () => Params) = {

    def f(trainData: DataFrame, modelIndex: Int) = {
      val alg = modelType()
      configureModel(alg, params)
      val model = alg.asInstanceOf[Estimator[T]].fit(trainData)
      model.asInstanceOf[MLWritable].write.overwrite().save(path + "/" + modelIndex)
    }

    params.getOrElse("multiModels", "false").toBoolean match {
      case true => sampleUnbalanceWithMultiModel(df, path, params, f)
      case false =>
        f(df, 0)
    }
  }
}
streaming/dsl/mmlib/algs/SQLNaiveBayes.scala
@@ -1,38 +1,59 @@
package streaming.dsl.mmlib.algs

import net.csdn.common.logging.Loggers
import org.apache.spark.Partitioner
import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import streaming.dsl.mmlib.SQLAlg
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import streaming.common.HDFSOperator

import scala.collection.mutable.ArrayBuffer

/**
* Created by allwefantasy on 13/1/2018.
*/
class SQLNaiveBayes extends SQLAlg with Functions {

  override def train(df: DataFrame, path: String, params: Map[String, String]): Unit = {
    val bayes = new NaiveBayes()
    configureModel(bayes, params)
    val model = bayes.fit(df)
    model.write.overwrite().save(path)
    trainModels[NaiveBayesModel](df, path, params, () => {
      new NaiveBayes()
    })
  }

  override def load(sparkSession: SparkSession, path: String): Any = {
    val model = NaiveBayesModel.load(path)
    model
    loadModels(path, (tempPath) => {
      NaiveBayesModel.load(tempPath)
    })
  }

  override def predict(sparkSession: SparkSession, _model: Any, name: String): UserDefinedFunction = {
    val model = sparkSession.sparkContext.broadcast(_model.asInstanceOf[NaiveBayesModel])

    val models = sparkSession.sparkContext.broadcast(_model.asInstanceOf[ArrayBuffer[NaiveBayesModel]])

    val f = (vec: Vector) => {
      val predictRaw = model.value.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model.value, vec).asInstanceOf[Vector]
      val raw2probability = model.value.getClass.getMethod("raw2probability", classOf[Vector]).invoke(model.value, predictRaw).asInstanceOf[Vector]
      //model.getClass.getMethod("probability2prediction", classOf[Vector]).invoke(model, raw2probability).asInstanceOf[Vector]
      raw2probability
      models.value.map { model =>
        val predictRaw = model.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model, vec).asInstanceOf[Vector]
        val raw2probability = model.getClass.getMethod("raw2probability", classOf[Vector]).invoke(model, predictRaw).asInstanceOf[Vector]
        //model.getClass.getMethod("probability2prediction", classOf[Vector]).invoke(model, raw2probability).asInstanceOf[Vector]
        // (probability, class)
        (raw2probability(raw2probability.argmax), raw2probability)
      }.sortBy(f => f._1).reverse.head._2
    }

    val f2 = (vec: Vector) => {
      models.value.map { model =>
        val predictRaw = model.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model, vec).asInstanceOf[Vector]
        val raw2probability = model.getClass.getMethod("raw2probability", classOf[Vector]).invoke(model, predictRaw).asInstanceOf[Vector]
        //model.getClass.getMethod("probability2prediction", classOf[Vector]).invoke(model, raw2probability).asInstanceOf[Vector]
        raw2probability
      }
    }

    sparkSession.udf.register(name + "_raw", f2)

    UserDefinedFunction(f, VectorType, Some(Seq(VectorType)))
  }
}
