Skip to content

Commit

Permalink
vec_image 函数迁移到opencv子模块中
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed May 29, 2018
1 parent 52dd982 commit 5c28090
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/mlsql-data-processing-model.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ as images;
train images as OpenCVImage.`/tmp/word2vecinplace`
where inputCol="imagePath"
-- 宽度和高度重新设置为100
and shape="100,100"
and shape="100,100,-1"
;
load parquet.`/tmp/word2vecinplace/data`
as imagesWithResize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package streaming.dsl.mmlib.algs.processing

import org.apache.spark.sql.{Row, UDFRegistration}
import streaming.dsl.mmlib.algs.processing.image.ImageSchema

/**
* Created by allwefantasy on 29/5/2018.
*/
object UDFFunctions {
def imageVec(uDFRegistration: UDFRegistration) = {
uDFRegistration.register("vec_image", (a: Row) => {
ImageSchema.toSeq(a)
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

/**
* Created by allwefantasy on 29/5/2018.
*/
class NewFileStreamSinkLog(
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String
)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {

private implicit val formats = Serialization.formats(NoTypeHints)

protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay

protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion

protected override val defaultCompactInterval =
sparkSession.sessionState.conf.fileSinkLogCompactInterval

require(defaultCompactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")

override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
if (deletedFiles.isEmpty) {
logs
} else {
logs.filter(f => !deletedFiles.contains(f.path))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import scala.collection.mutable
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import streaming.common.UnicodeUtils
import streaming.dsl.mmlib.algs.processing.image.ImageSchema

/**
* Created by allwefantasy on 3/5/2017.
Expand Down Expand Up @@ -107,12 +106,6 @@ object Functions {
})
}

def imageVec(uDFRegistration: UDFRegistration) = {
uDFRegistration.register("vec_image", (a: Row) => {
ImageSchema.toSeq(a)
})
}

def onehot(uDFRegistration: UDFRegistration) = {
uDFRegistration.register("onehot", (a: Int, size: Int) => {
val oneValue = Array(1.0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait BasicMLSQLConfig {
"-streaming.platform", "spark",
"-streaming.enableHiveSupport", "true",
"-streaming.spark.service", "false",
"-streaming.udf.clzznames", "streaming.crawler.udf.Functions",
"-streaming.udf.clzznames", "streaming.crawler.udf.Functions,streaming.dsl.mmlib.algs.processing.UDFFunctions",
"-streaming.unittest", "true"
)

Expand All @@ -24,7 +24,7 @@ trait BasicMLSQLConfig {
"-streaming.spark.service", "false",
"-streaming.unittest", "true",
"-streaming.enableCarbonDataSupport", "true",
"-streaming.udf.clzznames", "streaming.crawler.udf.Functions",
"-streaming.udf.clzznames", "streaming.crawler.udf.Functions,streaming.dsl.mmlib.algs.processing.UDFFunctions",
"-streaming.carbondata.store", "/tmp/carbondata/store",
"-streaming.carbondata.meta", "/tmp/carbondata/meta"
)
Expand Down

0 comments on commit 5c28090

Please sign in to comment.