Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed May 12, 2019
1 parent 526794b commit f29b8ef
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,15 @@ object FVectors {
}.unzip
new SparseVector(selectedIndices.length, sliceInds.toArray, sliceVals.toArray)
}

/**
 * Slices a sparse vector down to the half-open index window
 * [range(0), range(1)).
 *
 * @param vector the sparse vector to slice
 * @param range  two-element array: range(0) = start (inclusive),
 *               range(1) = end (exclusive)
 * @return a SparseVector of size (end - start) containing the entries
 *         that fell inside the window, with indices re-based to the
 *         sliced vector's coordinate system
 */
def range(vector: SparseVector, range: Array[Int]): SparseVector = {
  val start = range(0)
  val end = range(1)

  // Keep only the entries whose original index lies inside [start, end).
  val (sliceVals, sliceInds) = vector.values.zip(vector.indices).filter { case (_, i) =>
    i < end && i >= start
  }.unzip
  // Re-base the surviving indices relative to the new vector. Without the
  // `- start` shift, any kept index >= (end - start) would violate the
  // SparseVector invariant that indices are < size, producing an invalid
  // (or exception-throwing) vector.
  new SparseVector(end - start, sliceInds.map(_ - start).toArray, sliceVals.toArray)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ object Functions {
})
}

/**
 * Registers the `vec_range` SQL UDF, which slices a vector down to the
 * half-open window [inds(0), inds(1)). Dense vectors are sliced directly;
 * sparse vectors are delegated to FVectors.range.
 */
def vec_range(uDFRegistration: UDFRegistration) = {
  uDFRegistration.register("vec_range", (vec: Vector, inds: Seq[Int]) => {
    // Exactly two boundaries expected: start (inclusive), end (exclusive).
    assert(inds.size == 2)
    val start = inds.head
    val end = inds(1)
    vec match {
      case dense: DenseVector => Vectors.dense(dense.toArray.slice(start, end))
      case sparse: SparseVector => FVectors.range(sparse, Array(start, end))
    }
  })
}


/*
1 - x.dot(y)/(x.norm(2)*y.norm(2))
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ object IncludeAdaptor {
val mapping = Map[String, String](
"hdfs" -> "streaming.dsl.mmlib.algs.includes.HDFSIncludeSource",
"http" -> "streaming.dsl.mmlib.algs.includes.HTTPIncludeSource",
"store" -> "tech.mlsql.dsl.includes.StoreIncludeSource",

"function" -> "streaming.dsl.mmlib.algs.includes.analyst.HttpBaseDirIncludeSource",
"view" -> "streaming.dsl.mmlib.algs.includes.analyst.HttpBaseDirIncludeSource",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ object StringFeature extends BaseFeatureFunctions with Logging with WowLog {
val spark = df.sparkSession

//tfidf feature
val tfIDFconfig = Map("inputCol" -> inputCol, "numFeatures" -> wordCount.toString, "binary" -> "true")
val tfIDFconfig = Map("inputCol" -> inputCol, "numFeatures" -> wordCount.toString, "binary" -> "false")
logInfo(format(s"[TFIDF] run tf/idf estimator with follow configuration ${tfIDFconfig.map(s => s"${s._1}->${s._2}").mkString(" ; ")} "))
val tfidf = new SQLTfIdf()
tfidf.train(newDF, TF_IDF_PATH(metaPath, inputCol), tfIDFconfig)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package tech.mlsql.dsl.includes

import org.apache.http.client.fluent.Request
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.mlsql.session.MLSQLException
import streaming.common.PathFun
import streaming.dsl.IncludeSource
import streaming.log.Logging

/**
* 2019-05-12 WilliamZhu([email protected])
*/
class StoreIncludeSource extends IncludeSource with Logging {
  // Fetches the script body for `path` over HTTP from the central MLSQL
  // store (repo.store.mlsql.tech) and returns it as a String.
  override def fetchSource(sparkSession: SparkSession, path: String, options: Map[String, String]): String = {

    // 60s to establish the connection; up to 10 minutes to read the body.
    val res = Request.Get(PathFun("http://repo.store.mlsql.tech").add(path).toPath)
      .connectTimeout(60 * 1000)
      .socketTimeout(10 * 60 * 1000)
      .execute().returnContent().asString()

    // NOTE(review): fluent's returnContent() normally throws
    // HttpResponseException on non-2xx status, so `res` being null looks
    // unreachable — confirm whether a missing script really surfaces here
    // rather than as a thrown exception.
    if (res == null) {
      throw new MLSQLException(
        s"""
           |${path} is not found
         """.stripMargin)
    }
    res
  }

  // The include path is forwarded to the store verbatim; the framework
  // should not strip the scheme prefix before calling fetchSource.
  override def skipPathPrefix: Boolean = true
}

0 comments on commit f29b8ef

Please sign in to comment.