Merge pull request byzer-org#910 from allwefantasy/ISSUE-908
all local paths should be UUID-based to avoid path conflicts
allwefantasy authored Jan 16, 2019
2 parents a3619ad + 231eab3 commit 8d499d8
Showing 12 changed files with 60 additions and 40 deletions.
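
The heart of the change: local scratch paths used to be derived from an MD5 hash of the HDFS path, so any two jobs (or parallel tasks) working against the same HDFS path resolved to the same local directory and clobbered each other's files. Deriving the path from a fresh UUID makes every invocation unique. A minimal sketch of the before/after contrast, with commons-codec's DigestUtils standing in for the project's WowMD5 helper:

    import java.util.UUID
    import org.apache.commons.codec.digest.DigestUtils

    // Before: the local path is a pure function of the HDFS path, so two
    // concurrent jobs that reference the same model share (and corrupt)
    // one local directory.
    def md5LocalPath(base: String, hdfsPath: String): String =
      s"$base/models/${DigestUtils.md5Hex(hdfsPath)}"

    // After: every call yields a distinct directory, so concurrent jobs
    // on the same machine never share scratch space.
    def uuidLocalPath(base: String): String =
      s"$base/models/${UUID.randomUUID().toString}"
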
@@ -148,7 +148,7 @@ object HDFSOperator {
fs.copyFromLocalFile(new Path(tempLocalPath),
new Path(path))
if (cleanSource) {
- FileUtils.deleteQuietly(new File(tempLocalPath))
+ FileUtils.forceDelete(new File(tempLocalPath))
}

}
@@ -157,7 +157,7 @@ object HDFSOperator {
val fs = FileSystem.get(new Configuration())
val tmpFile = new File(tempLocalPath)
if (tmpFile.exists()) {
- FileUtils.deleteQuietly(tmpFile)
+ FileUtils.forceDelete(tmpFile)
}
fs.copyToLocalFile(new Path(path), new Path(tempLocalPath))
}
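
A side change in the same commit: cleanup switches from FileUtils.deleteQuietly to FileUtils.forceDelete. The commons-io semantics differ: deleteQuietly swallows all failures and merely returns a boolean, while forceDelete throws an IOException (or FileNotFoundException if the file is absent) when deletion fails, so a scratch directory that cannot be removed now surfaces as an error instead of silently leaking. A small sketch of the contrast, with an illustrative path:

    import java.io.{File, IOException}
    import org.apache.commons.io.FileUtils

    val dir = new File("/tmp/mlsql-scratch-example") // illustrative path

    // deleteQuietly: never throws; a failed delete just returns false.
    val deleted: Boolean = FileUtils.deleteQuietly(dir)

    // forceDelete: failures propagate, so the surrounding code (for
    // example the finally block in PythonTrain below) can observe them.
    try FileUtils.forceDelete(dir)
    catch { case e: IOException => println(s"cleanup failed: $e") }
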
@@ -18,11 +18,11 @@

package streaming.dsl.mmlib.algs

- import scala.collection.JavaConverters._
-
import org.apache.spark.sql.SparkSession
import streaming.dsl.mmlib.algs.python._

+ import scala.collection.JavaConverters._
+
/**
* An extension of [[SQLPythonAlg]], with which you can train a Python algorithm without MLSQL.
*/
@@ -36,6 +36,9 @@ class SQLExternalPythonAlg extends SQLPythonAlg {
val pythonVer = systemParam.getOrElse("pythonVer", "2.7")
val metasTemp = Seq(Map("pythonPath" -> pythonPath, "pythonVer" -> pythonVer))

+ // distribute python project
+ val localPathConfig = LocalPathConfig.buildFromParams(_path)
+
val algIndex = params.getOrElse("algIndex", "-1").toInt
val fitParam = arrayParamsWithIndex("fitParam", params)
val selectedFitParam = {
@@ -60,8 +63,6 @@ class SQLExternalPythonAlg extends SQLPythonAlg {
}
}

- // distribute python project
- val localPathConfig = LocalPathConfig.buildFromParams(_path)

val loadPythonProject = params.contains("pythonProjectPath")
if (loadPythonProject) {
@@ -70,10 +71,9 @@
resourceParams += ("pythonProjectPath" -> path)
})
}
-
val taskDirectory = localPathConfig.localRunPath + params("pythonProjectPath").split("/").last
val pythonScript = PythonAlgProject.loadProject(params, sparkSession)
val resources = selectedFitParam + ("resource" -> resourceParams.asJava)
- ModelMeta(pythonScript.get, params, Seq(""), resources, Option(taskDirectory))
+ ModelMeta(pythonScript.get, params, Seq(""), resources, localPathConfig, Map(), Option(taskDirectory))
}
}
@@ -106,23 +106,23 @@ object SQLPythonFunc {
// -- path related (local/hdfs)

def getLocalTempModelPath(hdfsPath: String) = {
s"${getLocalBasePath}/models/${WowMD5.md5Hash(hdfsPath)}"
s"${getLocalBasePath}/models/${UUID.randomUUID().toString}"
}

def localOutputPath(hdfsPath: String) = {
s"${getLocalBasePath}/output/${WowMD5.md5Hash(hdfsPath)}"
s"${getLocalBasePath}/output/${UUID.randomUUID().toString}"
}

def getLocalTempDataPath(hdfsPath: String) = {
s"${getLocalBasePath}/data/${WowMD5.md5Hash(hdfsPath)}"
s"${getLocalBasePath}/data/${UUID.randomUUID().toString}"
}

def getLocalRunPath(hdfsPath: String) = {
s"${getLocalBasePath}/mlsqlrun/${UUID.randomUUID().toString}"
}

def getLocalTempResourcePath(hdfsPath: String, resourceName: String) = {
s"${getLocalBasePath}/resource/${WowMD5.md5Hash(hdfsPath)}/${resourceName}"
s"${getLocalBasePath}/resource/${UUID.randomUUID()}/${resourceName}"
}

def getLocalBasePath = {
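
Note the consequence of this file's change: the MD5-based paths were re-derivable, since any component holding the HDFS path could recompute the same local path on demand, whereas UUID.randomUUID() returns a fresh value on every call, so getLocalTempModelPath no longer yields the same directory twice. The generated path must therefore be captured once and carried along, which is exactly what the modelHDFSToLocalPath map threaded through ModelMeta in the files below is for. A tiny sketch of the property, with hypothetical names:

    import java.util.UUID

    def localTempModelPath(base: String): String =
      s"$base/models/${UUID.randomUUID().toString}"

    val p1 = localTempModelPath("/tmp/__mlsql__")
    val p2 = localTempModelPath("/tmp/__mlsql__")
    assert(p1 != p2) // the path cannot be recomputed later; it has to be
                     // recorded at the moment it is generated
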
@@ -24,19 +24,19 @@ import java.util
import java.util.UUID

import org.apache.commons.io.FileUtils
- import org.apache.spark.{APIDeployPythonRunnerEnv, SparkCoreVersion, TaskContext}
import org.apache.spark.api.python.WowPythonRunner
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
- import org.apache.spark.sql.{DataFrame, SparkSession}
+ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
import org.apache.spark.util.ObjPickle.{pickleInternalRow, unpickle}
- import org.apache.spark.util.{PredictTaskContext, PythonProjectExecuteRunner, TaskContextUtil, VectorSerDer}
import org.apache.spark.util.VectorSerDer.{ser_vector, vector_schema}
+ import org.apache.spark.util.{PredictTaskContext, PythonProjectExecuteRunner, VectorSerDer}
+ import org.apache.spark.{APIDeployPythonRunnerEnv, SparkCoreVersion}
import streaming.dsl.ScriptSQLExec
- import streaming.dsl.mmlib.algs.{Functions, SQLPythonAlg, SQLPythonFunc}
+ import streaming.dsl.mmlib.algs.{Functions, SQLPythonAlg}
import streaming.log.{Logging, WowLog}

import scala.collection.JavaConverters._
@@ -155,9 +155,9 @@ class APIPredict extends Logging with WowLog with Serializable {
}

logInfo(format(s"daemonCommand => ${daemonCommand.mkString(" ")} workerCommand=> ${workerCommand.mkString(" ")}"))
-
+ val modelHDFSToLocalPath = modelMeta.modelHDFSToLocalPath
val f = (v: org.apache.spark.ml.linalg.Vector, modelPath: String) => {
- val modelRow = InternalRow.fromSeq(Seq(SQLPythonFunc.getLocalTempModelPath(modelPath)))
+ val modelRow = InternalRow.fromSeq(Seq(modelHDFSToLocalPath.getOrElse(modelPath, "")))
val trainParamsRow = InternalRow.fromSeq(Seq(ArrayBasedMapData(trainParams)))
val v_ser = pickleInternalRow(Seq(ser_vector(v)).toIterator, vector_schema())
val v_ser2 = pickleInternalRow(Seq(modelRow).toIterator, StructType(Seq(StructField("modelPath", StringType))))
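
This is the consumer side of that recorded mapping: at predict time the UDF can no longer call SQLPythonFunc.getLocalTempModelPath(modelPath) to rediscover where the model was materialized (that would mint a brand-new UUID path), so it looks the local path up in modelMeta.modelHDFSToLocalPath instead. A minimal sketch of the lookup, with illustrative map contents:

    // Illustrative entries; the real map is filled in by ResourceManager below.
    val modelHDFSToLocalPath: Map[String, String] =
      Map("/model/pa/0" -> "/tmp/__mlsql__/models/3f1c0c81-example")

    def resolveLocalModelPath(hdfsPath: String): String =
      // Falls back to "" exactly as the patched code does; an absent entry
      // means the model was never distributed to this process.
      modelHDFSToLocalPath.getOrElse(hdfsPath, "")
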
@@ -40,10 +40,10 @@ class BatchPredict extends Logging with WowLog with Serializable {

val keepLocalDirectory = params.getOrElse("keepLocalDirectory", "false").toBoolean
val modelMetaManager = new ModelMetaManager(spark, _path, params)
- val modelMeta = modelMetaManager.loadMetaAndModel
- var (selectedFitParam, resourceParams) = new ResourceManager(params).loadResourceInRegister(spark, modelMeta)
+ val modelMeta = modelMetaManager.loadMetaAndModel(null, Map())
+ var (selectedFitParam, resourceParams, modelHDFSToLocalPath) = new ResourceManager(params).loadResourceInRegister(spark, modelMeta)

- modelMeta.copy(resources = selectedFitParam)
+ modelMeta.copy(resources = selectedFitParam, modelHDFSToLocalPath = modelHDFSToLocalPath)
val resources = modelMeta.resources

// if pythonScriptPath is defined in predict/run, then use it; otherwise look it up in the train params.
@@ -27,10 +27,10 @@ class ModelMetaManager(sparkSession: SparkSession, _path: String, params: Map[St

val wowMetas = sparkSession.read.parquet(metaPath + "/1").collect()

- def loadMetaAndModel = {
+ def loadMetaAndModel(localPathConfig: LocalPathConfig, modelHDFSToLocalPath: Map[String, String]) = {
val _trainParams = trainParams
val pythonTrainScript = PythonAlgProject.loadProject(_trainParams, sparkSession)
- ModelMeta(pythonTrainScript.get, _trainParams, modelEntityPaths, Map())
+ ModelMeta(pythonTrainScript.get, _trainParams, modelEntityPaths, Map(), localPathConfig, modelHDFSToLocalPath)
}

def maxVersion = getModelVersion(_path)
@@ -28,12 +28,12 @@ class PythonLoad extends Logging with WowLog with Serializable {
def load(sparkSession: SparkSession, _path: String, params: Map[String, String]): ModelMeta = {

val modelMetaManager = new ModelMetaManager(sparkSession, _path, params)
- val modelMeta = modelMetaManager.loadMetaAndModel
val localPathConfig = LocalPathConfig.buildFromParams(_path)
+ val modelMeta = modelMetaManager.loadMetaAndModel(localPathConfig, Map())

val taskDirectory = localPathConfig.localRunPath + "/" + modelMeta.pythonScript.projectName

- var (selectedFitParam, resourceParams) = new ResourceManager(params).loadResourceInRegister(sparkSession, modelMeta)
+ var (selectedFitParam, resourceParams, modelHDFSToLocalPath) = new ResourceManager(params).loadResourceInRegister(sparkSession, modelMeta)


modelMeta.pythonScript.scriptType match {
@@ -54,6 +54,6 @@
})
}

- modelMeta.copy(resources = selectedFitParam + ("resource" -> resourceParams.asJava), taskDirectory = Option(taskDirectory))
+ modelMeta.copy(resources = selectedFitParam + ("resource" -> resourceParams.asJava), taskDirectory = Option(taskDirectory), modelHDFSToLocalPath = modelHDFSToLocalPath)
}
}
@@ -514,14 +514,14 @@ class PythonTrain extends Functions with Serializable {
trainFailFlag = true
} finally {
// delete local model
- FileUtils.deleteQuietly(new File(tempModelLocalPath))
+ FileUtils.forceDelete(new File(tempModelLocalPath))
// delete local data
if (!keepLocalDirectory) {
- FileUtils.deleteQuietly(new File(tempDataLocalPathWithAlgSuffix))
+ FileUtils.forceDelete(new File(tempDataLocalPathWithAlgSuffix))
}
// delete resource
resourceParams.foreach { rp =>
- FileUtils.deleteQuietly(new File(rp._2))
+ FileUtils.forceDelete(new File(rp._2))
}
}
val status = if (trainFailFlag) "fail" else "success"
@@ -50,11 +50,12 @@ class ResourceManager(params: Map[String, String]) extends Logging with WowLog {
val selectedFitParam = if (algIndex == -1) Map[String, String]() else fitParam(algIndex)._2
val loadResource = selectedFitParam.keys.map(_.split("\\.")(0)).toSet.contains("resource")
var resourceParams = Map.empty[String, String]
-
+ var modelHDFSToLocalPath = Map.empty[String, String]
// make sure every executor has the model in a local directory.
// we should unregister manually
modelMeta.modelEntityPaths.foreach { modelPath =>
val tempModelLocalPath = SQLPythonFunc.getLocalTempModelPath(modelPath)
+ modelHDFSToLocalPath += (modelPath -> tempModelLocalPath)
SQLPythonAlg.distributeResource(sparkSession, modelPath, tempModelLocalPath)

if (loadResource) {
@@ -68,6 +69,6 @@
}

}
- (selectedFitParam,resourceParams)
+ (selectedFitParam, resourceParams, modelHDFSToLocalPath)
}
}
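
loadResourceInRegister is the natural place to build the map, because it is the one spot that both generates the UUID-based local path and triggers the HDFS-to-local copy via SQLPythonAlg.distributeResource; callers such as BatchPredict and PythonLoad then stamp the returned map onto their ModelMeta with copy(modelHDFSToLocalPath = ...), as shown above.
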
@@ -151,5 +151,7 @@ case class ModelMeta(pythonScript: PythonScript,
trainParams: Map[String, String],
modelEntityPaths: Seq[String],
resources: Map[String, Any],
+ localPathConfig: LocalPathConfig,
+ modelHDFSToLocalPath: Map[String, String],
taskDirectory: Option[String] = None
)
@@ -121,14 +121,20 @@ object ScriptCode {
|
|set data='''
|{"jack":1}
|{"jack":2}
|{"jack":3}
|{"jack":4}
|{"jack":5}
|{"jack":6}
|''';
|
|load jsonStr.`data` as testData;
|load script.`python1` as python1;
|load script.`dependencies` as dependencies;
|
+ |run testData as RepartitionExt.`` where partitionNum="3" as testData1;
|-- train sklearn model
- |run testData as PythonParallelExt.`${modelPath}`
+ |run testData1 as PythonParallelExt.`${modelPath}`
|where scripts="python1"
|and entryPoint="python1"
|and condaFile="dependencies"
@@ -182,14 +188,20 @@
|
|set data='''
|{"jack":1}
|{"jack":2}
|{"jack":3}
|{"jack":4}
|{"jack":5}
|{"jack":6}
|''';
|
|load jsonStr.`data` as testData;
|load script.`python1` as python1;
|load script.`dependencies` as dependencies;
|
+ |run testData as RepartitionExt.`` where partitionNum="3" as testData1;
|-- train sklearn model
- |run testData as PythonAlg.`${modelPath}`
+ |run testData1 as PythonAlg.`${modelPath}`
|where scripts="python1"
|and entryPoint="python1"
|and condaFile="dependencies"
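
The test scripts now feed six rows instead of one and insert a RepartitionExt step with partitionNum="3", so the Python training extension runs as several concurrent tasks, typically on the same node in a test JVM. That is precisely the scenario in which the old MD5-derived local paths collided and the new UUID-based paths must not.
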
@@ -18,16 +18,13 @@

package org.apache.spark.scheduler.cluster

- import java.util.concurrent.TimeUnit
-
- import org.apache.spark.{SparkContext, SparkEnv}
+ import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.ps.cluster.Message
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
- import org.apache.spark.security.CryptoStreamUtils
- import org.apache.spark.util.{ThreadUtils, Utils}
+ import org.apache.spark.util.ThreadUtils

+ import scala.collection.mutable
- import scala.collection.mutable.HashMap

/**
@@ -84,7 +81,15 @@
}
case Message.CopyModelToLocal(modelPath, destPath) =>
val ks = sc.getExecutorIds().toSet
- executorDataMap.foreach { ed =>
+ val hostMap = new mutable.HashMap[String, (String, ExecutorData)]()
+
+ executorDataMap.foreach { f =>
+ if (!hostMap.contains(f._2.executorHost)) {
+ hostMap.put(f._2.executorHost, (f._1, f._2))
+ }
+ }
+
+ hostMap.values.foreach { ed =>
if (ks.contains(ed._1)) {
ed._2.executorEndpoint.askSync[Boolean](Message.CopyModelToLocal(modelPath, destPath))
}
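
The driver-side fix addresses the same conflict from another angle: Message.CopyModelToLocal copies to a fixed destPath on each machine, so broadcasting it to every executor makes co-located executors race on one local directory. Deduplicating by executorHost sends the copy request to exactly one executor per machine. A self-contained sketch of the dedup step, with a simplified stand-in for the scheduler's ExecutorData:

    import scala.collection.mutable

    case class ExecutorData(executorHost: String) // simplified stand-in

    val executorDataMap = Map(
      "exec-1" -> ExecutorData("host-a"),
      "exec-2" -> ExecutorData("host-a"), // same host: would race on destPath
      "exec-3" -> ExecutorData("host-b"))

    // Keep the first executor seen per host, mirroring the patched loop.
    val hostMap = new mutable.HashMap[String, (String, ExecutorData)]()
    executorDataMap.foreach { f =>
      if (!hostMap.contains(f._2.executorHost)) {
        hostMap.put(f._2.executorHost, (f._1, f._2))
      }
    }

    // One CopyModelToLocal per host instead of one per executor.
    hostMap.values.foreach { case (id, _) => println(s"copy via $id") }
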
